add ability to run web server for signature verification

This commit is contained in:
Izalia Mae 2024-04-03 12:02:21 -04:00
parent 3fbe5f1555
commit 192313ac71

282
aputils/__main__.py Normal file
View file

@ -0,0 +1,282 @@
#!/usr/bin/env python3
import aputils
import argparse
import json
from functools import cached_property, lru_cache
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.error import HTTPError
from urllib.request import Request, urlopen
class ClientError(Exception):
...
class Response(aputils.JsonBase):
def __init__(self,
status: int,
message: str,
method: str,
path: str,
address: str,
valid: bool,
headers: dict[str, Any]) -> None:
aputils.JsonBase.__init__(self,
status = status, # type: ignore
message = message, # type: ignore
method = method, # type: ignore
path = path.split("?", 1)[0], # type: ignore
address = address, # type: ignore
valid = valid, # type: ignore
headers = headers # type: ignore
)
class RequestHandler(BaseHTTPRequestHandler):
default_request_version = "HTTP/1.1"
signature: aputils.Signature
actor: aputils.Message
signer: aputils.Signer
@property
def content_length(self) -> int:
try:
return int(self.headers.get("Content-Length", 0))
except ValueError:
raise ClientError(f"Actor is larger than {args.size_limit} bytes") from None
@property
def method(self) -> str:
return self.command.upper()
@property
def remote(self) -> str:
return self.headers.get("X-Real-Ip", self.headers.get("X-Forwarded-For", self.address_string()))
@cached_property
def parsed_headers(self) -> dict[str, str]:
headers = {}
for key, value in self.headers.items():
key = key.title()
if key.startswith(("X-Forwarded", "X-Real")):
continue
headers[key] = value
return headers
def body(self) -> bytes:
return self.rfile.read(self.content_length or -1)
def log_request(self, status: int, length: int = 0) -> None: # type: ignore
date = self.date_time_string()
path = self.path.split("?", 1)[0]
agent = self.headers.get("User-Agent", "n/a")
message = f"[{date}] {self.remote} \"{self.method} {path}\" {status} {length} {agent}"
print(message, flush = True)
def send(self,
status: int,
message: aputils.JsonBase,
headers: dict[str, str] | None = None) -> None:
data = message.to_json().encode("utf-8")
if headers is None:
headers = {"Content-Type": "application/json"}
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
self.log_request(status, len(data))
self.send_response_only(status, None)
self.send_header("Server", self.version_string())
self.send_header("Date", self.date_time_string())
for key, value in headers.items():
self.send_header(key, value)
self.end_headers()
self.wfile.write(data)
self.wfile.flush()
def send_error(self, status: int, message: str) -> None: # type: ignore
response = Response(
status,
message,
self.method,
self.path,
self.remote,
False,
self.parsed_headers
)
self.send(status, response, {"Content-Type": "application/json"})
def handle_actor(self) -> None:
bio = "Verifies any incoming signatures. Just send any kind of HTTP request to any path."
actor = aputils.JsonBase({
"@context": [
"https://www.w3.org/ns/activitystreams"
],
"type": "Application",
"id": f"{URL}/actor",
"preferredUsername": "relay",
"name": "Signature Verifier",
"summary": bio,
"manuallyApprovesFollowers": True,
"inbox": f"{URL}/inbox",
"url": f"{URL}/",
"endpoints": {
"sharedInbox": f"{URL}/inbox"
},
"publicKey": {
"id": keyid,
"owner": f"{URL}/actor",
"publicKeyPem": signer.pubkey
}
})
self.send(200, actor, {"Content-Type": "application/activity+json"})
@lru_cache(maxsize = 1024, typed = True)
def fetch_actor(self, url: str) -> aputils.Message:
try:
url, _ = url.split("#", 1)
except ValueError:
pass
request = signer.sign_request(Request(
url,
method = "GET", headers = {
"User-Agent": f"ApUtils Signature Verifier ({URL})"
}
))
try:
with urlopen(request) as response:
if (length := int(response.headers.get("Content-Length", 0))) <= 0:
raise ClientError("Actor body length is 0")
if length > args.size_limit:
raise ClientError(f"Actor is larger than {args.size_limit} bytes")
return aputils.Message.parse(response.read())
except HTTPError as error:
msg = f"Failed to fetch actor: Status={error.code} Message='{str(error.read())}'"
raise ClientError(msg) from None
except json.JSONDecodeError as error:
raise ClientError(f"Failed to parse actor: {str(error)}") from None
except ValueError:
raise ClientError("Content-Length header is not an integer") from None
def parse_request(self) -> bool:
if not BaseHTTPRequestHandler.parse_request(self):
return False
path = self.path.split("?", 1)[0]
if self.method == "GET" and path == "/actor":
self.handle_actor()
if self.content_length > args.size_limit:
self.send_error(400, f"Incoming message is larger than {args.size_limit} bytes")
if self.method == "GET" and self.content_length:
self.send_error(400, "GET messages should not have a body")
return False
if self.method in {"POST", "PUT"} and self.content_length <= 0:
self.send_error(400, f"{self.method} messages should have a body")
return False
try:
self.signature = aputils.Signature.new_from_headers(self.parsed_headers)
except KeyError:
self.send_error(400, "Missing signature header")
return False
try:
self.actor = self.fetch_actor(self.signature.keyid)
except ClientError as error:
self.send_error(400, str(error))
return False
self.signer = aputils.Signer.new_from_actor(self.actor)
if self.command.upper() == "GET" and self.content_length:
self.send_error(400, "'GET' messages should not have a body")
return False
try:
self.signer.validate_signature(self.method, path, self.parsed_headers)
except aputils.SignatureFailureError as error:
self.send_error(401, str(error))
return False
response = Response(
200,
"HTTP signature is valid :3",
self.method,
self.path,
self.remote,
True,
self.parsed_headers
)
self.send(200, response)
return False
parser = argparse.ArgumentParser(
prog = "aputils",
description = "Starts a server for validating HTTP signatures"
)
parser.add_argument("hostname")
parser.add_argument("--addr", "-a", default = "0.0.0.0")
parser.add_argument("--port", "-p", default = 8080, type = int)
parser.add_argument("--size-limit", "-s", default = 1024 * 1024, type = int)
parser.add_argument("--protocol", "-r", default = "https", choices = ["https", "http"])
args = parser.parse_args()
URL = f"{args.protocol}://{args.hostname}"
keyid = URL + "/actor#main-key"
if not (key := Path("privkey.pem")).exists():
signer = aputils.Signer.new(keyid)
signer.export(key)
else:
signer = aputils.Signer(key, keyid)
server = ThreadingHTTPServer((args.addr, args.port), RequestHandler)
server.serve_forever()