http_server: Application and Request don't use ctx

This commit is contained in:
Izalia Mae 2021-09-22 13:24:09 -04:00
parent 9f175b00fc
commit aff4a03f6d
11 changed files with 363 additions and 91 deletions

View file

@ -1,85 +0,0 @@
import typing
class DBusType(typing.NewType):
def __init__(self, name, type, string):
super().__init__(name, type)
self.name = name
self.type = type
self.string = string
def __str__(self):
return self.string
class Dict(DBusType):
def __init__(key=Str, value=Str):
super().__init__('Dict', dict, None)
self.key = key
self.value = value
# I'm pretty sure there's an easier way to do f-strings without parsing curly brackets, but I'm not sure how atm
def __str__(self):
return '{' + f'{self.key}{self.value}' + '}'
class List(DBusType):
def __init__(*types):
super().__init__('List', list, None)
self.types = types
def __str__(self):
types = ''.join(self.types)
return f'a{types}'
def Set(List):
def __init__(*types):
super().__init__('Set', set, None)
self.types = types
def Tuple(List):
def __init__(*types):
super().__init__('Tuple', Tuple, None)
self.types = types
Str = DBusType('String', str, 's')
Byte = DBusType('Byte', bytes, 'y')
Bool = DBusType('Boolean', bool, 'b')
Float = DBusType('Float', float, 'd')
Int = DbusType('Int64', int, 'x')
Int16 = DBusType('Int16', int, 'n')
Int32 = DbusType('Int32', int, 'i')
Int64 = DbusType('Int64', int, 'x')
Uint16 = DBusType('Uint16', int, 'q')
Uint32 = DBusType('Uint32', int, 'u')
Uint64 = DBusType('Uint64', int, 't')
__all__ = [
'Any',
'Bytes',
'Dict',
'Float',
'Int',
'Int16',
'Int32',
'Int64',
'List',
'Set',
'Str',
'Tuple',
'Uint16',
'Uint32',
'Uint64'
]

View file

@ -1 +0,0 @@
from .hasher import PasswordHasher

View file

@ -30,6 +30,9 @@ log_ext_ignore = [
frontend = Path(__file__).resolve.parent.join('frontend')
class Application(sanic.Sanic):
_extra = DotDict()
def __init__(self, class_views=[], **kwargs):
self.cfg = Config(**kwargs)
@ -72,6 +75,37 @@ class Application(sanic.Sanic):
self.start = self.run
def __getattr__(self, key):
if key in self.slots:
return super().__getattribute__(key)
return self._extra[key]
def __setattr__(self, key, value):
if key in self.slots:
super().__setattr__(key, value)
else:
self._extra[key] = value
def __getitem__(self, key):
return self._extra[key]
def __setitem__(self, key, value):
self._extra[key] = value
@property
def slots(self):
try:
return super().__getattribute__('__fake_slots__')
except:
return super().__getattribute__('__slots__')
def add_class_route(self, cls):
for route in cls.paths:
self.add_route(cls.as_view(), route)

View file

@ -1,5 +1,6 @@
import sanic
from functools import cached_property
from izzylib import DotDict
from urllib.parse import parse_qsl
@ -7,17 +8,53 @@ from .misc import Headers
class Request(sanic.request.Request):
_extra = DotDict()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.Headers = Headers(self.headers)
self.address = self.headers.get('x-real-ip', self.forwarded.get('for', self.remote_addr))
self.host = self.headers.get('host')
self.raw_headers = DotDict()
self.data = Data(self)
self.template = self.app.template
self.user_level = 0
self.setup()
def __getattr__(self, key):
if key in self.slots:
return super().__getattribute__(key)
return self._extra[key]
def __setattr__(self, key, value):
if key in self.slots:
super().__setattr__(key, value)
else:
self._extra[key] = value
def __getitem__(self, key):
return self._extra[key]
def __setitem__(self, key, value):
self._extra[key] = value
@cached_property
def slots(self):
try:
return super().__getattribute__('__fake_slots__')
except:
return super().__getattribute__('__slots__')
def setup(self):
pass
@ -49,6 +86,16 @@ class Request(sanic.request.Request):
return False
def headers_as_dict(self):
headers = DotDict()
for key in self.headers:
#headers[key] = ';'.join(self.headers.getall(key))
headers[key] = self.headers.getone(key)
return headers
class Data(object):
def __init__(self, request):
self.request = request

184
izzylib/http_signatures.py Normal file
View file

@ -0,0 +1,184 @@
import json, requests, sys
from PIL import Image
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from base64 import b64decode, b64encode
from datetime import datetime
from functools import lru_cache
from izzylib import DefaultDotDict, DotDict
from izzylib import izzylog
from tldextract import extract
from urllib.parse import urlparse
def generate_rsa_key():
privkey = RSA.generate(2048)
key = DotDict({'PRIVKEY': privkey, 'PUBKEY': privkey.publickey()})
key.update({'privkey': key.PRIVKEY.export_key().decode(), 'pubkey': key.PUBKEY.export_key().decode()})
return key
def parse_signature(signature: str):
if not signature:
return
raise AssertionError('Missing signature header')
split_sig = signature.split(',')
sig = DefaultDotDict()
for part in split_sig:
key, value = part.split('=', 1)
sig[key.lower()] = value.replace('"', '')
sig.headers = sig.headers.split()
sig.domain = urlparse(sig.keyid).netloc
sig.top_domain = '.'.join(extract(sig.domain)[1:])
sig.actor = sig.keyid.split('#')[0]
return sig
def verify_headers(headers: dict, method: str, path: str, actor: dict, body=None):
'''Verify a header signature
headers: A dictionary containing all the headers from a request
method: The HTTP method of the request
path: The path of the HTTP request
actor (optional): A dictionary containing the activitypub actor and the link to the pubkey used for verification
body (optional): The body of the request. Only needed if the signature includes the digest header
fail (optional): If set to True, raise an error instead of returning False if any step of the process fails
'''
headers = {k.lower(): headers[k] for k in headers}
headers['(request-target)'] = f'{method.lower()} {path}'
signature = parse_signature(headers.get('signature'))
digest = headers.get('digest')
missing_headers = [k for k in headers if k in ['date', 'host'] if headers.get(k) == None]
if not signature:
raise AssertionError('Missing signature')
## Add digest header to missing headers list if it doesn't exist
if method.lower() == 'post' and not digest:
missing_headers.append('digest')
## Fail if missing date, host or digest (if POST) headers
if missing_headers:
raise AssertionError(f'Missing headers: {missing_headers}')
## Fail if body verification fails
if digest:
digest_hash = parse_body_digest(headers.get('digest'))
if not verify_string(body, digest_hash.sig, digest_hash.alg):
raise AssertionError('Failed body digest verification')
pubkey = actor.publicKey['publicKeyPem']
return sign_pkcs_headers(pubkey, {k:v for k,v in headers.items() if k in signature.headers}, sig=signature)
def verify_request(request, actor: dict):
'''Verify a header signature from a SimpleASGI request
request: The request with the headers to verify
actor: A dictionary containing the activitypub actor and the link to the pubkey used for verification
'''
return verify_headers(
headers = request.headers,
method = request.method,
path = request.path,
actor = actor,
body = request.body
)
### Helper functions that shouldn't be used directly ###
def parse_body_digest(digest):
if not digest:
raise AssertionError('Empty digest')
parsed = DotDict()
alg, sig = digest.split('=', 1)
parsed.sig = sig
parsed.alg = alg.replace('-', '')
return parsed
def sign_pkcs_headers(key: str, headers: dict, sig=None):
if sig:
head_items = [f'{item}: {headers[item]}' for item in sig.headers]
else:
head_items = [f'{k.lower()}: {v}' for k,v in headers.items()]
head_string = '\n'.join(head_items)
head_bytes = head_string.encode('UTF-8')
KEY = RSA.importKey(key)
pkcs = PKCS1_v1_5.new(KEY)
h = SHA256.new(head_bytes)
if sig:
try:
return pkcs.verify(h, b64decode(sig.signature))
except ValueError:
return False
else:
return pkcs.sign(h)
def sign_request(request, privkey, keyid):
assert isinstance(request.body, bytes)
request.set_header('(request-target)', f'{request.method.lower()} {request.url.path}')
request.set_header('host', request.url.host)
request.set_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
if request.body:
body_hash = b64encode(SHA256.new(request.body).digest()).decode("UTF-8")
request.set_header('digest', f'SHA-256={body_hash}')
request.set_header('content-length', str(len(request.body)))
sig = {
'keyId': keyid,
'algorithm': 'rsa-sha256',
'headers': ' '.join([k.lower() for k in request.headers.keys()]),
'signature': b64encode(sign_pkcs_headers(privkey, request.headers)).decode('UTF-8')
}
sig_items = [f'{k}="{v}"' for k,v in sig.items()]
sig_string = ','.join(sig_items)
request.set_header('signature', sig_string)
request.unset_header('(request-target)')
request.unset_header('host')
return request
def verify_string(string, enc_string, alg='SHA256', fail=False):
if type(string) != bytes:
string = string.encode('UTF-8')
body_hash = b64encode(SHA256.new(string).digest()).decode('UTF-8')
if body_hash == enc_string:
return True
if fail:
raise AssertionError('String failed validation')
else:
return False

View file

@ -4,7 +4,7 @@ from PIL import Image
from base64 import b64encode
from datetime import datetime
from functools import cached_property
from functools import cached_property, lru_cache
from io import BytesIO
from izzylib import DefaultDotDict, DotDict, LowerDotDict, Path, izzylog as logging, __version__
from izzylib.exceptions import HttpFileDownloadedError
@ -13,7 +13,6 @@ from urllib.parse import urlparse
from .request import HttpUrllibRequest
from .response import HttpUrllibResponse
from .signatures import set_client
Client = None
@ -125,4 +124,92 @@ class HttpUrllibClient:
def set_default_client(client=None):
global Client
Client = client or HttpClient()
set_client(Client)
@lru_cache(maxsize=512)
def fetch_actor(url):
if not Client:
raise ValueError('Please set global client with "SetRequestsClient(client)"')
url = url.split('#')[0]
headers = {'Accept': 'application/activity+json'}
resp = Client.request(url, headers=headers)
try:
actor = resp.json
except json.decoder.JSONDecodeError:
return
except Exception as e:
izzylog.debug(f'HTTP {resp.status}: {resp.body}')
raise e from None
actor.web_domain = urlparse(url).netloc
actor.shared_inbox = actor.inbox
actor.pubkey = None
actor.handle = actor.preferredUsername
if actor.get('endpoints'):
actor.shared_inbox = actor.endpoints.get('sharedInbox', actor.inbox)
if actor.get('publicKey'):
actor.pubkey = actor.publicKey.get('publicKeyPem')
return actor
@lru_cache(maxsize=512)
def fetch_instance(domain):
if not Client:
raise ValueError('Please set global client with "SetRequestsClient(client)"')
headers = {'Accept': 'application/json'}
resp = Client.request(f'https://{domain}/api/v1/instance', headers=headers)
try:
return resp.json
except json.decoder.JSONDecodeError:
return
except Exception as e:
izzylog.debug(f'HTTP {resp.status}: {resp.body}')
raise e from None
@lru_cache(maxsize=512)
def fetch_nodeinfo(domain):
if not Client:
raise ValueError('Please set global client with HttpRequestsClient.set_global()')
webfinger = Client.request(f'https://{domain}/.well-known/nodeinfo')
webfinger_data = DotDict(webfinger.body)
for link in webfinger.json.links:
if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0':
nodeinfo_url = link['href']
break
nodeinfo = Client.request(nodeinfo_url)
return nodeinfo.json
@lru_cache(maxsize=512)
def fetch_webfinger_account(handle, domain):
if not Client:
raise ValueError('Please set global client with HttpRequestsClient.set_global()')
data = DefaultDotDict()
webfinger = Client.request(f'https://{domain}/.well-known/webfinger?resource=acct:{handle}@{domain}')
if not webfinger.body:
raise ValueError('Webfinger body empty')
data.handle, data.domain = webfinger.json.subject.replace('acct:', '').split('@')
for link in webfinger.json.links:
if link['rel'] == 'self' and link['type'] == 'application/activity+json':
data.actor = link['href']
return data

View file

@ -5,6 +5,7 @@ from izzylib import DotDict, LowerDotDict, Url, boolean
from base64 import b64decode, b64encode
from datetime import datetime
from izzylib import izzylog as logging
from izzylib.http_signatures import sign_request
from .signatures import sign_pkcs_headers
@ -83,6 +84,10 @@ class HttpUrllibRequest:
def sign(self, privkey, keyid):
sign_request(self, privkey, keyid)
def sign2(self, privkey, keyid):
self.unset_header('signature')
self.set_header('(request-target)', f'{self.method.lower()} {self.url.path}')

View file

@ -49,9 +49,10 @@ hasher =
http_server =
sanic == 21.6.2
envbash == 1.2.0
http_signatures =
pycryptodome == 3.10.1
http_urllib_client =
pillow == 8.3.2
pycryptodome == 3.10.1
urllib3 == 1.26.6
tldextract == 3.1.2
sql =