izzylib/IzzyLib/http.py

648 lines
16 KiB
Python

import functools, json, sys
from IzzyLib import logging
from IzzyLib.misc import DefaultDict, DotDict, Path
from base64 import b64decode, b64encode
from datetime import datetime
from io import BytesIO
from ssl import SSLCertVerificationError
from urllib.error import HTTPError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from . import error, __version__
try:
import requests
except ImportError:
logging.verbose('Requests module not found. RequestsClient disabled')
requests = False
try:
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
crypto_enabled = True
except ImportError:
logging.verbose('Pycryptodome module not found. HTTP header signing and verifying is disabled')
crypto_enabled = False
try:
from sanic.request import Request as SanicRequest
except ImportError:
logging.verbose('Sanic module not found. Request verification is disabled')
SanicRequest = False
try:
from PIL import Image
except ImportError:
logging.verbose('Pillow module not found. Image downloading is disabled')
Image = False
Client = None
methods = ['connect', 'delete', 'get', 'head', 'options', 'patch', 'post', 'put', 'trace']
class HttpClient(object):
def __init__(self, headers={}, useragent=f'IzzyLib/{__version__}', appagent=None, proxy_type='https', proxy_host=None, proxy_port=None):
proxy_ports = {
'http': 80,
'https': 443
}
if proxy_type not in ['http', 'https']:
raise ValueError(f'Not a valid proxy type: {proxy_type}')
self.headers=headers
self.agent = f'{useragent} ({appagent})' if appagent else useragent
self.proxy = DotDict({
'enabled': True if proxy_host else False,
'ptype': proxy_type,
'host': proxy_host,
'port': proxy_ports[proxy_type] if not proxy_port else proxy_port
})
self.SetGlobal = SetClient
def __sign_request(self, request, privkey, keyid):
if not crypto_enabled:
logging.error('Crypto functions disabled')
return
request.add_header('(request-target)', f'{request.method.lower()} {request.path}')
request.add_header('host', request.host)
request.add_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
if request.body:
body_hash = b64encode(SHA256.new(request.body).digest()).decode("UTF-8")
request.add_header('digest', f'SHA-256={body_hash}')
request.add_header('content-length', len(request.body))
sig = {
'keyId': keyid,
'algorithm': 'rsa-sha256',
'headers': ' '.join([k.lower() for k in request.headers.keys()]),
'signature': b64encode(PkcsHeaders(privkey, request.headers)).decode('UTF-8')
}
sig_items = [f'{k}="{v}"' for k,v in sig.items()]
sig_string = ','.join(sig_items)
request.add_header('signature', sig_string)
request.remove_header('(request-target)')
request.remove_header('host')
def __build_request(self, url, data=None, headers={}, method='GET'):
new_headers = self.headers.copy()
new_headers.update(headers)
parsed_headers = {k.lower(): v for k,v in new_headers.items()}
if not parsed_headers.get('user-agent'):
parsed_headers['user-agent'] = self.agent
if isinstance(data, dict):
data = json.dumps(data)
if isinstance(data, str):
data = data.encode('UTF-8')
request = Request(url, data=data, headers=parsed_headers, method=method)
if self.proxy.enabled:
request.set_proxy(f'{self.proxy.host}:{self.proxy.port}', self.proxy.ptype)
return request
def request(self, *args, **kwargs):
request = self.__build_request(*args, **kwargs)
try:
response = urlopen(request)
except HTTPError as e:
response = e.fp
except SSLCertVerificationError as e:
logging.error('HttpClient.request: Certificate error:', e)
return
return HttpResponse(response)
def file(self, url, filepath, *args, filename=None, **kwargs):
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
logging.error(f'Failed to download {url}:', resp.status, resp.body)
return False
path = Path(filepath)
if not path.exists():
logging.error('Path does not exist:', path)
return False
with path.join(filename).open('wb') as fd:
fd.write(resp.body)
return True
def image(self, url, filepath, *args, filename=None, ext='png', dimensions=(50, 50), **kwargs):
if not Image:
logging.error('Pillow module is not installed')
return
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
logging.error(f'Failed to download {url}:', resp.status, resp.body)
return False
if not filename:
filename = Path(url).stem()
path = Path(filepath)
if not path.exists():
logging.error('Path does not exist:', path)
return False
byte = BytesIO()
image = Image.open(BytesIO(resp.body))
image.thumbnail(dimensions)
image.save(byte, format=ext.upper())
with path.join(filename).open('wb') as fd:
fd.write(byte.getvalue())
def json(self, *args, headers={}, activity=True, **kwargs):
json_type = 'activity+json' if activity else 'json'
headers.update({
'accept': f'application/{json_type}'
})
return self.request(*args, headers=headers, **kwargs)
def signed_request(self, privkey, keyid, *args, **kwargs):
request = self.__build_request(*args, **kwargs)
self.__sign_request(request, privkey, keyid)
try:
response = urlopen(request)
except HTTPError as e:
response = e
return HttpResponse(response)
class HttpResponse(object):
def __init__(self, response):
self.body = response.read()
self.headers = DefaultDict({k.lower(): v.lower() for k,v in response.headers.items()})
self.status = response.status
self.url = response.url
def text(self):
return self.body.decode('UTF-8')
def json(self, fail=False):
try:
return DotDict(self.text())
except Exception as e:
if fail:
raise e from None
else:
return DotDict()
def json_pretty(self, indent=4):
return json.dumps(self.json().asDict(), indent=indent)
class RequestsClient(object):
def __init__(self, headers={}, useragent=f'IzzyLib/{__version__}', appagent=None, proxy_type='https', proxy_host=None, proxy_port=None):
proxy_ports = {
'http': 80,
'https': 443
}
if proxy_type not in ['http', 'https']:
raise ValueError(f'Not a valid proxy type: {proxy_type}')
self.headers=headers
self.agent = f'{useragent} ({appagent})' if appagent else useragent
self.proxy = DotDict({
'enabled': True if proxy_host else False,
'ptype': proxy_type,
'host': proxy_host,
'port': proxy_ports[proxy_type] if not proxy_port else proxy_port
})
self.SetGlobal = SetClient
def __sign_request(self, request, privkey, keyid):
if not crypto_enabled:
logging.error('Crypto functions disabled')
return
request.add_header('(request-target)', f'{request.method.lower()} {request.path}')
request.add_header('host', request.host)
request.add_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
if request.body:
body_hash = b64encode(SHA256.new(request.body).digest()).decode("UTF-8")
request.add_header('digest', f'SHA-256={body_hash}')
request.add_header('content-length', len(request.body))
sig = {
'keyId': keyid,
'algorithm': 'rsa-sha256',
'headers': ' '.join([k.lower() for k in request.headers.keys()]),
'signature': b64encode(PkcsHeaders(privkey, request.headers)).decode('UTF-8')
}
sig_items = [f'{k}="{v}"' for k,v in sig.items()]
sig_string = ','.join(sig_items)
request.add_header('signature', sig_string)
request.remove_header('(request-target)')
request.remove_header('host')
def request(self, *args, method='get', **kwargs):
if method.lower() not in methods:
raise ValueError(f'Invalid method: {method}')
request = RequestsRequest(self, *args, method=method.lower(), **kwargs)
return RequestsResponse(request.send())
def file(self, url, filepath, *args, filename=None, **kwargs):
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
logging.error(f'Failed to download {url}:', resp.status, resp.body)
return False
return resp.save(filepath)
def image(self, url, filepath, *args, filename=None, ext='png', dimensions=(50, 50), **kwargs):
if not Image:
logging.error('Pillow module is not installed')
return
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
logging.error(f'Failed to download {url}:', resp.status, resp.body)
return False
if not filename:
filename = Path(url).stem()
path = Path(filepath)
if not path.exists():
logging.error('Path does not exist:', path)
return False
byte = BytesIO()
image = Image.open(BytesIO(resp.body))
image.thumbnail(dimensions)
image.save(byte, format=ext.upper())
with path.join(filename).open('wb') as fd:
fd.write(byte.getvalue())
def json(self, *args, **kwargs):
return self.dict(*args, **kwargs)
def dict(self, *args, headers={}, activity=True, **kwargs):
json_type = 'activity+json' if activity else 'json'
headers.update({
'accept': f'application/{json_type}'
})
return self.request(*args, headers=headers, **kwargs).dict
def signed_request(self, privkey, keyid, *args, **kwargs):
request = RequestsRequest(self, *args, **kwargs)
self.__sign_request(request, privkey, keyid)
return RequestsResponse(request.send())
class RequestsRequest(object):
def __init__(self, client, url, data=None, headers={}, query={}, method='get'):
self.args = [url]
self.kwargs = {'params': query}
self.method = method.lower()
self.client = client
new_headers = client.headers.copy()
new_headers.update(headers)
parsed_headers = {k.lower(): v for k,v in new_headers.items()}
if not parsed_headers.get('user-agent'):
parsed_headers['user-agent'] = client.agent
self.kwargs['headers'] = new_headers
self.kwargs['data'] = data
if client.proxy.enabled:
self.kwargs['proxies'] = {self.proxy.ptype: f'{self.proxy.ptype}://{self.proxy.host}:{self.proxy.port}'}
def send(self):
func = getattr(requests, self.method)
return func(*self.args, **self.kwargs)
class RequestsResponse(object):
def __init__(self, response):
self.response = response
self.data = b''
self.headers = DefaultDict({k.lower(): v.lower() for k,v in response.headers.items()})
self.status = response.status_code
self.url = response.url
def chunks(self, size=256):
return self.response.iter_content(chunk_size=256)
@property
def body(self):
for chunk in self.chunks():
self.data += chunk
return self.data
@property
def text(self):
if not self.data:
return self.body.decode(self.response.encoding)
return self.data.decode(self.response.encoding)
@property
def dict(self):
try:
return DotDict(self.text)
except Exception as e:
return DotDict()
@property
def json(self):
return json.dumps(self.dict)
@property
def json_pretty(self, indent=4):
return json.dumps(self.dict, indent=indent)
def save(self, path, overwrite=True):
path = Path(path)
parent = path.parent()
if not parent.exists():
raise ValueError(f'Path does not exist: {parent}')
if overwrite and path.exists():
path.delete()
with path.open('wb') as fd:
for chunk in self.chunks():
fd.write(chunk)
def VerifyRequest(request: SanicRequest, actor: dict):
'''Verify a header signature from a sanic request
request: The request with the headers to verify
actor: A dictionary containing the activitypub actor and the link to the pubkey used for verification
'''
if not SanicRequest:
logging.error('Sanic request verification disabled')
return
body = request.body if request.body else None
return VerifyHeaders(request.headers, request.method, request.path, body, actor)
def VerifyHeaders(headers: dict, method: str, path: str, actor: dict=None, body=None):
'''Verify a header signature
headers: A dictionary containing all the headers from a request
method: The HTTP method of the request
path: The path of the HTTP request
actor (optional): A dictionary containing the activitypub actor and the link to the pubkey used for verification
body (optional): The body of the request. Only needed if the signature includes the digest header
fail (optional): If set to True, raise an error instead of returning False if any step of the process fails
'''
if not crypto_enabled:
logging.error('Crypto functions disabled')
return
headers = {k.lower(): v for k,v in headers.items()}
headers['(request-target)'] = f'{method.lower()} {path}'
signature = ParseSig(headers.get('signature'))
digest = ParseBodyDigest(headers.get('digest'))
missing_headers = [k for k in headers if k in ['date', 'host'] if headers.get(k) == None]
if not signature:
logging.verbose('Missing signature')
return False
if not actor:
actor = FetchActor(signature.keyid)
## Add digest header to missing headers list if it doesn't exist
if method.lower() == 'post' and not digest:
missing_headers.append('digest')
## Fail if missing date, host or digest (if POST) headers
if missing_headers:
logging.verbose('Missing headers:', missing_headers)
return False
## Fail if body verification fails
if digest and not VerifyString(body, digest.sig, digest.alg):
logging.verbose('Failed body digest verification')
return False
pubkey = actor.publicKey['publicKeyPem']
if PkcsHeaders(pubkey, {k:v for k,v in headers.items() if k in signature.headers}, sig=signature):
return True
logging.verbose('Failed header verification')
return False
def ParseBodyDigest(digest):
if not digest:
return
parsed = DotDict()
parts = digest.split('=', 1)
if len(parts) != 2:
return
parsed.sig = parts[1]
parsed.alg = parts[0].replace('-', '')
return parsed
def VerifyString(string, enc_string, alg='SHA256', fail=False):
if not crypto_enabled:
logging.error('Crypto functions disabled')
return
if type(string) != bytes:
string = string.encode('UTF-8')
body_hash = b64encode(SHA256.new(string).digest()).decode('UTF-8')
if body_hash == enc_string:
return True
if fail:
raise error.VerificationError()
else:
return False
def PkcsHeaders(key: str, headers: dict, sig=None):
if not crypto_enabled:
logging.error('Crypto functions disabled')
return
if sig:
head_items = [f'{item}: {headers[item]}' for item in sig.headers]
else:
head_items = [f'{k.lower()}: {v}' for k,v in headers.items()]
head_string = '\n'.join(head_items)
head_bytes = head_string.encode('UTF-8')
KEY = RSA.importKey(key)
pkcs = PKCS1_v1_5.new(KEY)
h = SHA256.new(head_bytes)
if sig:
return pkcs.verify(h, b64decode(sig.signature))
else:
return pkcs.sign(h)
def ParseSig(signature: str):
if not signature:
logging.verbose('Missing signature header')
return
split_sig = signature.split(',')
sig = DefaultDict({})
for part in split_sig:
key, value = part.split('=', 1)
sig[key.lower()] = value.replace('"', '')
if not sig.headers:
logging.verbose('Missing headers section in signature')
return
sig.headers = sig.headers.split()
return sig
def FetchActor(url):
if not Client:
logging.error('IzzyLib.http: Please set global client with "SetClient(client)"')
return {}
url = url.split('#')[0]
headers = {'Accept': 'application/activity+json'}
resp = Client.request(url, headers=headers)
if not resp.json():
logging.verbose('functions.FetchActor: Failed to fetch actor:', url)
logging.debug(f'Error {resp.status}: {resp.body}')
return {}
actor = resp.json()
actor.web_domain = urlparse(url).netloc
actor.shared_inbox = actor.inbox
actor.pubkey = None
actor.handle = actor.preferredUsername
if actor.get('endpoints'):
actor.shared_inbox = actor.endpoints.get('sharedInbox', actor.inbox)
if actor.get('publicKey'):
actor.pubkey = actor.publicKey.get('publicKeyPem')
return actor
@functools.lru_cache(maxsize=512)
def FetchWebfingerAcct(handle, domain):
if not Client:
logging.error('IzzyLib.http: Please set global client with "SetClient(client)"')
return {}
data = DefaultDict()
webfinger = Client.request(f'https://{domain}/.well-known/webfinger?resource=acct:{handle}@{domain}')
if not webfinger.body:
return
data.handle, data.domain = webfinger.json().subject.replace('acct:', '').split('@')
for link in webfinger.json().links:
if link['rel'] == 'self' and link['type'] == 'application/activity+json':
data.actor = link['href']
return data
def SetClient(client=None):
global Client
Client = client or HttpClient()
def GenRsaKey():
privkey = RSA.generate(2048)
key = DotDict({'PRIVKEY': privkey, 'PUBKEY': privkey.publickey()})
key.update({'privkey': key.PRIVKEY.export_key().decode(), 'pubkey': key.PUBKEY.export_key().decode()})
return key