# IzzyLib HTTP client — urllib-based client with ActivityPub HTTP-signature helpers
import functools
import hashlib
import json
import sys

from base64 import b64decode, b64encode
from datetime import datetime
from io import BytesIO
from ssl import SSLCertVerificationError
from urllib.error import HTTPError
from urllib.parse import urlparse
from urllib.request import Request, urlopen

from IzzyLib import logging
from IzzyLib.misc import DefaultDict, DotDict, Path

from . import error, __version__
|
|
|
|
# Optional dependency: pycryptodome provides the RSA / SHA-256 primitives
# used for HTTP signature signing and verification below.
try:
	from Crypto.Hash import SHA256
	from Crypto.PublicKey import RSA
	from Crypto.Signature import PKCS1_v1_5
	crypto_enabled = True

except ImportError:
	logging.verbose('Pycryptodome module not found. HTTP header signing and verifying is disabled')
	crypto_enabled = False


# Optional dependency: sanic. Only needed by VerifyRequest.
try:
	from sanic.request import Request as SanicRequest

except ImportError:
	logging.verbose('Sanic module not found. Request verification is disabled')
	SanicRequest = False


# Optional dependency: Pillow. Only needed by HttpClient.image.
try:
	from PIL import Image

except ImportError:
	logging.verbose('Pillow module not found. Image downloading is disabled')
	Image = False


# Module-level HttpClient instance used by FetchActor and FetchWebfingerAcct;
# set it with SetClient().
Client = None
|
class HttpClient(object):
	'''Basic HTTP client built on urllib.

	Supports default headers, an http/https proxy, JSON requests, file and
	image downloads, and ActivityPub-style HTTP signature signing.
	'''

	def __init__(self, headers=None, useragent=f'IzzyLib/{__version__}', appagent=None, proxy_type='https', proxy_host=None, proxy_port=None):
		'''
		headers: headers to send with every request
		useragent: base User-Agent value
		appagent: optional application tag appended to the User-Agent
		proxy_type: 'http' or 'https'
		proxy_host: proxy hostname; the proxy is only enabled when this is set
		proxy_port: proxy port; defaults to 80 or 443 depending on proxy_type
		'''
		proxy_ports = {
			'http': 80,
			'https': 443
		}

		if proxy_type not in ['http', 'https']:
			raise ValueError(f'Not a valid proxy type: {proxy_type}')

		# None instead of a mutable {} default so instances never share state
		self.headers = headers if headers is not None else {}
		self.agent = f'{useragent} ({appagent})' if appagent else useragent
		self.proxy = DotDict({
			'enabled': True if proxy_host else False,
			'ptype': proxy_type,
			'host': proxy_host,
			'port': proxy_ports[proxy_type] if not proxy_port else proxy_port
		})

		self.SetGlobal = SetClient


	def __sign_request(self, request, privkey, keyid):
		'''Add an ActivityPub HTTP signature header to a urllib Request.

		privkey: PEM-encoded RSA private key
		keyid: URL of the matching public key
		'''
		if not crypto_enabled:
			logging.error('Crypto functions disabled')
			return

		# (request-target) and host are part of the signed string only; they
		# are removed again once the signature header has been built
		request.add_header('(request-target)', f'{request.method.lower()} {request.path}')
		request.add_header('host', request.host)
		request.add_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))

		if request.body:
			body_hash = b64encode(SHA256.new(request.body).digest()).decode("UTF-8")
			request.add_header('digest', f'SHA-256={body_hash}')
			# header values should be strings, not ints
			request.add_header('content-length', str(len(request.body)))

		sig = {
			'keyId': keyid,
			'algorithm': 'rsa-sha256',
			'headers': ' '.join([k.lower() for k in request.headers.keys()]),
			'signature': b64encode(PkcsHeaders(privkey, request.headers)).decode('UTF-8')
		}

		sig_items = [f'{k}="{v}"' for k, v in sig.items()]
		sig_string = ','.join(sig_items)

		request.add_header('signature', sig_string)

		request.remove_header('(request-target)')
		request.remove_header('host')


	def __build_request(self, url, data=None, headers=None, method='GET'):
		'''Create a urllib Request with the client defaults, JSON body
		encoding, and proxy settings applied.'''
		new_headers = self.headers.copy()
		new_headers.update(headers or {})

		parsed_headers = {k.lower(): v for k, v in new_headers.items()}

		if not parsed_headers.get('user-agent'):
			parsed_headers['user-agent'] = self.agent

		# dict bodies are serialized to JSON, str bodies encoded to bytes
		if isinstance(data, dict):
			data = json.dumps(data)

		if isinstance(data, str):
			data = data.encode('UTF-8')

		request = Request(url, data=data, headers=parsed_headers, method=method)

		if self.proxy.enabled:
			# bugfix: the proxy string previously interpolated the host twice
			# (host:host) instead of host:port
			request.set_proxy(f'{self.proxy.host}:{self.proxy.port}', self.proxy.ptype)

		return request


	def request(self, *args, **kwargs):
		'''Send a request and return an HttpResponse, or None on a TLS
		certificate error. Non-2xx statuses still produce a response.'''
		request = self.__build_request(*args, **kwargs)

		try:
			response = urlopen(request)

		except HTTPError as e:
			# error responses are returned to the caller, not raised
			response = e.fp

		except SSLCertVerificationError as e:
			logging.error('HttpClient.request: Certificate error:', e)
			return

		return HttpResponse(response)


	def file(self, url, filepath, *args, filename=None, **kwargs):
		'''Download url into filepath/filename. Returns True on success,
		False on failure.

		filename defaults to the stem of the url, matching image().
		'''
		resp = self.request(url, *args, **kwargs)

		if resp.status != 200:
			logging.error(f'Failed to download {url}:', resp.status, resp.body)
			return False

		# consistency with image(): derive a filename from the url instead of
		# failing on path.join(None). NOTE(review): stem() drops the
		# extension, same as image() — confirm that is the desired name.
		if not filename:
			filename = Path(url).stem()

		path = Path(filepath)

		if not path.exists():
			logging.error('Path does not exist:', path)
			return False

		with path.join(filename).open('wb') as fd:
			fd.write(resp.body)

		return True


	def image(self, url, filepath, *args, filename=None, ext='png', dimensions=(50, 50), **kwargs):
		'''Download an image, thumbnail it to `dimensions`, and save it in
		`ext` format under filepath/filename. Returns True on success,
		False on failure, None when Pillow is missing.'''
		if not Image:
			logging.error('Pillow module is not installed')
			return

		resp = self.request(url, *args, **kwargs)

		if resp.status != 200:
			logging.error(f'Failed to download {url}:', resp.status, resp.body)
			return False

		if not filename:
			filename = Path(url).stem()

		path = Path(filepath)

		if not path.exists():
			logging.error('Path does not exist:', path)
			return False

		byte = BytesIO()
		image = Image.open(BytesIO(resp.body))
		image.thumbnail(dimensions)
		image.save(byte, format=ext.upper())

		with path.join(filename).open('wb') as fd:
			fd.write(byte.getvalue())

		# return True on success for consistency with file()
		return True


	def json(self, *args, headers=None, activity=True, **kwargs):
		'''Send a request with a JSON Accept header.

		activity: request application/activity+json instead of application/json
		'''
		json_type = 'activity+json' if activity else 'json'

		# copy so the caller's dict (previously a shared mutable default)
		# is never modified as a side effect
		headers = dict(headers) if headers else {}
		headers['accept'] = f'application/{json_type}'

		return self.request(*args, headers=headers, **kwargs)


	def signed_request(self, privkey, keyid, *args, **kwargs):
		'''Send a request signed with an ActivityPub HTTP signature.'''
		request = self.__build_request(*args, **kwargs)
		self.__sign_request(request, privkey, keyid)

		try:
			response = urlopen(request)

		except HTTPError as e:
			# HTTPError is itself file-like and usable as a response
			response = e

		return HttpResponse(response)
|
class HttpResponse(object):
	'''Thin wrapper around a urllib response: body read eagerly, headers
	normalized, plus text/JSON convenience accessors.'''

	def __init__(self, response):
		# read the whole body up front so the connection can be released
		self.body = response.read()

		lowered = {}

		for key, value in response.headers.items():
			# NOTE(review): values are lower-cased as well as keys —
			# presumably for case-insensitive comparison; confirm callers
			# never need the original casing (e.g. ETag, Location)
			lowered[key.lower()] = value.lower()

		self.headers = DefaultDict(lowered)
		self.status = response.status
		self.url = response.url


	def text(self):
		'''Return the body decoded as UTF-8.'''
		return self.body.decode('UTF-8')


	def json(self, fail=False):
		'''Parse the body as JSON into a DotDict.

		fail: re-raise the parse error instead of returning an empty DotDict
		'''
		try:
			return DotDict(self.text())

		except Exception as e:
			if not fail:
				return DotDict()

			raise e from None


	def json_pretty(self, indent=4):
		'''Return the body re-serialized as indented JSON.'''
		return json.dumps(self.json().asDict(), indent=indent)
def VerifyRequest(request: SanicRequest, actor: dict):
	'''Verify a header signature from a sanic request

	request: The request with the headers to verify
	actor: A dictionary containing the activitypub actor and the link to the pubkey used for verification
	'''
	if not SanicRequest:
		logging.error('Sanic request verification disabled')
		return

	body = request.body if request.body else None

	# bugfix: VerifyHeaders takes (headers, method, path, actor=None,
	# body=None); the old positional call passed body into the actor slot
	# and actor into the body slot
	return VerifyHeaders(request.headers, request.method, request.path, actor=actor, body=body)
def VerifyHeaders(headers: dict, method: str, path: str, actor: dict=None, body=None):
	'''Verify a header signature

	headers: A dictionary containing all the headers from a request
	method: The HTTP method of the request
	path: The path of the HTTP request
	actor (optional): A dictionary containing the activitypub actor and the link to the pubkey used for verification
	body (optional): The body of the request. Only needed if the signature includes the digest header
	'''
	if not crypto_enabled:
		logging.error('Crypto functions disabled')
		return

	headers = {k.lower(): v for k, v in headers.items()}
	headers['(request-target)'] = f'{method.lower()} {path}'
	signature = ParseSig(headers.get('signature'))
	digest = ParseBodyDigest(headers.get('digest'))

	# bugfix: check the required names directly; iterating over the present
	# header keys (as before) could never flag a header that was absent
	missing_headers = [k for k in ('date', 'host') if headers.get(k) is None]

	if not signature:
		logging.verbose('Missing signature')
		return False

	if not actor:
		actor = FetchActor(signature.keyid)

	## Add digest header to missing headers list if it doesn't exist
	if method.lower() == 'post' and not digest:
		missing_headers.append('digest')

	## Fail if missing date, host or digest (if POST) headers
	if missing_headers:
		logging.verbose('Missing headers:', missing_headers)
		return False

	## Fail if body verification fails
	if digest and not VerifyString(body, digest.sig, digest.alg):
		logging.verbose('Failed body digest verification')
		return False

	pubkey = actor.publicKey['publicKeyPem']

	if PkcsHeaders(pubkey, {k: v for k, v in headers.items() if k in signature.headers}, sig=signature):
		return True

	logging.verbose('Failed header verification')
	return False
def ParseBodyDigest(digest):
	'''Split a Digest header value ("SHA-256=<b64hash>") into a DotDict
	with `alg` (dashes stripped, e.g. "SHA256") and `sig` (the encoded
	hash). Returns None for a missing or malformed header.'''
	if not digest:
		return

	algorithm, sep, encoded = digest.partition('=')

	# no '=' at all means the header is malformed
	if not sep:
		return

	parsed = DotDict()
	parsed.sig = encoded
	parsed.alg = algorithm.replace('-', '')

	return parsed
def VerifyString(string, enc_string, alg='SHA256', fail=False):
	'''Check a string against a base64-encoded SHA-256 digest of it.

	string: the data to hash (str or bytes)
	enc_string: the base64-encoded digest to compare against
	alg: digest algorithm; currently unused — only SHA-256 is supported.
	     TODO honour other algorithms
	fail: raise error.VerificationError instead of returning False on mismatch
	'''
	if not isinstance(string, bytes):
		string = string.encode('UTF-8')

	# hashlib produces the identical digest to Crypto's SHA256, so body
	# digests can be verified even without pycryptodome installed
	body_hash = b64encode(hashlib.sha256(string).digest()).decode('UTF-8')

	if body_hash == enc_string:
		return True

	if fail:
		raise error.VerificationError()

	return False
def PkcsHeaders(key: str, headers: dict, sig=None):
	'''Sign or verify a header set with PKCS#1 v1.5 / SHA-256.

	key: PEM-encoded RSA key (private to sign, public to verify)
	headers: header names mapped to their values
	sig: parsed signature (from ParseSig); when given, the headers are
	     verified against it in the order its header list dictates,
	     otherwise a fresh signature (bytes) is returned
	'''
	if not crypto_enabled:
		logging.error('Crypto functions disabled')
		return

	if sig:
		lines = [f'{name}: {headers[name]}' for name in sig.headers]

	else:
		lines = [f'{name.lower()}: {value}' for name, value in headers.items()]

	digest = SHA256.new('\n'.join(lines).encode('UTF-8'))
	pkcs = PKCS1_v1_5.new(RSA.importKey(key))

	if sig:
		return pkcs.verify(digest, b64decode(sig.signature))

	return pkcs.sign(digest)
def ParseSig(signature: str):
	'''Parse an HTTP Signature header (comma-separated key="value" pairs)
	into a DefaultDict, with `headers` split into a list of names.
	Returns None when the header is missing or has no headers section.'''
	if not signature:
		logging.verbose('Missing signature header')
		return

	sig = DefaultDict({})

	for chunk in signature.split(','):
		name, value = chunk.split('=', 1)
		sig[name.lower()] = value.replace('"', '')

	if not sig.headers:
		logging.verbose('Missing headers section in signature')
		return

	sig.headers = sig.headers.split()

	return sig
def FetchActor(url):
	'''Fetch an ActivityPub actor document via the global Client.

	Returns the actor as a DotDict with `web_domain`, `shared_inbox`,
	`pubkey` and `handle` convenience keys added, or {} on failure.
	'''
	if not Client:
		logging.error('IzzyLib.http: Please set global client with "SetClient(client)"')
		return {}

	# drop any fragment (e.g. "#main-key") from a keyId-style url
	url = url.split('#')[0]
	headers = {'Accept': 'application/activity+json'}
	resp = Client.request(url, headers=headers)

	# parse the body once instead of re-parsing it on every json() call
	actor = resp.json()

	if not actor:
		logging.verbose('functions.FetchActor: Failed to fetch actor:', url)
		logging.debug(f'Error {resp.status}: {resp.body}')
		return {}

	actor.web_domain = urlparse(url).netloc
	actor.shared_inbox = actor.inbox
	actor.pubkey = None
	actor.handle = actor.preferredUsername

	if actor.get('endpoints'):
		actor.shared_inbox = actor.endpoints.get('sharedInbox', actor.inbox)

	if actor.get('publicKey'):
		actor.pubkey = actor.publicKey.get('publicKeyPem')

	return actor
@functools.lru_cache(maxsize=512)
def FetchWebfingerAcct(handle, domain):
	'''Resolve a webfinger acct (handle@domain) via the global Client.

	Returns a DefaultDict with `handle`, `domain` and `actor` (the
	activity+json self link), {} when no global Client is set, or None
	when the server returned no body.

	NOTE(review): results are cached and mutable — callers should treat
	the returned dict as read-only.
	'''
	if not Client:
		logging.error('IzzyLib.http: Please set global client with "SetClient(client)"')
		return {}

	data = DefaultDict()
	webfinger = Client.request(f'https://{domain}/.well-known/webfinger?resource=acct:{handle}@{domain}')

	if not webfinger.body:
		return

	# parse the response body once instead of once per access
	doc = webfinger.json()
	data.handle, data.domain = doc.subject.replace('acct:', '').split('@')

	for link in doc.links:
		# the last matching link wins (original behavior preserved)
		if link['rel'] == 'self' and link['type'] == 'application/activity+json':
			data.actor = link['href']

	return data
def SetClient(client=None):
	'''Set the module-level Client used by FetchActor and FetchWebfingerAcct.

	client: an HttpClient instance; a default one is created when omitted.
	'''
	global Client

	Client = client if client else HttpClient()
def GenRsaKey():
	'''Generate a new 2048-bit RSA keypair.

	Returns a DotDict with the key objects (PRIVKEY, PUBKEY) and their
	PEM-encoded string forms (privkey, pubkey), or None when pycryptodome
	is not installed.
	'''
	# guard like the other crypto helpers instead of failing with a
	# NameError on RSA when pycryptodome is missing
	if not crypto_enabled:
		logging.error('Crypto functions disabled')
		return

	privkey = RSA.generate(2048)

	key = DotDict({'PRIVKEY': privkey, 'PUBKEY': privkey.publickey()})
	key.update({'privkey': key.PRIVKEY.export_key().decode(), 'pubkey': key.PUBKEY.export_key().decode()})

	return key