izzylib/IzzyLib/http.py

394 lines
10 KiB
Python

import functools, json, sys
from IzzyLib import logging
from IzzyLib.misc import DefaultDict, DotDict
from base64 import b64decode, b64encode
from datetime import datetime
from ssl import SSLCertVerificationError
from urllib.error import HTTPError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from . import error, __version__
# Optional dependency: Pycryptodome supplies the hashing and RSA primitives.
# When the import fails, `crypto_enabled` is False and the functions that
# check it log an error and return early.
try:
    from Crypto.Hash import SHA256
    from Crypto.PublicKey import RSA
    from Crypto.Signature import PKCS1_v1_5
    crypto_enabled = True
except ImportError:
    logging.verbose('Pycryptodome module not found. HTTP header signing and verifying is disabled')
    crypto_enabled = False

# Optional dependency: Sanic's request class, used by VerifyRequest.
# `SanicRequest` is set to False when Sanic is not installed.
try:
    from sanic.request import Request as SanicRequest
except ImportError:
    logging.verbose('Sanic module not found. Request verification is disabled')
    SanicRequest = False

# Module-wide client used by FetchActor and FetchWebfingerAcct; assigned by SetClient()
Client = None
class HttpClient(object):
    '''Small urllib-based HTTP client with optional proxy and HTTP signature support

    headers: Default headers merged into every request
    useragent: Base user agent string
    appagent: Optional application name appended to the user agent in parentheses
    proxy_type: Scheme of the proxy ("http" or "https")
    proxy_host: Host of the proxy. The proxy is only used when this is set
    proxy_port: Port of the proxy. Defaults to 80 for http and 443 for https
    '''

    def __init__(self, headers=None, useragent=f'IzzyLib/{__version__}', appagent=None, proxy_type='https', proxy_host=None, proxy_port=None):
        proxy_ports = {
            'http': 80,
            'https': 443
        }

        if proxy_type not in ['http', 'https']:
            raise ValueError(f'Not a valid proxy type: {proxy_type}')

        # A shared `{}` default would be the same dict object for every client
        self.headers = {} if headers is None else headers
        self.agent = f'{useragent} ({appagent})' if appagent else useragent
        self.proxy = DotDict({
            'enabled': bool(proxy_host),
            'ptype': proxy_type,
            'host': proxy_host,
            'port': proxy_port or proxy_ports[proxy_type]
        })

        # NOTE(review): this is the plain module function, so calling SetGlobal()
        # with no argument installs a new default client rather than this instance.
        self.SetGlobal = SetClient

    def __sign_request(self, request, privkey, keyid):
        '''Add date, digest and signature headers to a request in place

        request: The HttpRequest to sign
        privkey: PEM-encoded private key used to create the signature
        keyid: Value of the keyId field of the signature header

        Does nothing (besides logging an error) when crypto functions are disabled
        '''
        if not crypto_enabled:
            logging.error('Crypto functions disabled')
            return

        # set_proxy() replaces request.host with the proxy address, so the host
        # that gets signed is taken from the target url instead
        target_host = urlparse(request.full_url).netloc

        request.add_header('(request-target)', f'{request.method.lower()} {request.path}')
        request.add_header('host', target_host)
        request.add_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))

        if request.body:
            body_hash = b64encode(SHA256.new(request.body).digest()).decode("UTF-8")
            request.add_header('digest', f'SHA-256={body_hash}')
            request.add_header('content-length', str(len(request.body)))

        sig = {
            'keyId': keyid,
            'algorithm': 'rsa-sha256',
            'headers': ' '.join([k.lower() for k in request.headers.keys()]),
            'signature': b64encode(PkcsHeaders(privkey, request.headers)).decode('UTF-8')
        }

        sig_string = ','.join([f'{k}="{v}"' for k,v in sig.items()])
        request.add_header('signature', sig_string)

        # Both headers only exist to be signed. Request.add_header() stores names
        # capitalized, so the host header has to be removed as "Host"; a leftover
        # copy would also be carried over to the target of a redirect.
        request.remove_header('(request-target)')
        request.remove_header('Host')

    def __build_request(self, url, data=None, headers=None, method='GET'):
        '''Create an HttpRequest from the client defaults and the per-call options

        url: Url to send the request to
        data: Request body. A dict is serialized to JSON and a str is encoded as UTF-8
        headers: Extra headers. They override the client defaults
        method: HTTP method
        '''
        new_headers = self.headers.copy()
        new_headers.update(headers or {})

        # Only the names are normalized. Values are left untouched because
        # tokens, urls and similar values are case sensitive.
        parsed_headers = {k.lower(): v for k,v in new_headers.items()}

        if not parsed_headers.get('user-agent'):
            parsed_headers['user-agent'] = self.agent

        if isinstance(data, dict):
            data = json.dumps(data)

        if isinstance(data, str):
            data = data.encode('UTF-8')

        request = HttpRequest(url, data=data, headers=parsed_headers, method=method)

        if self.proxy.enabled:
            request.set_proxy(f'{self.proxy.host}:{self.proxy.port}', self.proxy.ptype)

        return request

    def _send(self, request):
        '''Send a prepared request and wrap the reply in an HttpResponse

        HTTP error statuses are returned as a normal response. Returns None
        when the certificate of the server could not be verified.
        '''
        # Local import: only HTTPError is imported at module level
        from urllib.error import URLError

        try:
            response = urlopen(request)

        except HTTPError as e:
            response = e.fp

        except SSLCertVerificationError as e:
            logging.error('HttpClient.request: Certificate error:', e)
            return

        except URLError as e:
            # urlopen wraps socket and ssl errors in URLError, so the
            # certificate error normally shows up as the reason
            if isinstance(e.reason, SSLCertVerificationError):
                logging.error('HttpClient.request: Certificate error:', e.reason)
                return

            raise

        return HttpResponse(response)

    def request(self, *args, **kwargs):
        '''Send a request. Takes the same arguments as __build_request (url, data, headers, method)'''
        return self._send(self.__build_request(*args, **kwargs))

    def json(self, *args, headers=None, activity=True, **kwargs):
        '''Send a request with a JSON accept header

        headers: Extra headers for the request. The passed dict is not modified
        activity: Ask for "application/activity+json" instead of "application/json"
        '''
        json_type = 'activity+json' if activity else 'json'

        new_headers = dict(headers or {})
        new_headers['accept'] = f'application/{json_type}'

        return self.request(*args, headers=new_headers, **kwargs)

    def signed_request(self, privkey, keyid, *args, **kwargs):
        '''Send a request signed with an HTTP signature

        privkey: PEM-encoded private key used to create the signature
        keyid: Value of the keyId field of the signature header

        The remaining arguments are the same as for request
        '''
        request = self.__build_request(*args, **kwargs)
        self.__sign_request(request, privkey, keyid)
        return self._send(request)
class HttpRequest(Request):
    '''urllib Request that also exposes the parsed parts of its url and a body that is never None'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        url_parts = urlparse(self.full_url)

        self.scheme, self.host = url_parts.scheme, url_parts.netloc
        self.domain, self.port = url_parts.hostname, url_parts.port
        self.path, self.query = url_parts.path, url_parts.query

        # Empty or missing data is exposed as empty bytes
        self.body = self.data or b''
class HttpResponse(object):
    '''Snapshot of a response: the body is read immediately and the headers are copied

    response: Object providing read(), headers, status and url
    '''

    def __init__(self, response):
        self.body = response.read()

        # Header names and values are both stored lower-cased
        lowered = {}
        for name, value in response.headers.items():
            lowered[name.lower()] = value.lower()

        self.headers = DefaultDict(lowered)
        self.status = response.status
        self.url = response.url

    def text(self):
        '''Return the body decoded as UTF-8'''
        return self.body.decode(encoding='UTF-8')

    def json(self, fail=False):
        '''Return the body parsed into a DotDict

        fail: Re-raise the parsing error instead of returning an empty DotDict
        '''
        try:
            parsed = DotDict(self.text())

        except Exception as e:
            if not fail:
                return DotDict()

            raise e from None

        return parsed

    def json_pretty(self, indent=4):
        '''Return the parsed body dumped back to an indented JSON string'''
        data = self.json().asDict()
        return json.dumps(data, indent=indent)
def VerifyRequest(request: SanicRequest, actor: dict):
    '''Verify a header signature from a sanic request

    request: The request with the headers to verify
    actor: A dictionary containing the activitypub actor and the link to the pubkey used for verification

    Returns the result of VerifyHeaders, or None if Sanic is not installed
    '''
    if not SanicRequest:
        logging.error('Sanic request verification disabled')
        return

    body = request.body if request.body else None

    # VerifyHeaders takes `actor` before `body`, so both are passed by keyword
    # to keep them from ending up in each other's place
    return VerifyHeaders(request.headers, request.method, request.path, actor=actor, body=body)
def VerifyHeaders(headers: dict, method: str, path: str, actor: dict=None, body=None):
    '''Verify a header signature

    headers: A dictionary containing all the headers from a request
    method: The HTTP method of the request
    path: The path of the HTTP request
    actor (optional): A dictionary containing the activitypub actor and the link to the pubkey used for verification. Fetched from the keyId of the signature if not provided
    body (optional): The body of the request. Only needed if the signature includes the digest header

    Returns True if the signature is valid, False if any step of the verification fails and None if crypto functions are disabled
    '''
    if not crypto_enabled:
        logging.error('Crypto functions disabled')
        return

    headers = {k.lower(): v for k,v in headers.items()}
    headers['(request-target)'] = f'{method.lower()} {path}'
    signature = ParseSig(headers.get('signature'))
    digest = ParseBodyDigest(headers.get('digest'))

    # Walk the required names, not the received ones, so absent headers are noticed
    missing_headers = [k for k in ('date', 'host') if headers.get(k) is None]

    if not signature:
        logging.verbose('Missing signature')
        return False

    if not signature.signature:
        logging.verbose('Missing signature section in signature')
        return False

    ## Add digest header to missing headers list if it doesn't exist
    if method.lower() == 'post' and not digest:
        missing_headers.append('digest')

    ## Fail if missing date, host or digest (if POST) headers
    if missing_headers:
        logging.verbose('Missing headers:', missing_headers)
        return False

    ## Fail if the signature covers headers that are not part of the request
    unknown_headers = [k for k in signature.headers if k not in headers]

    if unknown_headers:
        logging.verbose('Signed headers missing from request:', unknown_headers)
        return False

    ## Fail if body verification fails. A missing body is hashed as an empty one
    if digest and not VerifyString(body or b'', digest.sig, digest.alg):
        logging.verbose('Failed body digest verification')
        return False

    ## Only fetch the actor after the cheap checks passed
    if not actor:
        if not signature.keyid:
            logging.verbose('Missing keyId section in signature')
            return False

        actor = FetchActor(signature.keyid)

    ## FetchActor returns an empty dict on failure
    if not actor:
        logging.verbose('Failed to get actor for signature')
        return False

    ## .get works for plain dicts as well as DotDicts
    pubkey = (actor.get('publicKey') or {}).get('publicKeyPem')

    if not pubkey:
        logging.verbose('Actor does not have a public key')
        return False

    signed_headers = {k:v for k,v in headers.items() if k in signature.headers}

    if PkcsHeaders(pubkey, signed_headers, sig=signature):
        return True

    logging.verbose('Failed header verification')
    return False
def ParseBodyDigest(digest):
    '''Split the value of a digest header ("ALG=HASH") into its two parts

    digest: The value of the digest header

    Returns a DotDict with "sig" (the hash) and "alg" (the algorithm name without dashes), or None if the value is empty or has no "="
    '''
    if not digest:
        return

    alg, separator, body_hash = digest.partition('=')

    if not separator:
        return

    parsed = DotDict()
    parsed.sig = body_hash
    parsed.alg = alg.replace('-', '')
    return parsed
def VerifyString(string, enc_string, alg='SHA256', fail=False):
    '''Check that the base64-encoded hash of a string matches an expected value

    string: The text or bytes to hash. A str is encoded as UTF-8 and None is treated as empty
    enc_string: The expected base64-encoded hash
    alg (optional): Name of the hash algorithm, with or without dashes (ex. "SHA256", "sha-512")
    fail (optional): If set to True, raise error.VerificationError instead of returning False

    Returns True on a match, False otherwise and None if crypto functions are disabled
    '''
    if not crypto_enabled:
        logging.error('Crypto functions disabled')
        return

    # Local import: hashlib can create a hash from the algorithm name
    import hashlib

    if string is None:
        string = b''

    elif isinstance(string, str):
        string = string.encode('UTF-8')

    alg_name = (alg or 'SHA256').replace('-', '').lower()

    try:
        body_hash = b64encode(hashlib.new(alg_name, string).digest()).decode('UTF-8')

    except (ValueError, TypeError):
        # An unknown or unusable algorithm can never match
        logging.verbose('Unsupported digest algorithm:', alg)
        body_hash = None

    if body_hash is not None and body_hash == enc_string:
        return True

    if fail:
        raise error.VerificationError()

    return False
def PkcsHeaders(key: str, headers: dict, sig=None):
    '''Sign a set of headers, or verify them against a parsed signature

    key: PEM-encoded RSA key. The private key for signing or the public key for verifying
    headers: The headers to sign or verify
    sig (optional): A parsed signature header (see ParseSig). The headers get verified instead of signed if provided

    Returns the raw signature bytes when signing and the result of the verification when verifying. Returns None if crypto functions are disabled
    '''
    if not crypto_enabled:
        logging.error('Crypto functions disabled')
        return

    if sig:
        # The signed string is built in the order listed by the signature
        try:
            head_items = [f'{item}: {headers[item]}' for item in sig.headers]

        except KeyError as e:
            logging.verbose('Signed header missing from headers:', e)
            return False

    else:
        head_items = [f'{k.lower()}: {v}' for k,v in headers.items()]

    head_bytes = '\n'.join(head_items).encode('UTF-8')

    rsa_key = RSA.importKey(key)
    pkcs = PKCS1_v1_5.new(rsa_key)
    h = SHA256.new(head_bytes)

    if not sig:
        return pkcs.sign(h)

    # A signature that is missing or not valid base64 fails verification instead of raising
    try:
        raw_signature = b64decode(sig.signature)

    except (ValueError, TypeError):
        logging.verbose('Signature is not valid base64')
        return False

    return pkcs.verify(h, raw_signature)
def ParseSig(signature: str):
    '''Parse the value of a signature header

    signature: The raw value of the signature header (comma-separated key="value" pairs)

    Returns a DefaultDict with lower-cased keys and "headers" split into a list, or None if the header is empty or has no headers section
    '''
    if not signature:
        logging.verbose('Missing signature header')
        return

    sig = DefaultDict({})

    for part in signature.split(','):
        key, separator, value = part.partition('=')

        # Skip empty or malformed parts (ex. a trailing comma) instead of raising
        if not separator:
            continue

        # Whitespace after a comma would otherwise end up in the key
        sig[key.strip().lower()] = value.strip().replace('"', '')

    if not sig.headers:
        logging.verbose('Missing headers section in signature')
        return

    sig.headers = sig.headers.split()

    return sig
def FetchActor(url):
    '''Fetch an activitypub actor with the global client

    url: Url of the actor. Anything after a "#" (ex. the key id fragment) is dropped

    Returns the actor with web_domain, shared_inbox, pubkey and handle added, or an empty dict if the client is not set or the fetch failed
    '''
    if not Client:
        logging.error('IzzyLib.http: Please set global client with "SetClient(client)"')
        return {}

    url = url.split('#')[0]
    headers = {'Accept': 'application/activity+json'}
    resp = Client.request(url, headers=headers)

    # The client returns None on a certificate error. Parse the body only once.
    actor = resp.json() if resp is not None else None

    if not actor:
        logging.verbose('functions.FetchActor: Failed to fetch actor:', url)

        if resp is not None:
            logging.debug(f'Error {resp.status}: {resp.body}')

        return {}

    actor.web_domain = urlparse(url).netloc
    actor.shared_inbox = actor.inbox
    actor.pubkey = None
    actor.handle = actor.preferredUsername

    if actor.get('endpoints'):
        actor.shared_inbox = actor.endpoints.get('sharedInbox', actor.inbox)

    if actor.get('publicKey'):
        actor.pubkey = actor.publicKey.get('publicKeyPem')

    return actor
@functools.lru_cache(maxsize=512)
def FetchWebfingerAcct(handle, domain):
    '''Look up an account via webfinger with the global client

    handle: Username part of the account
    domain: Domain part of the account. Also the host that gets queried

    Returns a DefaultDict with handle, domain and (if an activitypub "self" link exists) actor. Returns an empty dict if the client is not set and None if the lookup failed
    '''
    # NOTE(review): results are cached by lru_cache, including the failure
    # results; FetchWebfingerAcct.cache_clear() resets them
    if not Client:
        logging.error('IzzyLib.http: Please set global client with "SetClient(client)"')
        return {}

    data = DefaultDict()
    webfinger = Client.request(f'https://{domain}/.well-known/webfinger?resource=acct:{handle}@{domain}')

    # The client returns None on a certificate error
    if webfinger is None or not webfinger.body:
        return

    # Parse the body only once
    doc = webfinger.json()
    subject = doc.get('subject')

    if not subject:
        return

    acct = subject.replace('acct:', '')

    # Anything but exactly one "@" cannot be unpacked into handle and domain
    if acct.count('@') != 1:
        return

    data.handle, data.domain = acct.split('@')

    for link in doc.get('links') or []:
        if link.get('rel') == 'self' and link.get('type') == 'application/activity+json':
            data.actor = link.get('href')

    return data
def SetClient(client=None):
    '''Set the module-wide client used by FetchActor and FetchWebfingerAcct

    client (optional): The client to use. A new HttpClient with default settings is created if not provided
    '''
    global Client

    if not client:
        client = HttpClient()

    Client = client
def GenRsaKey():
    '''Generate a new 2048 bit RSA key pair

    Returns a DotDict with the key objects as PRIVKEY and PUBKEY and their exported text forms as privkey and pubkey. Returns None if crypto functions are disabled
    '''
    # Same guard as the other crypto helpers; without it a missing
    # Pycryptodome install surfaces as a NameError on RSA
    if not crypto_enabled:
        logging.error('Crypto functions disabled')
        return

    privkey = RSA.generate(2048)
    pubkey = privkey.publickey()

    return DotDict({
        'PRIVKEY': privkey,
        'PUBKEY': pubkey,
        'privkey': privkey.export_key().decode(),
        'pubkey': pubkey.export_key().decode()
    })