social/barkshark_social/middleware.py
2024-04-18 22:36:06 -04:00

169 lines
3.8 KiB
Python

from __future__ import annotations
import httpx
import logging
import traceback
from aputils import Message, Signature, SignatureFailureError
from basgi import Client, HttpError, Request
from blib import JsonBase
from typing import TYPE_CHECKING
from .database.connection import Connection
from .database.objects import Instance, User
if TYPE_CHECKING:
from .server import Application
# Set of URL path strings.
# NOTE(review): presumably the path prefixes that rely on the login cookie;
# nothing in the visible part of this module reads it -- confirm against callers.
COOKIE_PATHS = {"/@", "/settings", "/logout"}
# Maps an HTTP method to a tuple of URL path strings.
# NOTE(review): presumably the routes whose requests must carry an HTTP
# signature; nothing in the visible part of this module reads it -- confirm.
SIGN_PATHS: dict[str, tuple[str, ...]] = {
    "GET": ("/user",),
    "POST": ("/actor", "/relay", "/user"),
}
# Nodeinfo software names for which `put_user` never queries
# `/api/v1/instance` to fill in the instance name and description.
SOFTWARE_NO_API = {"activityrelay", "activity-relay", "aoderelay"}
# Maps an HTTP method to a tuple of API path strings.
# NOTE(review): presumably the API endpoints reachable without authentication;
# nothing in the visible part of this module reads it -- confirm.
PUBLIC_API: dict[str, tuple[str, ...]] = {
    "GET": ("/api/v1/instance", "/api/v1/instance/peers"),
}
async def FrontendAuthMiddleware(app: Application, request: Request) -> bool | None:
    """Attach the cookie's user and token to ``request.state`` when both resolve.

    Requests whose path starts with ``/static`` or ends in ``.json``, ``.css``
    or ``.ico`` are left untouched. Otherwise the cookie named by
    ``app.config.cookie_name`` is looked up with ``get_cookie`` and its
    ``userid`` with ``get_user_by_id``; only when both lookups succeed are
    ``request.state.user`` and ``request.state.token`` set.

    Every path returns ``False``.
    NOTE(review): presumably ``False`` tells the framework to keep processing
    the request -- confirm against the middleware contract.
    """
    path = request.path

    # Asset-like requests never need a logged-in user.
    if path.startswith("/static") or path.endswith((".json", ".css", ".ico")):
        return False

    cookie = request.cookies.get(app.config.cookie_name)

    if cookie is None or cookie.value is None:
        return False

    with app.database.session(False) as conn:
        token = conn.get_cookie(cookie.value)
        # Only resolve the user when the cookie maps to a known token.
        user = None if token is None else conn.get_user_by_id(token.userid)

        if user is not None:
            request.state.user = user
            request.state.token = token

    return False
async def ActivitypubAuthMiddleware(app: Application, request: Request) -> bool | None:
    """Validate a request's HTTP signature and attach the signing actor to it.

    Steps, in order:

    1. Parse a ``Signature`` from the request headers. A ``KeyError`` while
       parsing is ignored, and a missing or falsy signature makes the function
       return ``False`` without touching the database.
    2. Fetch the actor document at ``signature.keyid``, signing that outgoing
       request with the signer of the local user with id ``-99``.
    3. Check the request against the fetched actor's signer.
    4. Store or load the actor's instance and user rows via ``put_user``.

    On success ``request.state`` carries ``signature``, ``actor``,
    ``instance`` and ``user``.

    Returns:
        ``False`` on every non-raising path.

    Raises:
        HttpError: 500 when the actor cannot be fetched or the user/instance
            rows cannot be stored; 401 when the signature does not validate.
    """
    signature: Signature | None = None

    try:
        # todo: set typing for `new_from_headers` to `Mapping`
        signature = request.state.signature = Signature.new_from_headers(request.headers)  # type: ignore[arg-type]

    except KeyError:
        pass

    if not signature:
        return False

    with app.database.session(False) as conn:
        # NOTE(review): id -99 is presumably the instance's own system actor -- confirm.
        local_signer = conn.get_user_by_id(-99).signer  # type: ignore[union-attr]

        try:
            actor = await app.client.fetch_json(
                signature.keyid,
                signer=local_signer,
                cls=Message,
            )
            request.state.actor = actor

        except httpx.HTTPStatusError as exc:
            response = exc.response
            logging.error("Error when sending request: %i", response.status_code)
            logging.error(response.read())
            raise HttpError(500, "Failed to fetch actor")

        except Exception:
            traceback.print_exc()
            raise HttpError(500, "Failed to fetch actor")

        try:
            signature_ok = await actor.signer.validate_request_async(request)

        except SignatureFailureError as exc:
            raise HttpError(401, str(exc))

        if not signature_ok:
            logging.error("Failed to validate signature")
            raise HttpError(401, "Failed to validate signature")

        try:
            # Unpacking straight into the state keeps both assignments inside
            # the guarded region.
            request.state.instance, request.state.user = await put_user(app.client, actor, conn)

        except Exception:
            traceback.print_exc()
            raise HttpError(500, "Failed to fetch user and instance info")

    return False
async def put_user(client: Client, actor: Message, session: Connection) -> tuple[Instance, User]:
    """Return the stored instance and user rows for ``actor``, creating missing ones.

    When ``session`` has no instance for ``actor.domain``, the domain's
    nodeinfo and the actor's webfinger are fetched and stored with
    ``put_instance_data``. If the nodeinfo metadata lacks ``nodeName`` or
    ``nodeDescription`` and the software is not listed in ``SOFTWARE_NO_API``,
    both fields are filled from ``https://{domain}/api/v1/instance`` first.
    When no user exists for ``actor.id``, one is stored with
    ``put_remote_user``.

    Args:
        client: HTTP client used for the nodeinfo, instance API and webfinger lookups.
        actor: The remote actor whose instance and user rows are wanted.
        session: Database connection used for the lookups and inserts.

    Returns:
        An ``(instance, user)`` tuple.

    Raises:
        Whatever the client or database calls raise; nothing is caught here.
    """
    instance: Instance
    user: User
    domain = actor.domain

    if (instance_row := session.get_instance(domain)) is None:
        nodeinfo = await client.fetch_nodeinfo(domain)

        # `metadata` may be absent from the nodeinfo document, so it must not
        # be indexed before its presence is known.
        metadata = nodeinfo.get("metadata")

        if metadata is None:
            metadata = {}

        # Both fields are read from `metadata`, the same place they are
        # written below. A tuple is used so unhashable values cannot raise.
        incomplete = None in (metadata.get("nodeName"), metadata.get("nodeDescription"))

        if incomplete and nodeinfo.sw_name not in SOFTWARE_NO_API:
            api: JsonBase = await client.fetch_json(f"https://{domain}/api/v1/instance")

            metadata.update({
                "nodeName": api["title"],
                "nodeDescription": api["description"] or api["short_description"],
            })

            # Attach the mapping in case it was created above.
            nodeinfo["metadata"] = metadata

        webfinger = await client.fetch_webfinger(actor.username, domain)

        with session.transaction():
            instance = session.put_instance_data(nodeinfo, webfinger, actor)

    else:
        instance = instance_row

    if (user_row := session.get_user_by_actor(actor.id)) is None:
        with session.transaction():
            user = session.put_remote_user(actor)

    else:
        user = user_row

    return instance, user