131 lines
2.9 KiB
Python
131 lines
2.9 KiB
Python
'''Functions for working with HTTP signatures

Note: I edited this while tired, so this may be broken'''
|
|
from base64 import b64decode, b64encode
|
|
from urllib.parse import urlparse
|
|
from datetime import datetime
|
|
|
|
from Crypto.PublicKey import RSA
|
|
from Crypto.Hash import SHA, SHA256, SHA512
|
|
from Crypto.Signature import PKCS1_v1_5
|
|
|
|
from .misc import formatUTC
|
|
from . import logging
|
|
|
|
|
|
# Module-level cache instance.
# NOTE(review): TTLCache is not imported in the visible part of this file, and
# ttl='1h' presumably means a one-hour expiry -- confirm both.
CACHE = TTLCache(ttl='1h')

# Lookup table from a lowercase hash name (the part after the dash in an
# algorithm string such as 'rsa-sha256') to the matching Crypto.Hash module.
HASHES = dict(
    sha1=SHA,
    sha256=SHA256,
    sha512=SHA512,
)
|
|
|
|
|
|
def sign_headers(headers, target, privkey, key_id):
    '''Sign request headers and return them with a ``signature`` header added.

    headers: mapping of header names to values. It is copied with lowercased
        names; the caller's mapping is not modified.
    target: value signed under the ``(request-target)`` pseudo-header
        (presumably ``'<method> <path>'`` -- confirm against callers).
    privkey: RSA private key in any format ``RSA.importKey`` accepts.
    key_id: value placed in the ``keyId`` field of the signature header.

    Returns a new dict with lowercase header names, a fresh ``date`` header
    and the ``signature`` header. The ``(request-target)`` pseudo-header is
    removed again before returning.
    '''
    key = RSA.importKey(privkey)

    signed = {name.lower(): value for name, value in headers.items()}
    # Any date supplied by the caller is overwritten.
    # NOTE(review): formatUTC(None) presumably formats the current time -- confirm.
    signed['date'] = formatUTC(None)
    signed['(request-target)'] = target

    # Snapshot the names now; a live keys() view would also pick up the
    # 'signature' entry added further down.
    used_headers = list(signed)

    # The request target is already part of the mapping, so it is not passed
    # separately (build_sigstring takes no 'target' argument).
    sigstring = build_sigstring(signed, used_headers)
    signed_sigstring = sign_sigstring(sigstring, key)

    sig = {
        'keyId': key_id,
        # Matches the default hash used by sign_sigstring.
        'algorithm': 'rsa-sha256',
        'headers': ' '.join(used_headers),
        'signature': signed_sigstring,
    }

    signed['signature'] = ','.join(f'{k}="{v}"' for k, v in sig.items())

    # The pseudo-header only exists for signing and is not a real header.
    del signed['(request-target)']

    return signed
|
|
|
|
|
|
def validate(headers, pubkey):
    '''Validate the signature header of an incoming request.

    headers: mapping of the request's headers. It must support ``.get`` and
        item lookup by every header name listed in the signature.
    pubkey: RSA public key in any format ``RSA.importKey`` accepts.

    Returns ``True`` when the signature verifies.

    Raises ``error.MissingData`` when there is no signature to check and
    ``error.ValidationFailed`` when verification fails. A ``KeyError`` is
    raised if the signature lacks ``algorithm``/``signature``, names an
    unsupported hash, or lists a header that is not present in ``headers``.

    NOTE(review): if the sender signed ``(request-target)``, the caller has to
    provide that entry in ``headers`` -- this function cannot derive it.
    '''
    sig = parse_sig(headers.get('signature'))

    if not sig:
        raise error.MissingData('Missing signature')

    pkcs = PKCS1_v1_5.new(RSA.importKey(pubkey))

    # Accept either a list of names or a whitespace-separated string.
    used_headers = sig['headers']

    if isinstance(used_headers, str):
        used_headers = used_headers.split()

    sigstring = build_sigstring(headers, used_headers)
    logging.debug(f'Signing string: {sigstring}')

    # Algorithm looks like 'rsa-sha256'; only the hash part is needed here.
    _, hashalg = sig['algorithm'].split('-')
    sigdata = b64decode(sig['signature'])

    h = HASHES[hashalg].new()
    h.update(sigstring.encode('ascii'))
    result = pkcs.verify(h, sigdata)

    logging.debug(f'Sig verification result: {result}')

    if not result:
        raise error.ValidationFailed('Failed signature validation')

    return True
|
|
|
|
|
|
def parse_sig(sig):
    '''Parse the value of a signature header into a dict.

    sig: the raw header value, e.g.
        ``keyId="...",algorithm="rsa-sha256",headers="date host",signature="..."``

    Returns a dict of the fields found. ``headers`` is always a list of header
    names and defaults to ``['date']`` when the field is absent.

    Raises ``error.MissingData`` when ``sig`` is empty or ``None`` and
    ``ValueError`` when a non-empty field has no ``=``.
    '''
    if not sig:
        raise error.MissingData('Missing signature header')

    # Keep the default a list so consumers iterate names, not characters.
    parts = {'headers': ['date']}

    for part in sig.strip().split(','):
        part = part.strip()

        # Tolerate a trailing or doubled comma.
        if not part:
            continue

        # maxsplit=1 keeps the '=' padding of the base64 signature intact.
        k, v = part.replace('"', '').split('=', maxsplit=1)
        k = k.strip()

        if k == 'headers':
            parts['headers'] = v.split()

        else:
            parts[k] = v.strip()

    return parts
|
|
|
|
|
|
def build_sigstring(headers, used_headers):
    '''Build the string that gets signed or verified.

    headers: mapping that is indexed with each name from ``used_headers``
        exactly as given.
    used_headers: iterable of header names, in signing order.

    Returns one ``name: value`` line per entry, with the name lowercased and
    lines joined by a newline (no trailing newline). An empty iterable gives
    an empty string.

    Raises ``KeyError`` when a listed header is not in ``headers``.
    '''
    # Joining avoids deciding per item whether it is "the last one", which
    # goes wrong when a header name is listed more than once.
    return '\n'.join(
        f'{header.lower()}: {headers[header]}' for header in used_headers
    )
|
|
|
|
|
|
def sign_sigstring(sigstring, key, hashalg='SHA256'):
    '''Sign a signing string and return the base64-encoded signature.

    sigstring: the text to sign; it must be ASCII-encodable.
    key: RSA key object as returned by ``RSA.importKey``.
    hashalg: name of a hash in ``HASHES`` (case-insensitive).

    Returns the PKCS#1 v1.5 signature, base64-encoded, as ``str``. Results
    are kept in the module-level ``CACHE``.

    Raises ``KeyError`` when ``hashalg`` is not a key of ``HASHES``.

    NOTE(review): assumes ``CACHE`` offers ``fetch(key)`` returning a falsy
    value on a miss, and ``store(key, value)`` -- confirm against TTLCache.
    '''
    hash_name = hashalg.lower()

    # The same text signed by a different key or hash gives a different
    # signature, so both go into the cache key. Only the public half of the
    # key is fingerprinted so no private material ends up in the cache.
    fingerprint = SHA256.new(key.publickey().exportKey('DER')).hexdigest()
    cache_key = f'{hash_name}:{fingerprint}:{sigstring}'

    cached_data = CACHE.fetch(cache_key)

    if cached_data:
        logging.info('Returning cache sigstring')
        return cached_data

    pkcs = PKCS1_v1_5.new(key)
    h = HASHES[hash_name].new()
    h.update(sigstring.encode('ascii'))

    sigdata = b64encode(pkcs.sign(h)).decode('ascii')
    CACHE.store(cache_key, sigdata)

    return sigdata
|
|
|
|
|
|
class error:
    '''Namespace grouping the exceptions raised by this module.'''

    class MissingData(Exception):
        '''Raised when required data is absent or came back empty.'''

    class ValidationFailed(Exception):
        '''Raised when a signature does not pass validation.'''
|