uncia/uncia/signatures.py
2020-10-10 23:47:08 -04:00

150 lines
3.4 KiB
Python

import json
from base64 import b64decode, b64encode
from urllib.parse import urlparse
from datetime import datetime
import httpsig
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA, SHA256, SHA512
from Crypto.Signature import PKCS1_v1_5
from .log import logging
from .functions import cache, format_date, fetch
from .database import get
# Maps a lowercase hash name to the matching Crypto.Hash module.
HASHES = dict(
    sha1=SHA,
    sha256=SHA256,
    sha512=SHA512,
)
def ParseSig(headers):
    '''
    Parse the 'signature' header into a dict of its comma-separated parameters.

    headers: mapping that is looked up with the lowercase key 'signature'.

    Returns a dict whose keys are the lowercased parameter names and whose
    values have their double quotes removed. The 'headers' parameter is
    split on whitespace into a list. Returns None (after logging) when the
    signature header or its 'headers' parameter is missing.
    '''
    sig_header = headers.get('signature')

    if not sig_header:
        logging.verbose('Missing signature header')
        return

    signature = {}

    for part in sig_header.split(','):
        key, sep, value = part.partition('=')

        if not sep:
            # Empty segment (e.g. trailing comma) or one without '=':
            # nothing to record, and unpacking it would raise.
            continue

        # Senders commonly put a space after each comma; strip it so the
        # keys are 'headers', 'algorithm', ... and not ' headers'.
        signature[key.strip().lower()] = value.strip().replace('"', '')

    if not signature.get('headers'):
        logging.verbose('Missing headers section in signature')
        return

    signature['headers'] = signature['headers'].split()
    return signature
def build_sigstring(request, used_headers, target=None):
    '''
    Build the newline-separated string of "name: value" lines to be signed.

    request: a dict of headers, or an object with 'headers', 'method' and
        'path' attributes. When 'target' is given, 'request' must provide
        copy() and is treated as the header mapping itself.
    used_headers: iterable of header names, emitted in order. Each name is
        looked up in the header mapping exactly as given and lowercased in
        the output.
    target: optional value to use for the '(request-target)' pseudo-header.

    Returns the signing string with no trailing newline.
    Raises KeyError if a name in used_headers is not in the header mapping.
    '''
    if target:
        headers = request.copy()
        headers['(request-target)'] = target

    elif isinstance(request, dict):
        # A plain dict is used as-is; no '(request-target)' is derived.
        headers = request

    else:
        headers = request.headers.copy()
        headers['(request-target)'] = f'{request.method.lower()} {request.path}'

    # Joining places the separator by position, so a header name that
    # repeats the last entry no longer loses its newline.
    return '\n'.join(f'{header.lower()}: {headers[header]}' for header in used_headers)
def SignBody(body):
    '''
    Return the base64-encoded SHA-256 digest of 'body' as a str.

    The result is looked up in cache.sig first; on a miss it is computed
    from the UTF-8 encoding of 'body' and stored in cache.sig under 'body'.
    '''
    cached = cache.sig.fetch(body)

    if cached:
        return cached

    digest = SHA256.new(body.encode('utf-8')).digest()
    encoded = b64encode(digest).decode('utf-8')
    cache.sig[body] = encoded
    return encoded
def ValidateSignature(headers, method, path):
    '''
    Verify the HTTP signature carried in 'headers'.

    headers: mapping of request headers; keys are lowercased before use.
    method: request method passed on to httpsig.HeaderVerifier.
    path: request path passed on to httpsig.HeaderVerifier.

    The signer's public key is read from
    fetch(keyid)['publicKey']['publicKeyPem'].

    Returns True when the signature verifies, otherwise None. A missing or
    unparsable signature header, a missing keyId, or an unavailable public
    key is logged and also yields None.
    '''
    headers = {k.lower(): v for k, v in headers.items()}
    signature = ParseSig(headers)

    if not signature:
        # ParseSig has already logged why the header was rejected.
        return

    keyid = signature.get('keyid')

    if not keyid:
        logging.verbose('Missing keyId section in signature')
        return

    actor_data = fetch(keyid)
    logging.debug(actor_data)

    try:
        pubkey = actor_data['publicKey']['publicKeyPem']

    except (KeyError, TypeError, IndexError):
        # Covers a failed fetch (None) as well as an actor without a key.
        logging.verbose(f'Failed to get public key for actor {keyid}')
        return

    verifier = httpsig.HeaderVerifier(
        headers,
        pubkey,
        signature['headers'],
        method,
        path,
        sign_header='signature',
    )
    valid = verifier.verify()

    if not valid:
        # A falsy result can never be a non-empty tuple, so the former
        # tuple branch (which indexed valid[1]) has been removed.
        logging.verbose('Signature validation failed for unknown actor')
        logging.verbose(valid)
        return

    return True
def ValidateRequest(request):
    '''
    Validate the signature on a request object.

    'request' must expose 'headers', 'method' and 'path' attributes
    (intended for Sanic or Aiohttp requests; other frameworks may work).
    The three values are handed to ValidateSignature and its result is
    returned unchanged.
    '''
    headers, method, path = request.headers, request.method, request.path
    return ValidateSignature(headers, method, path)
def SignHeaders(headers, key, keyid, url, method='get'):
    '''
    Sign a set of outgoing request headers with an RSA key.

    headers: mapping of headers to sign. NOTE: a truthy 'date' entry is
        deleted from this mapping in place so the freshly generated date
        is the one that gets signed.
    key: identifier passed to get.rsa_key() to look up the signing key.
    keyid: key id handed to httpsig.HeaderSigner.
    url: destination URL; its netloc is used as 'host' and its path as
        the request-target path.
    method: HTTP method, lowercased for the request-target.

    Returns the headers produced by httpsig's signer with the
    '(request-target)' entry removed, or None (after logging an error)
    when no signing key is found.
    '''
    if headers.get('date'):
        del headers['date']

    actor_key = get.rsa_key(key)

    if not actor_key:
        logging.error('Could not find signing key:', key)
        return

    privkey = actor_key['privkey']
    RSAkey = RSA.import_key(privkey)
    key_size = RSAkey.size_in_bytes() // 2
    logging.debug('Signing key size:', key_size)

    # Half the modulus size only names a real digest for 4096-bit (sha256)
    # and 8192-bit (sha512) keys; anything else (e.g. 2048-bit -> "sha128")
    # is not an algorithm httpsig accepts, so fall back to sha256.
    if key_size not in (256, 512):
        key_size = 256

    parsed_url = urlparse(url)
    # A URL without a path is requested as '/', so sign that.
    path = parsed_url.path or '/'

    raw_headers = {
        'date': format_date(),
        'host': parsed_url.netloc,
        '(request-target)': f'{method.lower()} {path}',
    }
    raw_headers.update(dict(headers))

    signer = httpsig.HeaderSigner(
        keyid,
        privkey,
        f'rsa-sha{key_size}',
        headers=raw_headers.keys(),
        sign_header='signature',
    )
    new_headers = signer.sign(raw_headers, parsed_url.netloc, method, path)
    logging.debug('Signed headers:', new_headers)

    # '(request-target)' is a signing pseudo-header, not a real header.
    del new_headers['(request-target)']
    return new_headers