remove http signature functions from urllib client

This commit is contained in:
Izalia Mae 2021-09-24 18:49:22 -04:00
parent 8744090ef7
commit be98e94355
4 changed files with 20 additions and 342 deletions

View file

@ -1,31 +1,12 @@
from .signatures import (
verify_request,
verify_headers,
parse_signature,
fetch_actor,
fetch_instance,
fetch_nodeinfo,
fetch_webfinger_account,
generate_rsa_key
)
from . import error from . import error
from .client import HttpUrllibClient, set_default_client from .client import HttpUrllibClient, set_default_client
from .request import HttpUrllibRequest from .request import HttpUrllibRequest
from .response import HttpUrllibResponse from .response import HttpUrllibResponse
#__all__ = [ __all__ = [
#'HttpRequestsClient', 'HttpUrllibClient',
#'HttpRequestsRequest', 'HttpUrllibRequest',
#'HttpRequestsResponse', 'HttpUrllibResponse',
#'fetch_actor', 'set_default_client',
#'fetch_instance', 'error'
#'fetch_nodeinfo', ]
#'fetch_webfinger_account',
#'generate_rsa_key',
#'parse_signature',
#'set_requests_client',
#'verify_headers',
#'verify_request',
#]

View file

@ -1,13 +1,15 @@
import json import json
from Crypto.Hash import SHA256
from izzylib import DotDict, LowerDotDict, Url, boolean
from base64 import b64decode, b64encode
from datetime import datetime from datetime import datetime
from izzylib import izzylog as logging
from izzylib.http_signatures import sign_request
from .signatures import sign_pkcs_headers from ..dotdict import DotDict, LowerDotDict
from ..misc import Url, boolean
try:
from ..http_signatures import sign_request
except ModuleNotFoundError:
sign_request = None
methods = ['delete', 'get', 'head', 'options', 'patch', 'post', 'put'] methods = ['delete', 'get', 'head', 'options', 'patch', 'post', 'put']
@ -84,33 +86,7 @@ class HttpUrllibRequest:
def sign(self, privkey, keyid): def sign(self, privkey, keyid):
if not sign_request:
raise AttributeError('PyCryptodome not installed. Request signing is disabled.')
sign_request(self, privkey, keyid) sign_request(self, privkey, keyid)
def sign2(self, privkey, keyid):
self.unset_header('signature')
self.set_header('(request-target)', f'{self.method.lower()} {self.url.path}')
self.set_header('host', self.url.host)
self.set_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
if self.body:
body_hash = b64encode(SHA256.new(self.body).digest()).decode("UTF-8")
self.set_header('digest', f'SHA-256={body_hash}')
self.set_header('content-length', str(len(self.body)))
sig = {
'keyId': keyid,
'algorithm': 'rsa-sha256',
'headers': ' '.join([k.lower() for k in self.headers.keys()]),
'signature': b64encode(sign_pkcs_headers(privkey, self.headers)).decode('UTF-8')
}
sig_items = [f'{k}="{v}"' for k,v in sig.items()]
sig_string = ','.join(sig_items)
self.set_header('signature', sig_string)
self.unset_header('(request-target)')
self.unset_header('host')

View file

@ -1,280 +0,0 @@
import json, requests, sys
from PIL import Image
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from base64 import b64decode, b64encode
from datetime import datetime
from functools import lru_cache
from izzylib import DefaultDotDict, DotDict
from izzylib import izzylog
from tldextract import extract
from urllib.parse import urlparse
Client = None
def set_client(client):
global Client
Client = client
@lru_cache(maxsize=512)
def fetch_actor(url):
if not Client:
raise ValueError('Please set global client with "SetRequestsClient(client)"')
url = url.split('#')[0]
headers = {'Accept': 'application/activity+json'}
resp = Client.request(url, headers=headers)
try:
actor = resp.json
except json.decoder.JSONDecodeError:
return
except Exception as e:
izzylog.debug(f'HTTP {resp.status}: {resp.body}')
raise e from None
actor.web_domain = urlparse(url).netloc
actor.shared_inbox = actor.inbox
actor.pubkey = None
actor.handle = actor.preferredUsername
if actor.get('endpoints'):
actor.shared_inbox = actor.endpoints.get('sharedInbox', actor.inbox)
if actor.get('publicKey'):
actor.pubkey = actor.publicKey.get('publicKeyPem')
return actor
@lru_cache(maxsize=512)
def fetch_instance(domain):
if not Client:
raise ValueError('Please set global client with "SetRequestsClient(client)"')
headers = {'Accept': 'application/json'}
resp = Client.request(f'https://{domain}/api/v1/instance', headers=headers)
try:
return resp.json
except json.decoder.JSONDecodeError:
return
except Exception as e:
izzylog.debug(f'HTTP {resp.status}: {resp.body}')
raise e from None
@lru_cache(maxsize=512)
def fetch_nodeinfo(domain):
if not Client:
raise ValueError('Please set global client with HttpRequestsClient.set_global()')
webfinger = Client.request(f'https://{domain}/.well-known/nodeinfo')
webfinger_data = DotDict(webfinger.body)
for link in webfinger.json.links:
if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0':
nodeinfo_url = link['href']
break
nodeinfo = Client.request(nodeinfo_url)
return nodeinfo.json
@lru_cache(maxsize=512)
def fetch_webfinger_account(handle, domain):
if not Client:
raise ValueError('Please set global client with HttpRequestsClient.set_global()')
data = DefaultDotDict()
webfinger = Client.request(f'https://{domain}/.well-known/webfinger?resource=acct:{handle}@{domain}')
if not webfinger.body:
raise ValueError('Webfinger body empty')
data.handle, data.domain = webfinger.json.subject.replace('acct:', '').split('@')
for link in webfinger.json.links:
if link['rel'] == 'self' and link['type'] == 'application/activity+json':
data.actor = link['href']
return data
def generate_rsa_key():
privkey = RSA.generate(2048)
key = DotDict({'PRIVKEY': privkey, 'PUBKEY': privkey.publickey()})
key.update({'privkey': key.PRIVKEY.export_key().decode(), 'pubkey': key.PUBKEY.export_key().decode()})
return key
def parse_signature(signature: str):
if not signature:
return
raise AssertionError('Missing signature header')
split_sig = signature.split(',')
sig = DefaultDotDict()
for part in split_sig:
key, value = part.split('=', 1)
sig[key.lower()] = value.replace('"', '')
sig.headers = sig.headers.split()
sig.domain = urlparse(sig.keyid).netloc
sig.top_domain = '.'.join(extract(sig.domain)[1:])
sig.actor = sig.keyid.split('#')[0]
return sig
def verify_headers(headers: dict, method: str, path: str, actor: dict=None, body=None):
'''Verify a header signature
headers: A dictionary containing all the headers from a request
method: The HTTP method of the request
path: The path of the HTTP request
actor (optional): A dictionary containing the activitypub actor and the link to the pubkey used for verification
body (optional): The body of the request. Only needed if the signature includes the digest header
fail (optional): If set to True, raise an error instead of returning False if any step of the process fails
'''
headers = {k.lower(): v for k,v in headers.items()}
headers['(request-target)'] = f'{method.lower()} {path}'
signature = parse_signature(headers.get('signature'))
digest = headers.get('digest')
missing_headers = [k for k in headers if k in ['date', 'host'] if headers.get(k) == None]
if not signature:
raise AssertionError('Missing signature')
if not actor:
actor = fetch_actor(signature.keyid)
## Add digest header to missing headers list if it doesn't exist
if method.lower() == 'post' and not digest:
missing_headers.append('digest')
## Fail if missing date, host or digest (if POST) headers
if missing_headers:
raise AssertionError(f'Missing headers: {missing_headers}')
## Fail if body verification fails
if digest:
digest_hash = parse_body_digest(headers.get('digest'))
if not verify_string(body, digest_hash.sig, digest_hash.alg):
raise AssertionError('Failed body digest verification')
pubkey = actor.publicKey['publicKeyPem']
return sign_pkcs_headers(pubkey, {k:v for k,v in headers.items() if k in signature.headers}, sig=signature)
async def verify_request(request, actor: dict=None):
'''Verify a header signature from a SimpleASGI request
request: The request with the headers to verify
actor: A dictionary containing the activitypub actor and the link to the pubkey used for verification
'''
return verify_headers(
request.Headers.to_dict(),
request.method,
request.path,
actor = actor,
body = request.body
)
### Helper functions that shouldn't be used directly ###
def parse_body_digest(digest):
if not digest:
raise AssertionError('Empty digest')
parsed = DotDict()
alg, sig = digest.split('=', 1)
parsed.sig = sig
parsed.alg = alg.replace('-', '')
return parsed
def sign_pkcs_headers(key: str, headers: dict, sig=None):
if sig:
head_items = [f'{item}: {headers[item]}' for item in sig.headers]
else:
head_items = [f'{k.lower()}: {v}' for k,v in headers.items()]
head_string = '\n'.join(head_items)
head_bytes = head_string.encode('UTF-8')
KEY = RSA.importKey(key)
pkcs = PKCS1_v1_5.new(KEY)
h = SHA256.new(head_bytes)
if sig:
return pkcs.verify(h, b64decode(sig.signature))
else:
return pkcs.sign(h)
def sign_request(request, privkey, keyid):
assert isinstance(request.body, bytes)
request.add_header('(request-target)', f'{request.method.lower()} {request.path}')
request.add_header('host', request.host)
request.add_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
if request.body:
body_hash = b64encode(SHA256.new(request.body).digest()).decode("UTF-8")
request.add_header('digest', f'SHA-256={body_hash}')
request.add_header('content-length', str(len(request.body)))
sig = {
'keyId': keyid,
'algorithm': 'rsa-sha256',
'headers': ' '.join([k.lower() for k in request.headers.keys()]),
'signature': b64encode(sign_pkcs_headers(privkey, request.headers)).decode('UTF-8')
}
sig_items = [f'{k}="{v}"' for k,v in sig.items()]
sig_string = ','.join(sig_items)
request.add_header('signature', sig_string)
request.remove_header('(request-target)')
request.remove_header('host')
def verify_string(string, enc_string, alg='SHA256', fail=False):
if type(string) != bytes:
string = string.encode('UTF-8')
body_hash = b64encode(SHA256.new(string).digest()).decode('UTF-8')
if body_hash == enc_string:
return True
if fail:
raise AssertionError('String failed validation')
else:
return False

View file

@ -1,4 +1,5 @@
from izzylib import DotDict from .. import izzylog
from ..dotdict import DotDict
class RowClasses(DotDict): class RowClasses(DotDict):