This commit is contained in:
Izalia Mae 2021-06-14 03:44:12 -04:00
parent 9f6b70987d
commit 00a5b03995
14 changed files with 630 additions and 477 deletions

View file

@ -23,34 +23,36 @@ from .connection import Connection
from .http_urllib_client import HttpUrllibClient, HttpUrllibResponse
def log_import_error(*message):
izzylog.verbose(*message)
def log_import_error(package, *message):
izzylog.debug(*message)
path = Path(__file__).resolve.parent.join(package)
if izzylog.get_config('level') == logging.Levels.DEBUG:
if path.exists:
traceback.print_exc()
try:
from izzylib.sql import Column, CustomRows, Session, SqlDatabase, Tables, SqliteClient, SqliteColumn, SqliteServer, SqliteSession
from izzylib.sql import SqlColumn, CustomRows, SqlSession, SqlDatabase, Tables, SqliteClient, SqliteColumn, SqliteServer, SqliteSession
except ImportError:
log_import_error('Failed to import SQL classes. Connecting to SQL databases is disabled')
log_import_error('sql', 'Failed to import SQL classes. Connecting to SQL databases is disabled')
try:
from izzylib.tinydb import TinyDatabase, TinyRow, TinyRows
except ImportError:
log_import_error('Failed to import TinyDB classes. TinyDB database is disabled')
log_import_error('tinydb', 'Failed to import TinyDB classes. TinyDB database is disabled')
try:
from izzylib.template import Template, Color
except ImportError:
log_import_error('Failed to import http template classes. Jinja and HAML templates disabled')
log_import_error('template', 'Failed to import http template classes. Jinja and HAML templates disabled')
try:
from izzylib.http_requests_client import HttpRequestsClient, HttpRequestsRequest, HttpRequestsResponse
from izzylib.http_requests_client import *
except ImportError:
log_import_error('Failed to import Requests http client classes. Requests http client is disabled.')
log_import_error('requests_client', 'Failed to import Requests http client classes. Requests http client is disabled')
try:
from izzylib.http_server import PasswordHasher, HttpServer, HttpServerRequest, HttpServerResponse
except ImportError:
log_import_error('Failed to import HTTP server classes. The HTTP server will be disabled')
log_import_error('http_server', 'Failed to import HTTP server classes. The HTTP server will be disabled')

View file

@ -1,9 +1,8 @@
'''Simple caches that uses ordered dicts'''
import re
from datetime import datetime
from collections import OrderedDict
from functools import wraps
from . import DotDict
@ -113,6 +112,23 @@ class BaseCache(OrderedDict):
return self[key].data
## Was gonna use this for db stuff, but I need to plan it out better
def decorator(function, key, arg=0):
@wraps(function)
def wrapper(*args, **kwargs):
cached = self.fetch(key)
if cached:
return cached
result = function(*args, **kwargs)
self.store(key, args[arg] if type(arg) == int else kwargs[arg])
return result
return wrapper
class TtlCache(BaseCache):
def __init__(self, maxsize=1024, ttl='1h'):
super().__init__(maxsize, ttl)

View file

@ -56,6 +56,10 @@ class DotDict(dict):
raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
def copy(self):
return DotDict(self)
def update(self, data):
for k,v in data.items():
self.__setitem__(k, v)
@ -90,7 +94,7 @@ class DefaultDotDict(DotDict):
val = super().__getattribute__(key)
except AttributeError:
val = self.get(key, DefaultDict())
val = self.get(key, DefaultDotDict())
return DotDict(val) if type(val) == dict else val
@ -160,6 +164,13 @@ class MultiDotDict(DotDict):
return default
def set(self, key, value):
if self.get(key):
del self[key]
self[key] = value
def delone(self, key, value):
self.__getitem__(key, False).remove(value)

View file

@ -1,5 +1,5 @@
'''Miscellaneous functions'''
import hashlib, random, string, statistics, socket, time, timeit
import hashlib, platform, random, string, statistics, socket, time, timeit
from datetime import datetime
from getpass import getpass
@ -137,8 +137,11 @@ def hasher(string, alg='blake2s'):
str: The hashed string in hex format as a string
'''
if alg not in hashlib.__always_supported:
raise TypeError('Unsupported hash algorithm. Supported algs:', ', '.join(hashlib.__always_supported))
if alg not in hashlib.algorithms_available:
raise TypeError('Unsupported hash algorithm. Supported algs:', ', '.join(hashlib.algorithms_available))
if alg in ['sha1', 'md4', 'md5', 'md5-sha1']:
logging.verbose('Warning: Using an insecure hashing algorithm. sha256 or sha512 is recommended')
string = string.encode('UTF-8') if type(string) != bytes else string
@ -220,7 +223,7 @@ def port_check(port, address='127.0.0.1', tcp=True):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM if tcp else socket.SOCK_DGRAM) as s:
try:
return not s.connect_ex((address, port)) == 0
return s.connect_ex((address, port)) == 0
except socket.error as e:
return False
@ -310,7 +313,7 @@ def random_gen(length=20, letters=True, numbers=True, extra=None):
if letters:
characters += string.ascii_letters
if digits:
if numbers:
characters += string.digits
if extra:

View file

@ -26,10 +26,10 @@ class Path(str):
def __check_dir(self, path=None):
target = self if not path else Path(path)
if not self.parents and not target.parent.exists:
if not self.config['parents'] and not target.parent.exists:
raise FileNotFoundError('Parent directories do not exist:', target)
if not self.exist and target.exists:
if not self.config['exist'] and target.exists:
raise FileExistsError('File or directory already exists:', target)
@ -90,11 +90,11 @@ class Path(str):
def mkdir(self, mode=0o755):
if self.parents:
os.makedirs(self, mode, exist_ok=self.exist)
if self.config['parents']:
os.makedirs(self, mode, exist_ok=self.config['exist'])
else:
os.makedir(self, mode, exist_ok=self.exist)
os.makedir(self, mode, exist_ok=self.config['exist'])
return self.exists

View file

@ -34,7 +34,7 @@ class PasswordHasher:
'memory_cost': memory * 1024,
'parallelism': threads,
'encoding': 'utf-8',
'type': algtype,
'type': type,
}
self.hasher = argon2.PasswordHasher(**self.config)

View file

@ -1,393 +1,36 @@
import json, requests, sys
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from PIL import Image
from base64 import b64decode, b64encode
from datetime import datetime
from functools import cached_property, lru_cache
from io import BytesIO
from izzylib import DefaultDotDict, DotDict, Path, izzylog, __version__
from ssl import SSLCertVerificationError
from urllib.error import HTTPError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
methods = ['connect', 'delete', 'get', 'head', 'options', 'patch', 'post', 'put', 'trace']
class RequestsClient(object):
def __init__(self, headers={}, useragent=f'IzzyLib/{__version__}', appagent=None, proxy_type='https', proxy_host=None, proxy_port=None):
proxy_ports = {
'http': 80,
'https': 443
}
if proxy_type not in ['http', 'https']:
raise ValueError(f'Not a valid proxy type: {proxy_type}')
self.headers=headers
self.agent = f'{useragent} ({appagent})' if appagent else useragent
self.proxy = DotDict({
'enabled': True if proxy_host else False,
'ptype': proxy_type,
'host': proxy_host,
'port': proxy_ports[proxy_type] if not proxy_port else proxy_port
})
self.SetGlobal = SetClient
def __sign_request(self, request, privkey, keyid):
if not crypto_enabled:
izzylog.error('Crypto functions disabled')
return
request.add_header('(request-target)', f'{request.method.lower()} {request.path}')
request.add_header('host', request.host)
request.add_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
if request.body:
body_hash = b64encode(SHA256.new(request.body).digest()).decode("UTF-8")
request.add_header('digest', f'SHA-256={body_hash}')
request.add_header('content-length', len(request.body))
sig = {
'keyId': keyid,
'algorithm': 'rsa-sha256',
'headers': ' '.join([k.lower() for k in request.headers.keys()]),
'signature': b64encode(PkcsHeaders(privkey, request.headers)).decode('UTF-8')
}
sig_items = [f'{k}="{v}"' for k,v in sig.items()]
sig_string = ','.join(sig_items)
request.add_header('signature', sig_string)
request.remove_header('(request-target)')
request.remove_header('host')
def request(self, *args, method='get', **kwargs):
if method.lower() not in methods:
raise ValueError(f'Invalid method: {method}')
request = RequestsRequest(self, *args, method=method.lower(), **kwargs)
return RequestsResponse(request.send())
def file(self, url, filepath, *args, filename=None, **kwargs):
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
izzylog.error(f'Failed to download {url}:', resp.status, resp.body)
return False
return resp.save(filepath)
def image(self, url, filepath, *args, filename=None, ext='png', dimensions=(50, 50), **kwargs):
if not Image:
izzylog.error('Pillow module is not installed')
return
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
izzylog.error(f'Failed to download {url}:', resp.status, resp.body)
return False
if not filename:
filename = Path(url).stem()
path = Path(filepath)
if not path.exists():
izzylog.error('Path does not exist:', path)
return False
byte = BytesIO()
image = Image.open(BytesIO(resp.body))
image.thumbnail(dimensions)
image.save(byte, format=ext.upper())
with path.join(filename).open('wb') as fd:
fd.write(byte.getvalue())
def json(self, *args, headers={}, activity=True, **kwargs):
json_type = 'activity+json' if activity else 'json'
headers.update({
'accept': f'application/{json_type}'
})
return self.request(*args, headers=headers, **kwargs)
def signed_request(self, privkey, keyid, *args, **kwargs):
request = RequestsRequest(self, *args, **kwargs)
self.__sign_request(request, privkey, keyid)
return RequestsResponse(request.send())
class RequestsRequest(object):
def __init__(self, client, url, data=None, headers={}, query={}, method='get'):
self.args = [url]
self.kwargs = {'params': query}
self.method = method.lower()
self.client = client
new_headers = client.headers.copy()
new_headers.update(headers)
parsed_headers = {k.lower(): v for k,v in new_headers.items()}
if not parsed_headers.get('user-agent'):
parsed_headers['user-agent'] = client.agent
self.kwargs['headers'] = new_headers
self.kwargs['data'] = data
if client.proxy.enabled:
self.kwargs['proxies'] = {self.proxy.ptype: f'{self.proxy.ptype}://{self.proxy.host}:{self.proxy.port}'}
def send(self):
func = getattr(requests, self.method)
return func(*self.args, **self.kwargs)
class RequestsResponse(object):
def __init__(self, response):
self.response = response
self.data = b''
self.headers = DefaultDict({k.lower(): v.lower() for k,v in response.headers.items()})
self.status = response.status_code
self.url = response.url
def chunks(self, size=256):
return self.response.iter_content(chunk_size=256)
@property
def body(self):
for chunk in self.chunks():
self.data += chunk
return self.data
@cached_property
def text(self):
return self.data.decode(self.response.encoding)
@cached_property
def json(self):
return DotDict(self.text)
@cached_property
def json_pretty(self, indent=4):
return json.dumps(self.json, indent=indent)
def save(self, path, overwrite=True):
path = Path(path)
parent = path.parent()
if not parent.exists():
raise ValueError(f'Path does not exist: {parent}')
if overwrite and path.exists():
path.delete()
with path.open('wb') as fd:
for chunk in self.chunks():
fd.write(chunk)
def verify_request(request: SanicRequest, actor: dict):
'''Verify a header signature from a SimpleASGI request
request: The request with the headers to verify
actor: A dictionary containing the activitypub actor and the link to the pubkey used for verification
'''
body = request.body if request.body else None
return verify_headers(request.headers, request.method, request.path, body, actor)
def verify_headers(headers: dict, method: str, path: str, actor: dict=None, body=None):
'''Verify a header signature
headers: A dictionary containing all the headers from a request
method: The HTTP method of the request
path: The path of the HTTP request
actor (optional): A dictionary containing the activitypub actor and the link to the pubkey used for verification
body (optional): The body of the request. Only needed if the signature includes the digest header
fail (optional): If set to True, raise an error instead of returning False if any step of the process fails
'''
headers = {k.lower(): v for k,v in headers.items()}
headers['(request-target)'] = f'{method.lower()} {path}'
signature = parse_signature(headers.get('signature'))
digest = parse_body_digest(headers.get('digest'))
missing_headers = [k for k in headers if k in ['date', 'host'] if headers.get(k) == None]
if not signature:
raise ValueError('Missing signature')
if not actor:
actor = fetch_actor(signature.keyid)
## Add digest header to missing headers list if it doesn't exist
if method.lower() == 'post' and not digest:
missing_headers.append('digest')
## Fail if missing date, host or digest (if POST) headers
if missing_headers:
raise KeyError(f'Missing headers: {missing_headers}')
## Fail if body verification fails
if digest and not VerifyString(body, digest.sig, digest.alg):
raise ValueError('Failed body digest verification')
pubkey = actor.publicKey['publicKeyPem']
return PkcsHeaders(pubkey, {k:v for k,v in headers.items() if k in signature.headers}, sig=signature)
def parse_body_digest(digest):
if not digest:
raise ValueError('Empty digest')
parsed = DotDict()
alg, sig = digest.split('=', 1)
parsed.sig = sig
parsed.alg = alg.replace('-', '')
return parsed
def verify_string(string, enc_string, alg='SHA256', fail=False):
if type(string) != bytes:
string = string.encode('UTF-8')
body_hash = b64encode(SHA256.new(string).digest()).decode('UTF-8')
if body_hash == enc_string:
return True
if fail:
raise ValueError('String failed validation')
else:
return False
def sign_pkcs_headers(key: str, headers: dict, sig=None):
if sig:
head_items = [f'{item}: {headers[item]}' for item in sig.headers]
else:
head_items = [f'{k.lower()}: {v}' for k,v in headers.items()]
head_string = '\n'.join(head_items)
head_bytes = head_string.encode('UTF-8')
KEY = RSA.importKey(key)
pkcs = PKCS1_v1_5.new(KEY)
h = SHA256.new(head_bytes)
if sig:
return pkcs.verify(h, b64decode(sig.signature))
else:
return pkcs.sign(h)
def parse_signature(signature: str):
if not signature:
raise ValueError('Missing signature header')
split_sig = signature.split(',')
sig = DefaultDict({})
for part in split_sig:
key, value = part.split('=', 1)
sig[key.lower()] = value.replace('"', '')
sig.headers = sig.headers.split()
return sig
@lru_cache(maxsize=512)
def fetch_actor(url):
if not Client:
raise ValueError('Please set global client with "SetRequestsClient(client)"')
url = url.split('#')[0]
headers = {'Accept': 'application/activity+json'}
resp = Client.request(url, headers=headers)
try:
actor = resp.json()
except Exception as e:
izzylog.debug(f'HTTP {resp.status}: {resp.body}')
raise e from None
actor.web_domain = urlparse(url).netloc
actor.shared_inbox = actor.inbox
actor.pubkey = None
actor.handle = actor.preferredUsername
if actor.get('endpoints'):
actor.shared_inbox = actor.endpoints.get('sharedInbox', actor.inbox)
if actor.get('publicKey'):
actor.pubkey = actor.publicKey.get('publicKeyPem')
return actor
@lru_cache(maxsize=512)
def fetch_webfinger_account(handle, domain):
if not Client:
izzylog.error('IzzyLib.http: Please set global client with "SetClient(client)"')
return {}
data = DefaultDict()
webfinger = Client.request(f'https://{domain}/.well-known/webfinger?resource=acct:{handle}@{domain}')
if not webfinger.body:
raise ValueError('Webfinger body empty')
data.handle, data.domain = webfinger.json().subject.replace('acct:', '').split('@')
for link in webfinger.json().links:
if link['rel'] == 'self' and link['type'] == 'application/activity+json':
data.actor = link['href']
return data
def set_requests_client(client=None):
global Client
Client = client or RequestsClient()
def generate_rsa_key():
privkey = RSA.generate(2048)
key = DotDict({'PRIVKEY': privkey, 'PUBKEY': privkey.publickey()})
key.update({'privkey': key.PRIVKEY.export_key().decode(), 'pubkey': key.PUBKEY.export_key().decode()})
return key
from .client import (
HttpRequestsClient,
HttpRequestsRequest,
HttpRequestsResponse,
SigningError,
verify_request,
verify_headers,
parse_signature,
fetch_actor,
fetch_webfinger_account,
fetch_nodeinfo,
set_requests_client,
generate_rsa_key
)
## These usually only get called by the above functions, but importing anyway
from .client import (
parse_body_digest,
verify_string,
sign_pkcs_headers
)
__all__ = [
'HttpRequestsClient',
'HttpRequestsRequest',
'HttpRequestsResponse',
'SigningError',
'fetch_actor',
'fetch_webfinger_account',
'fetch_nodeinfo',
'generate_rsa_key',
'parse_signature',
'set_requests_client',
'verify_headers',
'verify_request',
]

View file

@ -0,0 +1,426 @@
import json, requests, sys
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from PIL import Image
from base64 import b64decode, b64encode
from datetime import datetime
from functools import cached_property, lru_cache
from io import BytesIO
from izzylib import DefaultDotDict, DotDict, Path, izzylog as logging, __version__
from izzylib.exceptions import HttpFileDownloadedError
from ssl import SSLCertVerificationError
from tldextract import extract
from urllib.parse import urlparse
Client = None
methods = ['connect', 'delete', 'get', 'head', 'options', 'patch', 'post', 'put', 'trace']
class HttpRequestsClient(object):
def __init__(self, headers={}, useragent=f'IzzyLib/{__version__}', appagent=None, proxy_type='https', proxy_host=None, proxy_port=None):
proxy_ports = {
'http': 80,
'https': 443
}
if proxy_type not in ['http', 'https']:
raise ValueError(f'Not a valid proxy type: {proxy_type}')
self.headers=headers
self.agent = f'{useragent} ({appagent})' if appagent else useragent
self.proxy = DotDict({
'enabled': True if proxy_host else False,
'ptype': proxy_type,
'host': proxy_host,
'port': proxy_ports[proxy_type] if not proxy_port else proxy_port
})
def set_global(self):
global Client
Client = self
def __sign_request(self, request, privkey, keyid):
if not crypto_enabled:
izzylog.error('Crypto functions disabled')
return
request.add_header('(request-target)', f'{request.method.lower()} {request.path}')
request.add_header('host', request.host)
request.add_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
if request.body:
body_hash = b64encode(SHA256.new(request.body).digest()).decode("UTF-8")
request.add_header('digest', f'SHA-256={body_hash}')
request.add_header('content-length', len(request.body))
sig = {
'keyId': keyid,
'algorithm': 'rsa-sha256',
'headers': ' '.join([k.lower() for k in request.headers.keys()]),
'signature': b64encode(PkcsHeaders(privkey, request.headers)).decode('UTF-8')
}
sig_items = [f'{k}="{v}"' for k,v in sig.items()]
sig_string = ','.join(sig_items)
request.add_header('signature', sig_string)
request.remove_header('(request-target)')
request.remove_header('host')
def request(self, *args, method='get', **kwargs):
if method.lower() not in methods:
raise ValueError(f'Invalid method: {method}')
request = HttpRequestsRequest(self, *args, method=method.lower(), **kwargs)
return HttpRequestsResponse(request.send())
def download(self, url, filepath, *args, filename=None, **kwargs):
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
raise HttpFileDownloadedError(f'Failed to download {url}: Status: {resp.status}, Body: {resp.body}')
return resp.save(filepath)
def image(self, url, filepath, *args, filename=None, ext='png', dimensions=(50, 50), **kwargs):
if not Image:
izzylog.error('Pillow module is not installed')
return
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
izzylog.error(f'Failed to download {url}:', resp.status, resp.body)
return False
if not filename:
filename = Path(url).stem()
path = Path(filepath)
if not path.exists:
izzylog.error('Path does not exist:', path)
return False
byte = BytesIO()
image = Image.open(BytesIO(resp.body))
image.thumbnail(dimensions)
image.save(byte, format=ext.upper())
with path.join(filename).open('wb') as fd:
fd.write(byte.getvalue())
def json(self, *args, headers={}, activity=True, **kwargs):
json_type = 'activity+json' if activity else 'json'
headers.update({
'accept': f'application/{json_type}'
})
return self.request(*args, headers=headers, **kwargs)
def signed_request(self, privkey, keyid, *args, **kwargs):
request = HttpRequestsRequest(self, *args, **kwargs)
self.__sign_request(request, privkey, keyid)
return HttpRequestsResponse(request.send())
class HttpRequestsRequest(object):
def __init__(self, client, url, data=None, headers={}, query={}, method='get'):
self.args = [url]
self.kwargs = {'params': query}
self.method = method.lower()
self.client = client
new_headers = client.headers.copy()
new_headers.update(headers)
parsed_headers = {k.lower(): v for k,v in new_headers.items()}
if not parsed_headers.get('user-agent'):
parsed_headers['user-agent'] = client.agent
self.kwargs['headers'] = new_headers
self.kwargs['data'] = data
if client.proxy.enabled:
self.kwargs['proxies'] = {self.proxy.ptype: f'{self.proxy.ptype}://{self.proxy.host}:{self.proxy.port}'}
def send(self):
func = getattr(requests, self.method)
return func(*self.args, **self.kwargs)
class HttpRequestsResponse(object):
def __init__(self, response):
self.response = response
self.data = b''
self.headers = DefaultDotDict({k.lower(): v.lower() for k,v in response.headers.items()})
self.status = response.status_code
self.url = response.url
def chunks(self, size=256):
return self.response.iter_content(chunk_size=256)
@property
def body(self):
for chunk in self.chunks():
self.data += chunk
return self.data
@cached_property
def text(self):
return self.data.decode(self.response.encoding)
@cached_property
def json(self):
return DotDict(self.body)
@cached_property
def json_pretty(self, indent=4):
return json.dumps(self.json, indent=indent)
def save(self, path, overwrite=True):
path = Path(path)
if not path.parent.exists:
raise ValueError(f'Path does not exist: {path.parent}')
if overwrite and path.exists:
path.delete()
with path.open('wb') as fd:
for chunk in self.chunks():
fd.write(chunk)
async def verify_request(request, actor: dict=None):
'''Verify a header signature from a SimpleASGI request
request: The request with the headers to verify
actor: A dictionary containing the activitypub actor and the link to the pubkey used for verification
'''
body = (await request.body) if request.body else None
headers = {k.lower(): v[0] for k,v in request.headers.items()}
return verify_headers(headers, request.method, request.path, actor, body)
def verify_headers(headers: dict, method: str, path: str, actor: dict=None, body=None):
'''Verify a header signature
headers: A dictionary containing all the headers from a request
method: The HTTP method of the request
path: The path of the HTTP request
actor (optional): A dictionary containing the activitypub actor and the link to the pubkey used for verification
body (optional): The body of the request. Only needed if the signature includes the digest header
fail (optional): If set to True, raise an error instead of returning False if any step of the process fails
'''
headers = {k.lower(): v for k,v in headers.items()}
headers['(request-target)'] = f'{method.lower()} {path}'
signature = parse_signature(headers.get('signature'))
digest = headers.get('digest')
missing_headers = [k for k in headers if k in ['date', 'host'] if headers.get(k) == None]
if not signature:
raise AssertionError('Missing signature')
if not actor:
actor = fetch_actor(signature.keyid)
print(actor)
## Add digest header to missing headers list if it doesn't exist
if method.lower() == 'post' and not digest:
missing_headers.append('digest')
## Fail if missing date, host or digest (if POST) headers
if missing_headers:
raise AssertionError(f'Missing headers: {missing_headers}')
## Fail if body verification fails
if digest:
digest_hash = parse_body_digest(headers.get('digest'))
if not verify_string(body, digest_hash.sig, digest_hash.alg):
raise AssertionError('Failed body digest verification')
pubkey = actor.publicKey['publicKeyPem']
return sign_pkcs_headers(pubkey, {k:v for k,v in headers.items() if k in signature.headers}, sig=signature)
def parse_body_digest(digest):
if not digest:
raise AssertionError('Empty digest')
parsed = DotDict()
alg, sig = digest.split('=', 1)
parsed.sig = sig
parsed.alg = alg.replace('-', '')
return parsed
def verify_string(string, enc_string, alg='SHA256', fail=False):
if type(string) != bytes:
string = string.encode('UTF-8')
body_hash = b64encode(SHA256.new(string).digest()).decode('UTF-8')
if body_hash == enc_string:
return True
if fail:
raise AssertionError('String failed validation')
else:
return False
def sign_pkcs_headers(key: str, headers: dict, sig=None):
if sig:
head_items = [f'{item}: {headers[item]}' for item in sig.headers]
else:
head_items = [f'{k.lower()}: {v}' for k,v in headers.items()]
head_string = '\n'.join(head_items)
head_bytes = head_string.encode('UTF-8')
KEY = RSA.importKey(key)
pkcs = PKCS1_v1_5.new(KEY)
h = SHA256.new(head_bytes)
if sig:
return pkcs.verify(h, b64decode(sig.signature))
else:
return pkcs.sign(h)
def parse_signature(signature: str):
if not signature:
return
raise AssertionError('Missing signature header')
split_sig = signature.split(',')
sig = DefaultDotDict()
for part in split_sig:
key, value = part.split('=', 1)
sig[key.lower()] = value.replace('"', '')
sig.headers = sig.headers.split()
sig.domain = urlparse(sig.keyid).netloc
sig.top_domain = '.'.join(extract(sig.domain)[1:])
return sig
@lru_cache(maxsize=512)
def fetch_actor(url):
if not Client:
raise ValueError('Please set global client with "SetRequestsClient(client)"')
url = url.split('#')[0]
headers = {'Accept': 'application/activity+json'}
resp = Client.request(url, headers=headers)
try:
actor = resp.json
except json.decoder.JSONDecodeError:
return
except Exception as e:
izzylog.debug(f'HTTP {resp.status}: {resp.body}')
raise e from None
actor.web_domain = urlparse(url).netloc
actor.shared_inbox = actor.inbox
actor.pubkey = None
actor.handle = actor.preferredUsername
if actor.get('endpoints'):
actor.shared_inbox = actor.endpoints.get('sharedInbox', actor.inbox)
if actor.get('publicKey'):
actor.pubkey = actor.publicKey.get('publicKeyPem')
return actor
@lru_cache(maxsize=512)
def fetch_webfinger_account(handle, domain):
if not Client:
raise ValueError('Please set global client with HttpRequestsClient.set_global()')
data = DefaultDotDict()
webfinger = Client.request(f'https://{domain}/.well-known/webfinger?resource=acct:{handle}@{domain}')
if not webfinger.body:
raise ValueError('Webfinger body empty')
data.handle, data.domain = webfinger.json.subject.replace('acct:', '').split('@')
for link in webfinger.json.links:
if link['rel'] == 'self' and link['type'] == 'application/activity+json':
data.actor = link['href']
return data
@lru_cache(maxsize=512)
def fetch_nodeinfo(domain):
if not Client:
raise ValueError('Please set global client with HttpRequestsClient.set_global()')
webfinger = Client.request(f'https://{domain}/.well-known/nodeinfo')
webfinger_data = DotDict(webfinger.body)
for link in webfinger.json.links:
if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0':
nodeinfo_url = link['href']
break
nodeinfo = Client.request(nodeinfo_url)
return nodeinfo.json
def set_requests_client(client=None):
global Client
Client = client or RequestsClient()
def generate_rsa_key():
privkey = RSA.generate(2048)
key = DotDict({'PRIVKEY': privkey, 'PUBKEY': privkey.publickey()})
key.update({'privkey': key.PRIVKEY.export_key().decode(), 'pubkey': key.PUBKEY.export_key().decode()})
return key
class SigningError(Exception):
pass

View file

@ -6,6 +6,7 @@ requires = [
'pillow==8.2.0',
'pycryptodome==3.10.1',
'requests==2.25.1',
'tldextract==3.1.0'
]

View file

@ -1,2 +1,2 @@
from .generic import Column, CustomRows, Session, SqlDatabase, Tables
from .generic import SqlColumn, CustomRows, SqlSession, SqlDatabase, Tables
from .sqlite_server import SqliteClient, SqliteColumn, SqliteServer, SqliteSession

View file

@ -3,7 +3,7 @@ import json, sys, threading, time
from contextlib import contextmanager
from datetime import datetime
from sqlalchemy import create_engine, ForeignKey, MetaData, Table
from sqlalchemy import Column as SqlColumn, types as Types
from sqlalchemy import Column, types as Types
from sqlalchemy.exc import OperationalError, ProgrammingError
from sqlalchemy.orm import scoped_session, sessionmaker
@ -27,11 +27,11 @@ class SqlDatabase:
self.classes = kwargs.get('row_classes', CustomRows())
self.cache = None
self.session_class = kwargs.get('session_class', Session)
self.session_class = kwargs.get('session_class', SqlSession)
self.sessions = {}
self.SetupTables(tables)
self.SetupCache()
self.setup_tables(tables)
self.setup_cache()
## Leaving link to example code for read-only sqlite for later use
@ -69,7 +69,7 @@ class SqlDatabase:
engine_string += user + '@'
if host == '/var/run/postgresql':
engine_string += '/' + name
engine_string += f'/{name}:{port}/{name}'
else:
engine_string += f'{host}:{port}/{name}'
@ -83,19 +83,19 @@ class SqlDatabase:
def close(self):
self.SetupCache()
self.setup_cache()
def SetupCache(self):
def setup_cache(self):
self.cache = DotDict({table: LruCache() for table in self.table_names})
def CreateTables(self, *tables):
def create_tables(self, *tables):
new_tables = [self.table[table] for table in tables]
self.table.meta.create_all(bind=self.db, tables=new_tables)
def CreateDatabase(self):
def create_database(self):
if self.db.url.get_backend_name() == 'postgresql':
predb = create_engine(db.engine_string.replace(config.db.name, 'postgres', -1))
conn = predb.connect()
@ -116,7 +116,7 @@ class SqlDatabase:
self.table.meta.create_all(self.db)
def SetupTables(self, tables):
def setup_tables(self, tables):
self.table = Tables(self, tables)
self.table_names = tables.keys()
@ -126,7 +126,7 @@ class SqlDatabase:
return s.execute(*args, **kwargs)
class Session(object):
class SqlSession(object):
def __init__(self, db):
self.closed = False
@ -259,19 +259,19 @@ class Session(object):
row = self.execute(f'DELETE FROM {table} WHERE id={rowid}')
def DropTables(self):
tables = self.GetTables()
def drop_tables(self):
tables = self.get_tables()
for table in tables:
self.execute(f'DROP TABLE {table}')
def GetTables(self):
def get_tables(self):
rows = self.execute("SELECT name FROM sqlite_master WHERE type IN ('table','view') and name NOT LIKE 'sqlite_%'")
return [row[0] for row in rows]
def AppendColumn(self, tbl, col):
def append_column(self, tbl, col):
table = self.table[tbl]
try:
@ -301,7 +301,7 @@ class Session(object):
self.execute(sql)
def RemoveColumn(self, tbl, col):
def remove_column(self, tbl, col):
table = self.table[tbl]
column = getattr(table, col, None)
columns = [row[1] for row in self.execute(f'PRAGMA table_info({tbl})')]
@ -416,24 +416,29 @@ class Tables(DotDict):
def __setup_table(self, name, table):
columns = [col if type(col) == SqlColumn else Column(*col.get('args'), **col.get('kwargs')) for col in table]
columns = [col if type(col) == Column else Column(*col.get('args'), **col.get('kwargs')) for col in table]
self[name] = Table(name, self.meta, *columns)
def Column(name, stype=None, fkey=None, **kwargs):
def SqlColumn(name, stype=None, fkey=None, **kwargs):
if not stype and not kwargs:
if name == 'id':
return Column('id', 'integer', primary_key=True, autoincrement=True)
return Column('id', SqlTypes['integer'], primary_key=True, autoincrement=True)
elif name == 'timestamp':
return Column('timestamp', 'datetime')
return Column('timestamp', SqlTypes['datetime'])
raise ValueError('Missing column type and options')
else:
options = [name, SqlTypes.get(stype.lower(), SqlTypes['string'])]
try:
stype = stype or 'string'
options = [name, SqlTypes[stype.lower()]]
except KeyError:
raise KeyError(f'Invalid SQL data type: {stype}')
if fkey:
options.append(ForeignKey(fkey))
return SqlColumn(*options, **kwargs)
return Column(*options, **kwargs)

View file

@ -1,6 +1,8 @@
import asyncio, json, socket, sqlite3, ssl, time, traceback
from izzylib import CustomRows, DotDict, Path, JsonEncoder, SqlDatabase, izzylog
from izzylib import DotDict, JsonEncoder, Path, izzylog
from . import CustomRows, SqlDatabase
commands = [
@ -12,7 +14,7 @@ commands = [
class SqliteClient(object):
def __init__(self, database: str='metadata', host: str='localhost', port: int=3926, password: str=None, session_class=None):
self.ssl = None
self.data = misc.DotDict({
self.data = DotDict({
'host': host,
'port': int(port),
'password': password,
@ -96,7 +98,7 @@ class SqliteSession(socket.socket):
data = self.recv(8*1024*1024).decode()
try:
data = misc.DotDict(data)
data = DotDict(data)
except ValueError:
data = json.loads(data)
@ -145,12 +147,12 @@ def SqliteColumn(*args, **kwargs):
return {'args': list(args), 'kwargs': dict(kwargs)}
class SqliteServer(misc.DotDict):
class SqliteServer(DotDict):
def __init__(self, path, host='localhost', port=3926, password=None):
self.server = None
self.database = misc.DotDict()
self.database = DotDict()
self.path = misc.Path(path).resolve()
self.path = Path(path).resolve()
self.ssl = None
self.password = password
self.host = host
@ -266,7 +268,7 @@ class SqliteServer(misc.DotDict):
break
try:
data = misc.DotDict(raw_data)
data = DotDict(raw_data)
if self.password:
if valid == None and data.command == 'login':
@ -363,7 +365,7 @@ class SqliteServer(misc.DotDict):
def cmd_update(self, table=None, rowid=None, row=None, **data):
if row:
row = misc.DotDict(row)
row = DotDict(row)
return self.update(table, rowid, row, **data)

View file

@ -1,6 +1,7 @@
import codecs, traceback, os, json, xml
from colour import Color as Colour
from functools import partial
from hamlish_jinja import HamlishExtension
from izzylib import izzylog, DotDict, Path
from jinja2 import Environment, FileSystemLoader, ChoiceLoader, select_autoescape, Markup
@ -18,14 +19,14 @@ except ModuleNotFoundError:
class Template(Environment):
def __init__(self, search=[], global_vars={}, context=None, autoescape=True):
self.autoescape = autoescape
self.search = []
self.func_context = context
self.search = FileSystemLoader([])
for path in search:
self.__add_search_path(Path(path))
self.add_search_path(Path(path))
super().__init__(
loader=ChoiceLoader([FileSystemLoader(path) for path in self.search]),
loader=self.search,
extensions=[HamlishExtension],
autoescape=self.autoescape,
lstrip_blocks=True,
@ -39,21 +40,31 @@ class Template(Environment):
self.globals.update({
'markup': Markup,
'cleanhtml': lambda text: ''.join(xml.etree.ElementTree.fromstring(text).itertext()),
'color': Color
'color': Color,
'lighten': partial(color_func, 'lighten'),
'darken': partial(color_func, 'darken'),
'saturate': partial(color_func, 'saturate'),
'desaturate': partial(color_func, 'desaturate'),
'rgba': partial(color_func, 'rgba')
})
self.globals.update(global_vars)
def __add_search_path(self, path):
def add_search_path(self, path, index=None):
if not path.exists:
raise FileNotFoundError(f'Cannot find search path: {path}')
if path not in self.search:
self.search.append(path)
if path not in self.search.searchpath:
loader = os.fspath(path)
if index != None:
self.search.searchpath.insert(index, loader)
else:
self.search.searchpath.append(loader)
def setContext(self, context):
def set_context(self, context):
if not hasattr(context, '__call__'):
izzylog.error('Context is not callable')
return
@ -65,37 +76,37 @@ class Template(Environment):
self.func_context = context
def addEnv(self, k, v):
def add_env(self, k, v):
self.globals[k] = v
def delEnv(self, var):
def del_env(self, var):
if not self.globals.get(var):
raise ValueError(f'"{var}" not in global variables')
del self.var[var]
def updateEnv(self, data):
def update_env(self, data):
if not isinstance(data, dict):
raise ValueError(f'Environment data not a dict')
self.globals.update(data)
def addFilter(self, funct, name=None):
def add_filter(self, funct, name=None):
name = funct.__name__ if not name else name
self.filters[name] = funct
def delFilter(self, name):
def del_filter(self, name):
if not self.filters.get(name):
raise valueError(f'"{name}" not in global filters')
del self.filters[name]
def updateFilter(self, data):
def update_filter(self, data):
if not isinstance(context, dict):
raise ValueError(f'Filter data not a dict')
@ -130,37 +141,66 @@ class Template(Environment):
class Color(Colour):
def __init__(self, color):
super.__init__(f'#{str(raw_color)}' if raw_color.startswith('#') else raw_color)
if isinstance(color, str):
super().__init__(f'#{str(color)}' if not color.startswith('#') else color)
self.lighten = lambda color, multi: self.alter('lighten', multi)
self.darken = lambda color, multi: self.alter('darken', multi)
self.saturate = lambda color, multi: self.alter('saturate', multi)
self.desaturate = lambda color, multi: self.alter('desaturate', multi)
self.rgba = lambda color, multi: self.alter('rgba', multi)
elif isinstance(color, Colour):
super().__init__(str(color))
else:
raise TypeError(f'Color has to be a string or Color class, not {type(color)}')
def multi(multiplier):
if multiplier >= 1:
def __repr__(self):
return self.__str__()
def __str__(self):
return self.hex_l
def lighten(self, multiplier):
return self.alter('lighten', multiplier)
def darken(self, multiplier):
return self.alter('darken', multiplier)
def saturate(self, multiplier):
return self.alter('saturate', multiplier)
def desaturate(self, multiplier):
return self.alter('desaturate', multiplier)
def rgba(self, multiplier):
return self.alter('rgba', multiplier)
def multi(self, multiplier):
if multiplier >= 100:
return 1
elif multiplier <= 0:
return 0
return multiplier
return multiplier / 100
def alter(action, multiplier):
def alter(self, action, multiplier):
if action == 'lighten':
self.luminance += ((1 - color.luminance) * self.multi(multiplier))
self.luminance += ((1 - self.luminance) * self.multi(multiplier))
elif action == 'darken':
self.luminance -= (color.luminance * self.multi(multiplier))
self.luminance -= (self.luminance * self.multi(multiplier))
elif action == 'saturate':
self.saturation += ((1 - color.saturation) * self.multi(multiplier))
self.saturation += ((1 - self.saturation) * self.multi(multiplier))
elif action == 'desaturate':
self.saturation -= (color.saturation * self.multi(multiplier))
self.saturation -= (self.saturation * self.multi(multiplier))
elif action == 'rgba':
red = self.red*255
@ -169,4 +209,8 @@ class Color(Colour):
trans = self.multi(multiplier)
return f'rgba({red:0.2f}, {green:0.2f}, {blue:0.2f}, {trans:0.2f})'
return self.hex_l
return self
def color_func(action, color, multi):
return Color(color).alter(action, multi)