208 lines
5.3 KiB
Python
208 lines
5.3 KiB
Python
import traceback, urllib3, json
|
|
|
|
from base64 import b64decode, b64encode
|
|
from urllib.parse import urlparse
|
|
from datetime import datetime
|
|
|
|
import httpsig
|
|
|
|
from Crypto.PublicKey import RSA
|
|
from Crypto.Hash import SHA, SHA256, SHA384, SHA512
|
|
from Crypto.Signature import PKCS1_v1_5
|
|
|
|
from . import logging, __version__
|
|
from .cache import TTLCache, LRUCache
|
|
from .misc import formatUTC
|
|
|
|
|
|
version = '.'.join([str(num) for num in __version__])
|
|
|
|
|
|
class httpClient:
|
|
def __init__(self, pool=100, timeout=30, headers={}, agent=None):
|
|
self.cache = LRUCache()
|
|
self.pool = pool
|
|
self.timeout = timeout
|
|
self.agent = agent if agent else f'IzzyLib/{version}'
|
|
self.headers = headers
|
|
|
|
self.client = urllib3.PoolManager(num_pools=self.pool, timeout=self.timeout)
|
|
self.headers['User-Agent'] = self.agent
|
|
|
|
|
|
def _fetch(self, url, headers={}, method='GET', data=None, cached=True):
|
|
cached_data = self.cache.fetch(url)
|
|
#url = url.split('#')[0]
|
|
|
|
if cached and cached_data:
|
|
logging.debug(f'Returning cached data for {url}')
|
|
return cached_data
|
|
|
|
if not headers.get('User-Agent'):
|
|
headers.update({'User-Agent': self.agent})
|
|
|
|
logging.debug(f'Fetching new data for {url}')
|
|
|
|
try:
|
|
if data:
|
|
if isinstance(data, dict):
|
|
data = json.dumps(data)
|
|
|
|
resp = self.client.request(method, url, headers=headers, body=data)
|
|
|
|
else:
|
|
resp = self.client.request(method, url, headers=headers)
|
|
|
|
except Exception as e:
|
|
logging.debug(f'Failed to fetch url: {e}')
|
|
return
|
|
|
|
if cached:
|
|
logging.debug(f'Caching {url}')
|
|
self.cache.store(url, resp)
|
|
|
|
return resp
|
|
|
|
|
|
def raw(self, *args, **kwargs):
|
|
'''
|
|
Return a response object
|
|
'''
|
|
return self._fetch(*args, **kwargs)
|
|
|
|
|
|
def text(self, *args, **kwargs):
|
|
'''
|
|
Return the body as text
|
|
'''
|
|
resp = self._fetch(*args, **kwargs)
|
|
|
|
return resp.data.decode() if resp else None
|
|
|
|
|
|
def json(self, *args, **kwargs):
|
|
'''
|
|
Return the body as a dict if it's json
|
|
'''
|
|
|
|
headers = kwargs.get('headers')
|
|
|
|
if not headers:
|
|
kwargs['headers'] = {}
|
|
|
|
kwargs['headers'].update({'Accept': 'application/json'})
|
|
resp = self._fetch(*args, **kwargs)
|
|
|
|
try:
|
|
data = json.loads(resp.data.decode())
|
|
|
|
except Exception as e:
|
|
logging.debug(f'Failed to load json: {e}')
|
|
return
|
|
|
|
return data
|
|
|
|
|
|
def ParseSig(headers):
|
|
sig_header = headers.get('signature')
|
|
|
|
if not sig_header:
|
|
logging.verbose('Missing signature header')
|
|
return
|
|
|
|
split_sig = sig_header.split(',')
|
|
signature = {}
|
|
|
|
for part in split_sig:
|
|
key, value = part.split('=', 1)
|
|
signature[key.lower()] = value.replace('"', '')
|
|
|
|
if not signature.get('headers'):
|
|
logging.verbose('Missing headers section in signature')
|
|
return
|
|
|
|
signature['headers'] = signature['headers'].split()
|
|
|
|
return signature
|
|
|
|
|
|
def SignHeaders(headers, keyid, privkey, url, method='GET'):
|
|
'''
|
|
Signs headers and returns them with a signature header
|
|
|
|
headers (dict): Headers to be signed
|
|
keyid (str): Url to the public key used to verify the signature
|
|
privkey (str): Private key used to sign the headers
|
|
url (str): Url of the request for the signed headers
|
|
method (str): Http method of the request for the signed headers
|
|
'''
|
|
|
|
RSAkey = RSA.import_key(privkey)
|
|
key_size = int(RSAkey.size_in_bytes()/2)
|
|
logging.debug('Signing key size:', key_size)
|
|
|
|
parsed_url = urlparse(url)
|
|
logging.debug(parsed_url)
|
|
|
|
raw_headers = {'date': formatUTC(), 'host': parsed_url.netloc, '(request-target)': ' '.join([method, parsed_url.path])}
|
|
raw_headers.update(dict(headers))
|
|
header_keys = raw_headers.keys()
|
|
|
|
signer = httpsig.HeaderSigner(keyid, privkey, f'rsa-sha{key_size}', headers=header_keys, sign_header='signature')
|
|
new_headers = signer.sign(raw_headers, parsed_url.netloc, method, parsed_url.path)
|
|
logging.debug('Signed headers:', new_headers)
|
|
|
|
del new_headers['(request-target)']
|
|
|
|
return new_headers
|
|
|
|
|
|
def ValidateSignature(headers, method, path, client=None, agent=None):
|
|
'''
|
|
Validates the signature header.
|
|
|
|
headers (dict): All of the headers to be used to check a signature. The signature header must be included too
|
|
method (str): The http method used in relation to the headers
|
|
path (str): The path of the request in relation to the headers
|
|
client (pool object): Specify a httpClient to use for fetching the actor. optional
|
|
agent (str): User agent used for fetching actor data. optional
|
|
'''
|
|
|
|
client = httpClient(agent=agent) if not client else client
|
|
headers = {k.lower(): v for k,v in headers.items()}
|
|
|
|
signature = ParseSig(headers)
|
|
|
|
actor_data = client.json(signature['keyid'])
|
|
logging.debug(actor_data)
|
|
|
|
try:
|
|
pubkey = actor_data['publicKey']['publicKeyPem']
|
|
|
|
except Exception as e:
|
|
logging.verbose(f'Failed to get public key for actor {signature["keyid"]}')
|
|
return
|
|
|
|
valid = httpsig.HeaderVerifier(headers, pubkey, signature['headers'], method, path, sign_header='signature').verify()
|
|
|
|
if not valid:
|
|
if not isinstance(valid, tuple):
|
|
logging.verbose('Signature validation failed for unknown actor')
|
|
logging.verbose(valid)
|
|
|
|
else:
|
|
logging.verbose(f'Signature validation failed for actor: {valid[1]}')
|
|
|
|
return
|
|
|
|
else:
|
|
return True
|
|
|
|
|
|
def ValidateRequest(request, client=None, agent=None):
|
|
'''
|
|
Validates the headers in a Sanic or Aiohttp request (other frameworks may be supported)
|
|
See ValidateSignature for 'client' and 'agent' usage
|
|
'''
|
|
return ValidateSignature(request.headers, request.method, request.path, client, agent)
|