uncia/uncia/middleware.py

55 lines
1.6 KiB
Python

from izzylib import logging
from izzylib.http_requests_client import parse_signature, verify_request
from izzylib.http_server import MiddlewareBase
from .database import db
auth_paths = [
'/logout',
'/user',
'/admin'
]
class AuthCheck(MiddlewareBase):
attach = 'request'
async def handler(self, request, response):
validated = False
token = request.headers.get('token')
with db.session as s:
request.ctx.token = s.fetch('token', code=token)
request.ctx.user = s.fetch('user', id=request.token.id) if request.token else None
request.ctx.signature = parse_signature(request.headers.get('signature'))
request.ctx.instance = None
if request.ctx.signature:
if any([s.get.ban(domain=request.ctx.signature.domain), s.get.ban(domain=request.ctx.signature.top_domain)]):
return response.text(f'BEGONE!', status=403)
request.ctx.instance = s.get.inbox(request.ctx.signature.domain)
if request.path == '/inbox' and request.method.lower() == 'post':
try:
data = request.data.json
except:
logging.verbose('Failed to parse post data')
return response.text(f'Invalid data', status=400)
try:
validated = await verify_request(request)
except AssertionError as e:
logging.debug(f'Failed sig check: {e}')
return response.text(f'Failed signature check: {e}', status=401)
if not request.ctx.instance and data and data.type.lower() != 'follow':
return response.text(f'Follow the relay first', status=401)
if any(map(request.path.startswith, auth_paths)) and not request.ctx.user:
return response.redir('/login')