add http signature functions
This commit is contained in:
parent
af0f7bc5fe
commit
95d5da969a
132
IzzyLib/httpSignatures.py
Normal file
132
IzzyLib/httpSignatures.py
Normal file
|
@ -0,0 +1,132 @@
|
||||||
|
'''
|
||||||
|
Functions for working with HTTP signatures
|
||||||
|
Note: I edited this while tired, so this may be broken
|
||||||
|
'''
|
||||||
|
from base64 import b64decode, b64encode
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from Crypto.PublicKey import RSA
|
||||||
|
from Crypto.Hash import SHA, SHA256, SHA512
|
||||||
|
from Crypto.Signature import PKCS1_v1_5
|
||||||
|
|
||||||
|
from .misc import formatUTC
|
||||||
|
from . import logging
|
||||||
|
|
||||||
|
|
||||||
|
CACHE = TTLCache(ttl='1h')
|
||||||
|
HASHES = {
|
||||||
|
'sha1': SHA,
|
||||||
|
'sha256': SHA256,
|
||||||
|
'sha512': SHA512
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def sign_headers(headers, target, privkey, key_id):
|
||||||
|
'''sign headers and add the signature to them'''
|
||||||
|
key = RSA.importKey(privkey)
|
||||||
|
headers = {k.lower(): v for k, v in headers.items()}
|
||||||
|
headers['date'] = formatUTC(None)
|
||||||
|
headers['(request-target)'] = target
|
||||||
|
|
||||||
|
used_headers = headers.keys()
|
||||||
|
|
||||||
|
sigstring = build_sigstring(headers, used_headers, target=target)
|
||||||
|
signed_sigstring = sign_sigstring(sigstring, key)
|
||||||
|
|
||||||
|
sig = {
|
||||||
|
'keyId': key_id,
|
||||||
|
'algorithm': 'rsa-sha256',
|
||||||
|
'headers': ' '.join(used_headers),
|
||||||
|
'signature': signed_sigstring
|
||||||
|
}
|
||||||
|
|
||||||
|
sig_header = ['{}="{}"'.format(k, v) for k, v in sig.items()]
|
||||||
|
headers['signature'] = ','.join(sig_header)
|
||||||
|
|
||||||
|
del headers['(request-target)']
|
||||||
|
|
||||||
|
return headers
|
||||||
|
|
||||||
|
|
||||||
|
def validate(headers, pubkey):
|
||||||
|
'''validate a signature header from an incoming request'''
|
||||||
|
sig = parse_sig(headers.get('signature'))
|
||||||
|
|
||||||
|
if not sig:
|
||||||
|
raise MissingData('Missing signature')
|
||||||
|
|
||||||
|
pkcs = PKCS1_v1_5.new(RSA.importKey(pubkey))
|
||||||
|
|
||||||
|
sigstring = build_sigstring(request, sig['headers'])
|
||||||
|
logging.debug(f'Signing string: {sigstring}')
|
||||||
|
|
||||||
|
signalg, hashalg = sig['algorithm'].split('-')
|
||||||
|
sigdata = b64decode(sig['signature'])
|
||||||
|
|
||||||
|
h = HASHES[hashalg].new()
|
||||||
|
h.update(sigstring.encode('ascii'))
|
||||||
|
result = pkcs.verify(h, sigdata)
|
||||||
|
|
||||||
|
logging.debug(f'Sig verification result: {result}')
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
raise error.ValidationFailed('Failed signature validation')
|
||||||
|
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def parse_sig(sig):
|
||||||
|
if not sig:
|
||||||
|
raise error.MissingData('Missing signature header')
|
||||||
|
|
||||||
|
parts = {'headers': 'date'}
|
||||||
|
|
||||||
|
for part in sig.strip().split(','):
|
||||||
|
k, v = part.replace('"', '').split('=', maxsplit=1)
|
||||||
|
|
||||||
|
if k == 'headers':
|
||||||
|
parts['headers'] = v.split()
|
||||||
|
|
||||||
|
else:
|
||||||
|
parts[k] = v
|
||||||
|
|
||||||
|
return parts
|
||||||
|
|
||||||
|
|
||||||
|
def build_sigstring(headers, used_headers):
|
||||||
|
string = ''
|
||||||
|
|
||||||
|
for header in used_headers:
|
||||||
|
string += f'{header.lower()}: {headers[header]}'
|
||||||
|
|
||||||
|
if header != list(used_headers)[-1]:
|
||||||
|
string += '\n'
|
||||||
|
|
||||||
|
return string
|
||||||
|
|
||||||
|
|
||||||
|
def sign_sigstring(sigstring, key, hashalg='SHA256'):
|
||||||
|
cached_data = cache.sig.fetch(sigstring)
|
||||||
|
|
||||||
|
if cached_data:
|
||||||
|
logging.info('Returning cache sigstring')
|
||||||
|
return cached_data
|
||||||
|
|
||||||
|
pkcs = PKCS1_v1_5.new(key)
|
||||||
|
h = HASHES[hashalg.lower()].new()
|
||||||
|
h.update(sigstring.encode('ascii'))
|
||||||
|
|
||||||
|
sigdata = b64encode(pkcs.sign(h)).decode('ascii')
|
||||||
|
cache.sig.store(sigstring, sigdata)
|
||||||
|
|
||||||
|
return sigdata
|
||||||
|
|
||||||
|
|
||||||
|
class error:
|
||||||
|
class MissingData(Exception):
|
||||||
|
'''raise when data is expected but nothing is returned'''
|
||||||
|
|
||||||
|
class ValidationFailed(Exception):
|
||||||
|
'''raise when signature validation fails'''
|
|
@ -4,6 +4,8 @@ Miscellaneous functions
|
||||||
|
|
||||||
import random, string
|
import random, string
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
from . import logging
|
from . import logging
|
||||||
|
|
||||||
|
|
||||||
|
@ -37,6 +39,11 @@ def randomgen(chars=20):
|
||||||
return ''.join(random.choices(string.ascii_letters + string.digits, k=chars))
|
return ''.join(random.choices(string.ascii_letters + string.digits, k=chars))
|
||||||
|
|
||||||
|
|
||||||
|
def formatUTC(timestamp=None):
|
||||||
|
date = datetime.fromtimestamp(timestamp) if timestamp else datetime.utcnow()
|
||||||
|
return date.strftime('%a, %d %b %Y %H:%M:%S GMT')
|
||||||
|
|
||||||
|
|
||||||
def merp():
|
def merp():
|
||||||
logging.setLevel('MERP')
|
logging.setLevel('MERP')
|
||||||
logging.merp('heck')
|
logging.merp('heck')
|
||||||
|
|
1
requirements.txt
Normal file
1
requirements.txt
Normal file
|
@ -0,0 +1 @@
|
||||||
|
pycryptodome>=3.9.4
|
Loading…
Reference in a new issue