uncia/uncia/middleware.py

112 lines
3 KiB
Python

from izzylib import logging
from izzylib.http_server_async import error
from izzylib.http_signatures import parse_signature, verify_request, verify_headers
from izzylib.sql.rows import Row
from .database import db
from .functions import fetch_actor
auth_paths = [
'/logout',
'/user',
'/admin'
]
# Instances that just shouldn't exist and/or are known to harass others
blocked_instances = [
'kiwifarms.cc',
'neckbeard.xyz',
'gameliberty.club',
'shitposter.club',
'freespeechextremist.com',
'smuglo.li',
'yggdrasil.social',
'gleasonator.com',
'ligma.pro',
'fedi.absturztau.be',
'blob.cat',
'social.i2p.rocks',
'lolicon.rocks',
'pawoo.net',
'baraag.net'
]
def ban_check(s, request):
if any(map(s.get.ban, [None], [request.signature.domain, request.signature.top_domain])):
return True
if not request.actor:
return
handle = request.actor.preferredUsername
if s.get.ban(handle):
return True
if s.get.ban(handle, request.signature.domain):
return True
if s.get.ban(handle, request.signature.top_domain):
return True
async def AuthCheck(request):
validated = False
token = request.headers.getone('token')
with db.session as s:
request.token = s.fetch('token', code=token)
request.user = s.fetch('user', id=request.token.id) if request.token else None
request.signature = parse_signature(request.headers.getone('signature'))
request.instance = None
request.actor = None
if request.signature:
request.instance = s.get.instance(request.signature.domain)
request.actor = fetch_actor(request.signature.actor)
if request.signature.top_domain in blocked_instances:
raise error.Teapot('This teapot kills fascists')
if ban_check(s, request):
raise error.Forbidden('no')
if request.path in ['/inbox', '/actor'] and request.method.lower() == 'post':
## The actor was deleted, so return a 200 to not get the same message over and over again
if request.signature and request.actor == False:
raise error.Ok({'error': 'Could not fetch deleted actor'})
if not request.actor:
raise error.Unauthorized({'error': 'Could not get actor'})
try:
data = await request.json()
except:
logging.verbose('Failed to parse post data')
raise error.BadRequest({'error': 'Invalid data'})
if type(request.actor).__name__ == 'Row':
logging.warning('Actor data is a db row:', request.actor)
raise error.InternalServerError({'error': f'An unknown error happened'})
if not request.instance and data.get('type', '').lower() != 'follow':
raise error.Unauthorized({'error': f'Follow the relay first'})
#validated = verify_request(request, request.actor)
validated = verify_headers(
request.headers.as_dict(),
request.method,
request.path,
request.actor,
await request.body())
if not validated:
logging.debug(f'Not validated: {request.signature.actor}')
raise error.Unauthorized({'error': f'Failed signature check'})
if any(map(request.path.startswith, auth_paths)) and not request.user:
raise error.Found('/login')