izzylib/IzzyLib/httpSignatures.py

131 lines
2.9 KiB
Python

'''Functions for working with HTTP signatures
Note: I edited this while tired, so this may be broken'''
from base64 import b64decode, b64encode
from urllib.parse import urlparse
from datetime import datetime
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA, SHA256, SHA512
from Crypto.Signature import PKCS1_v1_5
from .misc import formatUTC
from . import logging
# Module-level cache instance, constructed with ttl='1h'
# NOTE(review): TTLCache is not imported in the visible part of this file -
# confirm where it is defined before relying on this line
CACHE = TTLCache(ttl='1h')
# Hash modules from Crypto.Hash, looked up by lowercase algorithm name
HASHES = {
'sha1': SHA,
'sha256': SHA256,
'sha512': SHA512
}
def sign_headers(headers, target, privkey, key_id):
    '''Sign a set of request headers and attach the resulting signature.

    A lowercased copy of ``headers`` is built, its ``date`` entry is set from
    ``formatUTC(None)`` and every header in that copy, together with the
    ``(request-target)`` pseudo-header, is covered by the signature.

    headers: mapping of header names to values; it is not modified
    target: value signed as the ``(request-target)`` pseudo-header
    privkey: RSA private key in a format accepted by ``RSA.importKey``
    key_id: identifier placed in the ``keyId`` signature parameter

    Returns the lowercased header dict with ``date`` and ``signature`` set and
    with the ``(request-target)`` entry removed again.
    '''
    key = RSA.importKey(privkey)

    signed = {name.lower(): value for name, value in headers.items()}

    # A stale signature must not be signed over: it is replaced below, so the
    # receiver could never rebuild the same signing string
    signed.pop('signature', None)

    signed['date'] = formatUTC(None)
    signed['(request-target)'] = target

    # Freeze the list of signed names before the signature header is added
    used_headers = list(signed)

    # build_sigstring takes only the headers and the names to sign; the
    # request target is already present as a pseudo-header
    sigstring = build_sigstring(signed, used_headers)
    signed_sigstring = sign_sigstring(sigstring, key)

    sig = {
        'keyId': key_id,
        'algorithm': 'rsa-sha256',
        'headers': ' '.join(used_headers),
        'signature': signed_sigstring,
    }

    signed['signature'] = ','.join(
        '{}="{}"'.format(name, value) for name, value in sig.items()
    )

    # The pseudo-header only exists for signing and is not a real header
    del signed['(request-target)']

    return signed
def validate(headers, pubkey):
    '''Validate the signature header of an incoming request.

    headers: mapping of header names to values. It must hold the
        ``signature`` header plus every header that signature lists; names
        are matched case-insensitively.
    pubkey: RSA public key in a format accepted by ``RSA.importKey``

    Returns True when the signature verifies.

    Raises error.MissingData when the signature header, one of its required
    parameters or one of the signed headers is absent, and
    error.ValidationFailed when the algorithm is not supported or the
    signature does not verify.
    '''
    # Lowercase the names so lookups agree with the lowercase names that are
    # written into the signing string
    headers = {name.lower(): value for name, value in headers.items()}

    sig = parse_sig(headers.get('signature'))

    absent_params = [name for name in ('algorithm', 'signature') if name not in sig]

    if absent_params:
        raise error.MissingData(
            'Missing signature parameters: ' + ', '.join(absent_params)
        )

    used_headers = sig['headers']

    if isinstance(used_headers, str):
        # Accept a space separated string as well as a list of names
        used_headers = used_headers.split()

    used_headers = [name.lower() for name in used_headers]
    absent_headers = [name for name in used_headers if name not in headers]

    if absent_headers:
        raise error.MissingData(
            'Missing signed headers: ' + ', '.join(absent_headers)
        )

    # The algorithm looks like "rsa-sha256"; only the digest part selects code
    algorithm = sig['algorithm']
    hashalg = algorithm.partition('-')[2].lower()
    hasher = HASHES.get(hashalg)

    if hasher is None:
        raise error.ValidationFailed(
            'Unsupported signature algorithm: ' + algorithm
        )

    pkcs = PKCS1_v1_5.new(RSA.importKey(pubkey))
    sigstring = build_sigstring(headers, used_headers)
    logging.debug(f'Signing string: {sigstring}')

    digest = hasher.new()
    digest.update(sigstring.encode('ascii'))

    result = pkcs.verify(digest, b64decode(sig['signature']))
    logging.debug(f'Sig verification result: {result}')

    if not result:
        raise error.ValidationFailed('Failed signature validation')

    return True
def parse_sig(sig):
    '''Parse the value of a signature header into a dict of its parameters.

    sig: raw header value made of comma separated ``key="value"`` pairs

    Returns a dict mapping each parameter name to its value with the double
    quotes removed. ``headers`` is always a list of header names and falls
    back to ``['date']`` when the parameter is not present.

    Raises error.MissingData when ``sig`` is empty or None.
    '''
    if not sig:
        raise error.MissingData('Missing signature header')

    # Same type as a parsed "headers" value so callers can always iterate it
    parts = {'headers': ['date']}

    for part in sig.strip().split(','):
        # Split on the first "=" only: base64 signatures end in "=" padding
        name, separator, value = part.replace('"', '').partition('=')

        if not separator:
            # Empty or malformed fragment, e.g. left behind by a trailing comma
            continue

        # Pairs are often written as ", " separated, so trim the whitespace
        name = name.strip()
        value = value.strip()

        parts[name] = value.split() if name == 'headers' else value

    return parts
def build_sigstring(headers, used_headers):
    '''Build the string that gets signed from a selection of headers.

    headers: mapping of header names to values; it is indexed with the names
        exactly as they appear in ``used_headers``
    used_headers: iterable of the header names to include, in signing order

    Returns one ``name: value`` line per header with the name lowercased,
    joined by newlines and without a trailing newline. An empty selection
    gives an empty string.

    Raises KeyError when a listed header is not in ``headers``.
    '''
    # Joining puts a separator between every pair of lines, including when
    # the same header name is listed more than once
    return '\n'.join(
        f'{name.lower()}: {headers[name]}' for name in used_headers
    )
def sign_sigstring(sigstring, key, hashalg='SHA256'):
    '''Sign a signing string with an RSA key and return it base64 encoded.

    sigstring: text to sign; it must be ASCII encodable
    key: RSA key object as returned by ``RSA.importKey``
    hashalg: name of the digest to use, matched case-insensitively against
        the keys of ``HASHES``

    Returns the PKCS#1 v1.5 signature as an ASCII base64 string. Results are
    kept in the module-level ``CACHE`` and reused for repeated requests.

    Raises KeyError when ``hashalg`` is not a key of ``HASHES``.
    '''
    hashname = hashalg.lower()

    # A signature depends on the key and the digest as well as on the text,
    # so all three are part of the cache key. Only a fingerprint of the
    # public half of the key is stored in it.
    fingerprint = SHA256.new(key.publickey().exportKey('DER')).hexdigest()
    cache_key = f'{hashname}:{fingerprint}:{sigstring}'

    # NOTE(review): CACHE is assumed to offer fetch(key) / store(key, value)
    # with fetch returning a falsy value on a miss - confirm against TTLCache
    cached_data = CACHE.fetch(cache_key)

    if cached_data:
        logging.info('Returning cache sigstring')
        return cached_data

    pkcs = PKCS1_v1_5.new(key)

    digest = HASHES[hashname].new()
    digest.update(sigstring.encode('ascii'))

    sigdata = b64encode(pkcs.sign(digest)).decode('ascii')
    CACHE.store(cache_key, sigdata)

    return sigdata
class error:
    '''Namespace that groups the exceptions raised by this module'''

    class MissingData(Exception):
        '''Raised when expected data is absent or nothing was returned'''

    class ValidationFailed(Exception):
        '''Raised when a signature does not pass validation'''