too much to list

This commit is contained in:
Izalia Mae 2020-05-23 11:26:50 -04:00
parent ff0defe028
commit 6ae253b40d
8 changed files with 280 additions and 145 deletions

View file

@ -8,4 +8,4 @@ import sys
assert sys.version_info >= (3, 6)
__version__ = (0, 1, 1)
__version__ = (0, 2, 0)

View file

@ -3,14 +3,19 @@ import re
from colour import Color
from . import logging
check = lambda color: Color(f'#{str(color)}' if'^(?:[0-9a-fA-F]{3}){1,2}$', color) else color)
def _multi(multiplier):
if multiplier >= 1:
if multiplier > 100:
return 1
elif multiplier <= 0:
elif multiplier > 1:
return multiplier/100
elif multiplier < 0:
return 0
return multiplier

IzzyLib/ Normal file
View file

@ -0,0 +1,213 @@
import traceback, urllib3, json
from base64 import b64decode, b64encode
from urllib.parse import urlparse
from datetime import datetime
import httpsig
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA, SHA256, SHA384, SHA512
from Crypto.Signature import PKCS1_v1_5
from . import logging, __version__
from .cache import TTLCache, LRUCache
from .misc import formatUTC
version = '.'.join([str(num) for num in __version__])
class httpClient:
def __init__(self, pool=100, timeout=30, headers={}, agent=None):
self.cache = LRUCache()
self.pool = pool
self.timeout = timeout
self.agent = agent if agent else f'IzzyLib/{version}'
self.headers = headers
self.client = urllib3.PoolManager(num_pools=self.pool, timeout=self.timeout)
self.headers['User-Agent'] = self.agent
def _fetch(self, url, headers={}, method='GET', data=None, cached=True):
cached_data = self.cache.fetch(url)
#url = url.split('#')[0]
if cached and cached_data:
logging.debug(f'Returning cached data for {url}')
return cached_data
if not headers.get('User-Agent'):
headers.update({'User-Agent': self.agent})
logging.debug(f'Fetching new data for {url}')
if data:
if isinstance(data, dict):
data = json.dumps(data)
elif not isinstance(data, bytes):
data = bytes(data)
raise TypeError('Post data cannot be turned into bytes')
resp = self.client.request(method, url, headers=headers, body=data)
resp = self.client.request(method, url, headers=headers)
except Exception as e:
raise ConnectionError(f'Failed to fetch url: {e}')
if cached:
logging.debug(f'Caching {url}'), resp)
return resp
def raw(self, *args, **kwargs):
Return a response object
return self._fetch(*args, **kwargs)
def text(self, *args, **kwargs):
Return the body as text
resp = self._fetch(*args, **kwargs)
def json(self, *args, **kwargs):
Return the body as a dict if it's json
headers = kwargs.get('headers')
if not headers:
kwargs['headers'] = {}
kwargs['headers'].update({'Accept': 'application/json'})
resp = self._fetch(*args, **kwargs)
data = json.loads(
except Exception as e:
logging.debug(f'Failed to load json: {e}')
return data
def SignHeaders(headers, keyid, privkey, url, method='GET'):
Signs headers and returns them with a signature header
headers (dict): Headers to be signed
keyid (str): Url to the public key used to verify the signature
privkey (str): Private key used to sign the headers
url (str): Url of the request for the signed headers
method (str): Http method of the request for the signed headers
RSAkey = RSA.import_key(privkey)
key_size = int(RSAkey.size_in_bytes()/2)
logging.debug('Signing key size:', key_size)
parsed_url = urlparse(url)
raw_headers = {'date': formatUTC(), 'host': parsed_url.netloc, '(request-target)': ' '.join([method, parsed_url.path])}
header_keys = raw_headers.keys()
signer = httpsig.HeaderSigner(keyid, privkey, f'rsa-sha{key_size}', headers=header_keys, sign_header='signature')
new_headers = signer.sign(raw_headers, parsed_url.netloc, method, parsed_url.path)
logging.debug('Signed headers:', new_headers)
del new_headers['(request-target)']
return new_headers
def ValidateSignature(headers, method, path, client=None, agent=None):
Validates the signature header.
headers (dict): All of the headers to be used to check a signature. The signature header must be included too
method (str): The http method used in relation to the headers
path (str): The path of the request in relation to the headers
client (pool object): Specify a urllib3 pool to use for fetching the actor. optional
agent (str): User agent used for fetching actor data. optional
client = httpClient(agent) if not client else client
headers = {k.lower(): v for k,v in headers.items()}
sig_header = headers.get('signature')
except Exception as e:
if not sig_header:
logging.error('Missing signature header')
split_sig = sig_header.split(',')
signature = {}
for part in split_sig:
key, value = part.split('=', 1)
signature[key.lower()] = value.replace('"', '')
if not signature.get('headers'):
logging.verbose('Missing headers section in signature')
signature['headers'] = signature['headers'].split()
actor_data = client.json(signature['keyid'])
pubkey = actor_data['publicKey']['publicKeyPem']
except Exception as e:
logging.verbose(f'Failed to get public key for actor {signature["keyid"]}')
valid = httpsig.HeaderVerifier(headers, pubkey, signature['headers'], method, path, sign_header='signature').verify()
if not valid:
if not isinstance(valid, tuple):
logging.verbose('Signature validation failed for unknown actor')
logging.verbose(f'Signature validation failed for actor: {valid[1]}')
return json_error(401, 'signature check failed')
return True
def ValidateRequest(request, client=None, agent=None):
Validates the headers in a Sanic or Aiohttp request (other frameworks may be supported)
See ValidateSignature for 'client' and 'agent' usage
return ValidateSignature(request.headers, request.method, request.path, client, agent)

View file

@ -1,130 +0,0 @@
'''Functions for working with HTTP signatures
Note: I edited this while tired, so this may be broken'''
from base64 import b64decode, b64encode
from urllib.parse import urlparse
from datetime import datetime
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA, SHA256, SHA512
from Crypto.Signature import PKCS1_v1_5
from .misc import formatUTC
from . import logging
CACHE = TTLCache(ttl='1h')
'sha1': SHA,
'sha256': SHA256,
'sha512': SHA512
def sign_headers(headers, target, privkey, key_id):
'''sign headers and add the signature to them'''
key = RSA.importKey(privkey)
headers = {k.lower(): v for k, v in headers.items()}
headers['date'] = formatUTC(None)
headers['(request-target)'] = target
used_headers = headers.keys()
sigstring = build_sigstring(headers, used_headers, target=target)
signed_sigstring = sign_sigstring(sigstring, key)
sig = {
'keyId': key_id,
'algorithm': 'rsa-sha256',
'headers': ' '.join(used_headers),
'signature': signed_sigstring
sig_header = ['{}="{}"'.format(k, v) for k, v in sig.items()]
headers['signature'] = ','.join(sig_header)
del headers['(request-target)']
return headers
def validate(headers, pubkey):
'''validate a signature header from an incoming request'''
sig = parse_sig(headers.get('signature'))
if not sig:
raise MissingData('Missing signature')
pkcs =
sigstring = build_sigstring(request, sig['headers'])
logging.debug(f'Signing string: {sigstring}')
signalg, hashalg = sig['algorithm'].split('-')
sigdata = b64decode(sig['signature'])
h = HASHES[hashalg].new()
result = pkcs.verify(h, sigdata)
logging.debug(f'Sig verification result: {result}')
if not result:
raise error.ValidationFailed('Failed signature validation')
return True
def parse_sig(sig):
if not sig:
raise error.MissingData('Missing signature header')
parts = {'headers': 'date'}
for part in sig.strip().split(','):
k, v = part.replace('"', '').split('=', maxsplit=1)
if k == 'headers':
parts['headers'] = v.split()
parts[k] = v
return parts
def build_sigstring(headers, used_headers):
string = ''
for header in used_headers:
string += f'{header.lower()}: {headers[header]}'
if header != list(used_headers)[-1]:
string += '\n'
return string
def sign_sigstring(sigstring, key, hashalg='SHA256'):
cached_data = cache.sig.fetch(sigstring)
if cached_data:'Returning cache sigstring')
return cached_data
pkcs =
h = HASHES[hashalg.lower()].new()
sigdata = b64encode(pkcs.sign(h)).decode('ascii'), sigdata)
return sigdata
class error:
class MissingData(Exception):
'''raise when data is expected but nothing is returned'''
class ValidationFailed(Exception):
'''raise when signature validation fails'''

View file

@ -26,7 +26,7 @@ class Log():
self.config = dict()
def _lvlCheck(self, level):
@ -91,7 +91,7 @@ class Log():
if levelNum < self.config['level']:
message = ' '.join(msg)
message = ' '.join([str(message) for message in msg])
output = f'{level}: {message}\n'
if self.config['date'] and (self.config['systemd'] and not env.get('INVOCATION_ID')):

View file

@ -1,9 +1,9 @@
'''Miscellaneous functions'''
import random, string, sys, os
import random, string, sys, os, shlex, subprocess
from os.path import abspath, dirname, basename, isdir, isfile
from os import environ as env
from datetime import datetime
from os.path import abspath, dirname, basename, isdir, isfile
from . import logging
@ -63,6 +63,26 @@ def config_dir(modpath=None):
return stor_path
def sudo(cmd, password, user=None):
### Please don't pur your password in plain text in a script
### Use a module like 'getpass' to get the password instead
if isinstance(cmd, list):
cmd = ' '.join(cmd)
elif not isinstance(cmd, str):
raise ValueError('Command is not a list or string')
euser = os.environ.get('USER')
cmd = ' '.join(['sudo', '-u', user, cmd]) if user and euser != user else 'sudo ' + cmd
sudocmd = ' '.join(['echo', f'{password}', '|', cmd])
proc = subprocess.Popen(['/usr/bin/env', 'bash', '-c', sudocmd])
return proc
def merp():
log = logging.getLogger('merp-heck', {'level': 'merp', 'date': False})

View file

@ -1,5 +1,5 @@
'''functions for web template management and rendering'''
import codecs, traceback, os, json, aiohttp
import codecs, traceback, os, json
from os import listdir, makedirs
from os.path import isfile, isdir, getmtime, abspath
@ -13,6 +13,18 @@ from import FileSystemEventHandler
from . import logging
from .color import *
framework = 'sanic'
import sanic
logging.debug('Cannot find Sanic')
import aiohttp
logging.debug('Cannot find aioHTTP')
env = None
@ -94,8 +106,11 @@ def delEnv(var):
del global_variables[var]
def setup():
def setup(fwork='sanic'):
global env
global framework
framework = fwork
env = Environment(
loader=ChoiceLoader([FileSystemLoader(path) for path in search_path]),
autoescape=select_autoescape(['html', 'css']),
@ -115,11 +130,22 @@ def renderTemplate(tplfile, context, request=None, headers=dict(), cookies=dict(
return env.get_template(tplfile).render(data)
def aiohttpTemplate(*args, **kwargs):
ctype = kwargs.get('content_type', 'text/html')
status = kwargs.get('status', 200)
html = renderTemplate(*args, **kwargs)
return aiohttp.web.Response(body=html, status=status, content_type=ctype)
def sendResponse(template, request, context=dict(), status=200, ctype='text/html', headers=dict(), **kwargs):
context['request'] = request
html = renderTemplate(template, context=context, **kwargs)
if framework == 'sanic':
return sanic.response.text(html, status=status, headers=headers, content_type=ctype)
elif framework == 'aiohttp':
return aiohttp.web.Response(body=html, status=status, headers=headers, content_type=ctype)
logging.error('Please install aiohttp or sanic. Response not sent.')
# delete me later
aiohttpTemplate = sendResponse
def buildTemplates(name=None):
@ -202,4 +228,4 @@ class templateWatchHandler(FileSystemEventHandler):
__all__ = ['addSearchPath', 'delSearchPath', 'addBuildPath', 'delSearchPath', 'addEnv', 'delEnv', 'setup', 'renderTemplate', 'aiohttp', 'buildTemplates', 'templateWatcher']
__all__ = ['addSearchPath', 'delSearchPath', 'addBuildPath', 'delSearchPath', 'addEnv', 'delEnv', 'setup', 'renderTemplate', 'sendResponse', 'buildTemplates', 'templateWatcher']

View file

@ -9,3 +9,4 @@>=1.5.0