first draft of rework

This commit is contained in:
Izalia Mae 2021-06-07 16:45:29 -04:00
parent 63a6819e41
commit 27ec8c96a8
37 changed files with 2112 additions and 2262 deletions

View file

@ -1,12 +0,0 @@
'''
IzzyLib by Zoey Mae
Licensed under the CNPL: https://git.pixie.town/thufie/CNPL
https://git.barkshark.xyz/izaliamae/izzylib
'''
import sys
assert sys.version_info >= (3, 6)
__version_tpl__ = (0, 5, 0)
__version__ = '.'.join([str(v) for v in __version_tpl__])

View file

@ -1,56 +0,0 @@
'''functions to alter colors in hex format'''
import re
from colour import Color
check = lambda color: Color(f'#{str(color)}' if re.search(r'^(?:[0-9a-fA-F]{3}){1,2}$', color) else color)
def _multi(multiplier):
if multiplier >= 1:
return 1
elif multiplier <= 0:
return 0
return multiplier
def lighten(color, multiplier):
col = check(color)
col.luminance += ((1 - col.luminance) * _multi(multiplier))
return col.hex_l
def darken(color, multiplier):
col = check(color)
col.luminance -= (col.luminance * _multi(multiplier))
return col.hex_l
def saturate(color, multiplier):
col = check(color)
col.saturation += ((1 - col.saturation) * _multi(multiplier))
return col.hex_l
def desaturate(color, multiplier):
col = check(color)
col.saturation -= (col.saturation * _multi(multiplier))
return col.hex_l
def rgba(color, transparency):
col = check(color)
red = col.red*255
green = col.green*255
blue = col.blue*255
trans = _multi(transparency)
return f'rgba({red:0.2f}, {green:0.2f}, {blue:0.2f}, {trans:0.2f})'
__all__ = ['lighten', 'darken', 'saturate', 'desaturate', 'rgba']
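## Example usage (illustrative sketch; multipliers outside 0-1 are clamped by _multi):
print(lighten('1d2e3f', 0.25))    # hex string with luminance raised by 25% of the remaining headroom
print(darken('#1d2e3f', 0.25))    # hex string with luminance reduced by 25%
print(saturate('1d2e3f', 0.5))    # more saturated variant of the same color
print(rgba('ff0000', 0.5))        # 'rgba(...)' string with 50% opacity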

View file

@ -1,17 +0,0 @@
from .. import logging
try:
from .sql import SqlDatabase
from .sqlite_server import SqliteClient, SqliteServer
except ImportError as e:
logging.verbose('Failed to load SqlDatabase, SqliteClient, and SqliteServer. Is sqlalchemy installed?')
try:
from .tiny import TinyDatabase
except ImportError as e:
logging.verbose('Failed to import TinyDatabase. Are tinydb and tinydb-serialization installed?')
try:
from .pysondb import PysonDatabase
except ImportError as e:
logging.verbose('Failed to import PysonDatabase. Is pysondb installed?')

View file

@ -1,353 +0,0 @@
import asyncio
import base64
import datetime
import filelock
import json
import multiprocessing
import operator
import pysondb
import queue
import random
from pysondb.db import JsonDatabase, IdNotFoundError
from .. import logging, misc
class PysonDatabase(multiprocessing.Process):
def __init__(self, dbpath: misc.Path, tables: dict=None):
multiprocessing.Process.__init__(self, daemon=True)
self.dbpath = dbpath
self.tables = tables
self.shutdown = False
self.port = self._setup_port()
self.token = misc.RandomGen()
print(self.port)
self.fetch = lambda *args, **kwargs: self.send_message('fetch', *args, **kwargs)
self.search = lambda *args, **kwargs: self.send_message('search', *args, **kwargs)
self.insert = lambda *args, **kwargs: self.send_message('insert', *args, **kwargs)
self.remove = lambda *args, **kwargs: self.send_message('remove', *args, **kwargs)
self.start()
def run(self):
self.db = DatabaseProcess(self.dbpath, self.tables)
print(self.port)
server = asyncio.start_server(self.process_queue, '127.0.0.1', self.port)
loop = asyncio.new_event_loop()
loop.run_until_complete(server)
loop.run_forever()
def close(self):
self.terminate()
self.join(timeout=5)
def _setup_port(self):
port = None
while True:
port = random.randint(8096, 16394)
if misc.PortCheck(port) == True:
return port
def get_action(self, action):
return getattr(self.db, action)
def send_message(self, action, table, *args, **kwargs):
data = {
'token': self.token,
'action': action,
'table': table,
'args': args,
'kwargs': kwargs
}
with self.socket as s:
s.send(json.dumps(data).encode('utf-8'))
return s.recieve(16 * 1024 * 1024)
@property
def socket(self):
return misc.Connection(port=self.port)
async def process_queue(self, reader, writer):
data = misc.DotDict(await reader.read(16 * 1024 * 1024))
if data.token != self.token:
return
if data.action == 'close':
self.shutdown = True
new_data = self.get_action(data.action)(data.table, *data.args, **data.kwargs)
if isinstance(new_data, dict):
writer.write(json.dumps(new_data).encode('utf-8'))
await writer.drain()
writer.close()
async def pipe_listener(self):
pass
class DatabaseProcess(misc.DotDict):
def __init__(self, dbpath: misc.Path, tables: dict=None):
dbpath = misc.Path(dbpath)
super().__init__()
self.path = misc.Path(dbpath).resolve()
self.metadata = misc.DotDict(**{
'path': self.path.join('metadata.json'),
'lock': self.path.join('metadata.json.lock'),
'version': 0
})
self._closed = False
self.__setup_database(tables)
def __setup_database(self, tables):
self.path.mkdir()
self.load_meta()
for name, columns in tables.items():
self[name] = columns if type(columns) == Table else Table(name, columns)
if not self[name].db:
self[name].setup(self)
def load_meta(self):
if self.metadata.path.exists():
with filelock.FileLock(self.metadata.lock.str()):
data = self.metadata.path.load_json()
self.metadata.update(data)
def save_meta(self):
with filelock.FileLock(self.metadata.lock.str()):
data = self.metadata.copy()
data.pop('path')
data.pop('lock')
self.metadata.path.update_json(data)
self.metadata.path.save_json()
def close(self):
self.save_meta()
self._closed = True
def fetch(self, table, *args, **kwargs):
return self[table].fetch(*args, **kwargs)
def search(self, table, *args, **kwargs):
return self[table].search(*args, **kwargs)
def insert(self, table, *args, **kwargs):
return self[table].insert(*args, **kwargs)
def remove(self, table, *args, **kwargs):
return self[table].remove(*args, **kwargs)
def migrate(self, table=None):
tables = [self[table]] if table else self.table
for name, table in tables:
for row in table.search():
table.update(row.id, )
class Table(JsonDatabase):
def __init__(self, name: str, columns: dict={}):
self.db = None
self.name = name
self.columns = {}
self.add_column('id')
for name, col in columns.items():
if name != 'id':
self.add_column(name, *col)
def setup(self, db):
self.db = db
tablefile = db.path.join(f'table_{self.name}.json')
if not tablefile.exists():
tablefile.touch(mode=0o644)
with tablefile.open('w') as fd:
fd.write('{"data": []}')
super().__init__(tablefile.str())
def add_column(self, name: str, type: str='str', default: bool=None, nullable: bool=True, primary_key: bool=False):
if name == 'id':
type = 'int'
nullable = False
primary_key = True
self.columns[name] = misc.DotDict({
'default': default,
'type': type,
'primary_key': primary_key,
'nullable': nullable
})
def fetch(self, single=True, orderby=None, reverse=False, **kwargs):
if self.db._closed:
return logging.error('Database closed')
if not kwargs:
rows = DBRows(self, self.getAll())
single = False
else:
rows = DBRows(self, self.getBy(kwargs))
if single:
return rows[0] if rows else None
return rows if not orderby else sorted(rows, key=operator.itemgetter(orderby), reverse=reverse)
def search(self, orderby=None, reverse=None, **kwargs):
return self.fetch(single=False, orderby=orderby, reverse=reverse, **kwargs)
def insert(self, row=None, rowid=None, **kwargs):
if self.db._closed:
return logging.error('Database closed')
new_data = {}
for name, col in self.columns.items():
raw_value = kwargs.get(name, col.default)
value = serialize(raw_value, col.type)
if not value and not col.nullable:
raise ValueError(f'Column "{name}" cannot be empty')
new_data[name] = value
if row:
rowid = row.id
if rowid:
return self.update({'id': rowid}, new_data)
return self.add(new_data)
def delete(self, rowid):
with self.lock:
with open(self.filename, "r+") as db_file:
db_data = self._get_load_function()(db_file)
result = []
found = False
for d in db_data["data"]:
print(d)
if d.get('id') == rowid:
found = True
else:
result.append(d)
if not found:
raise IdNotFoundError(rowid)
db_data["data"] = result
db_file.seek(0)
db_file.truncate()
self._get_dump_function()(db_data, db_file)
return True
def remove(self, row=None, rowid=None, **kwargs):
if self.db._closed:
return logging.error('Database closed')
if row or rowid:
return self.delete(rowid if rowid else row.id)
return self.delete(kwargs)
def _get_dump_function(self):
return lambda *args, **kwargs: json.dump(*args, indent=2, **kwargs)
def serialize(data, dtype):
types = {
'datetime': lambda arg: arg.timestamp(),
'dotdict': lambda arg: arg.toDict(),
'bytes': lambda arg: base64.b64encode(arg).decode('ascii'),
'bool': misc.Boolean,
'int': int,
'path': lambda arg: arg.str()
}
if data != None:
serial_type = types.get(dtype)
return serial_type(data) if serial_type else data
return data
def deserialize(data, dtype):
types = {
'datetime': datetime.datetime.fromtimestamp,
'dotdict': misc.DotDict,
'dict': misc.DotDict,
'bytes': base64.b64decode,
'path': misc.Path
}
return types[dtype](data) if data is not None and dtype in types else data
def DBRows(table, rows):
return [DBRow(table, row) for row in rows]
class DBRow(misc.DotDict):
def __init__(self, table, row):
super().__init__(**{name: deserialize(row.get(name), col.type) for name, col in table.columns.items()})
self.table = table
def __str__(self):
data = ', '.join(f'{k}={v}' for k,v in self.items())
return f'DBRow({data})'
def update(self, data={}):
super().update(data)
self.table.update(rowid=self.id, **self)
def remove(self):
self.table.remove(rowid=self.id)
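## Rough usage sketch of the intended API (hedged: this is a first draft and several of
## the methods above are still incomplete, so treat this as intent rather than working code):
db = PysonDatabase('./db', tables={'user': {'name': ('str',), 'joined': ('datetime',)}})
db.insert('user', id=1, name='izzy', joined=datetime.datetime.now())   # proxied to DatabaseProcess.insert over the socket
row = db.fetch('user', name='izzy')                                    # proxied to DatabaseProcess.fetch
db.close()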

View file

@ -1,23 +0,0 @@
class MissingHeadersError(Exception):
def __init__(self, headers: list):
self.headers = ', '.join(headers)
self.message = f'Missing required headers for verification: {self.headers}'
super().__init__(self.message)
def __str__(self):
return self.message
class VerificationError(Exception):
def __init__(self, string=None):
self.message = f'Failed to verify hash'
if string:
self.message += ' for ' + string
super().__init__(self.message)
def __str__(self):
return self.message

View file

@ -1,657 +0,0 @@
import functools, json, sys
from IzzyLib import logging
from IzzyLib.misc import DefaultDict, DotDict, Path
from base64 import b64decode, b64encode
from datetime import datetime
from io import BytesIO
from ssl import SSLCertVerificationError
from urllib.error import HTTPError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from . import error, __version__
try:
import requests
except ImportError:
logging.verbose('Requests module not found. RequestsClient disabled')
requests = False
try:
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
crypto_enabled = True
except ImportError:
logging.verbose('Pycryptodome module not found. HTTP header signing and verifying is disabled')
crypto_enabled = False
try:
from sanic.request import Request as SanicRequest
except ImportError:
logging.verbose('Sanic module not found. Request verification is disabled')
SanicRequest = False
try:
from PIL import Image
except ImportError:
logging.verbose('Pillow module not found. Image downloading is disabled')
Image = False
Client = None
methods = ['connect', 'delete', 'get', 'head', 'options', 'patch', 'post', 'put', 'trace']
class HttpClient(object):
def __init__(self, headers={}, useragent=f'IzzyLib/{__version__}', appagent=None, proxy_type='https', proxy_host=None, proxy_port=None):
proxy_ports = {
'http': 80,
'https': 443
}
if proxy_type not in ['http', 'https']:
raise ValueError(f'Not a valid proxy type: {proxy_type}')
self.headers=headers
self.agent = f'{useragent} ({appagent})' if appagent else useragent
self.proxy = DotDict({
'enabled': True if proxy_host else False,
'ptype': proxy_type,
'host': proxy_host,
'port': proxy_ports[proxy_type] if not proxy_port else proxy_port
})
self.SetGlobal = SetClient
def __sign_request(self, request, privkey, keyid):
if not crypto_enabled:
logging.error('Crypto functions disabled')
return
request.add_header('(request-target)', f'{request.method.lower()} {request.path}')
request.add_header('host', request.host)
request.add_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
if request.body:
body_hash = b64encode(SHA256.new(request.body).digest()).decode("UTF-8")
request.add_header('digest', f'SHA-256={body_hash}')
request.add_header('content-length', len(request.body))
sig = {
'keyId': keyid,
'algorithm': 'rsa-sha256',
'headers': ' '.join([k.lower() for k in request.headers.keys()]),
'signature': b64encode(PkcsHeaders(privkey, request.headers)).decode('UTF-8')
}
sig_items = [f'{k}="{v}"' for k,v in sig.items()]
sig_string = ','.join(sig_items)
request.add_header('signature', sig_string)
request.remove_header('(request-target)')
request.remove_header('host')
def __build_request(self, url, data=None, headers={}, method='GET', stream=False):
new_headers = self.headers.copy()
new_headers.update(headers)
parsed_headers = {k.lower(): v for k,v in new_headers.items()}
if not parsed_headers.get('user-agent'):
parsed_headers['user-agent'] = self.agent
if isinstance(data, dict):
data = json.dumps(data)
if isinstance(data, str):
data = data.encode('UTF-8')
request = Request(url, data=data, headers=parsed_headers, method=method)
if self.proxy.enabled:
request.set_proxy(f'{self.proxy.host}:{self.proxy.port}', self.proxy.ptype)
return request
def request(self, *args, **kwargs):
request = self.__build_request(*args, **kwargs)
try:
response = urlopen(request)
except HTTPError as e:
response = e.fp
except SSLCertVerificationError as e:
logging.error('HttpClient.request: Certificate error:', e)
return
return HttpResponse(response)
def file(self, url, filepath, *args, filename=None, size=2048, **kwargs):
path = Path(filepath)
filepath = path.join(filename if filename else Path(url).name)
if filepath.exists():
kwargs.setdefault('headers', {})['range'] = f'bytes={filepath.size()}-'
resp = self.request(url, *args, stream=True, **kwargs)
if not resp.headers.get('content-length'):
logging.verbose('File already downloaded fully')
return True
if resp.status != 200:
logging.error(f'Failed to download {url}:', resp.status, resp.body)
return False
if not path.exists():
logging.error('Path does not exist:', path)
return False
with filepath.open('ab') as fd:
for chunk in resp.chunks(size):
fd.write(chunk)
return True
def image(self, url, filepath, *args, filename=None, ext='png', dimensions=(50, 50), **kwargs):
if not Image:
logging.error('Pillow module is not installed')
return
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
logging.error(f'Failed to download {url}:', resp.status, resp.body)
return False
if not filename:
filename = Path(url).stem
path = Path(filepath)
if not path.exists():
logging.error('Path does not exist:', path)
return False
byte = BytesIO()
image = Image.open(BytesIO(resp.body))
image.thumbnail(dimensions)
image.save(byte, format=ext.upper())
with path.join(filename).open('wb') as fd:
fd.write(byte.getvalue())
def json(self, *args, headers={}, activity=True, **kwargs):
json_type = 'activity+json' if activity else 'json'
headers.update({
'accept': f'application/{json_type}'
})
return self.request(*args, headers=headers, **kwargs)
def signed_request(self, privkey, keyid, *args, **kwargs):
request = self.__build_request(*args, **kwargs)
self.__sign_request(request, privkey, keyid)
try:
response = urlopen(request)
except HTTPError as e:
response = e
return HttpResponse(response)
class HttpResponse(object):
def __init__(self, response):
self.body = response.read()
self.headers = DefaultDict({k.lower(): v.lower() for k,v in response.headers.items()})
self.status = response.status
self.url = response.url
def text(self):
return self.body.decode('UTF-8')
def json(self, fail=False):
try:
return DotDict(self.text())
except Exception as e:
if fail:
raise e from None
else:
return DotDict()
def json_pretty(self, indent=4):
return json.dumps(dict(self.json()), indent=indent)
class RequestsClient(object):
def __init__(self, headers={}, useragent=f'IzzyLib/{__version__}', appagent=None, proxy_type='https', proxy_host=None, proxy_port=None):
proxy_ports = {
'http': 80,
'https': 443
}
if proxy_type not in ['http', 'https']:
raise ValueError(f'Not a valid proxy type: {proxy_type}')
self.headers=headers
self.agent = f'{useragent} ({appagent})' if appagent else useragent
self.proxy = DotDict({
'enabled': True if proxy_host else False,
'ptype': proxy_type,
'host': proxy_host,
'port': proxy_ports[proxy_type] if not proxy_port else proxy_port
})
self.SetGlobal = SetClient
def __sign_request(self, request, privkey, keyid):
if not crypto_enabled:
logging.error('Crypto functions disabled')
return
request.add_header('(request-target)', f'{request.method.lower()} {request.path}')
request.add_header('host', request.host)
request.add_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
if request.body:
body_hash = b64encode(SHA256.new(request.body).digest()).decode("UTF-8")
request.add_header('digest', f'SHA-256={body_hash}')
request.add_header('content-length', len(request.body))
sig = {
'keyId': keyid,
'algorithm': 'rsa-sha256',
'headers': ' '.join([k.lower() for k in request.headers.keys()]),
'signature': b64encode(PkcsHeaders(privkey, request.headers)).decode('UTF-8')
}
sig_items = [f'{k}="{v}"' for k,v in sig.items()]
sig_string = ','.join(sig_items)
request.add_header('signature', sig_string)
request.remove_header('(request-target)')
request.remove_header('host')
def request(self, *args, method='get', **kwargs):
if method.lower() not in methods:
raise ValueError(f'Invalid method: {method}')
request = RequestsRequest(self, *args, method=method.lower(), **kwargs)
return RequestsResponse(request.send())
def file(self, url, filepath, *args, filename=None, **kwargs):
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
logging.error(f'Failed to download {url}:', resp.status, resp.body)
return False
return resp.save(filepath)
def image(self, url, filepath, *args, filename=None, ext='png', dimensions=(50, 50), **kwargs):
if not Image:
logging.error('Pillow module is not installed')
return
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
logging.error(f'Failed to download {url}:', resp.status, resp.body)
return False
if not filename:
filename = Path(url).stem
path = Path(filepath)
if not path.exists():
logging.error('Path does not exist:', path)
return False
byte = BytesIO()
image = Image.open(BytesIO(resp.body))
image.thumbnail(dimensions)
image.save(byte, format=ext.upper())
with path.join(filename).open('wb') as fd:
fd.write(byte.getvalue())
def json(self, *args, **kwargs):
return self.dict(*args, **kwargs)
def dict(self, *args, headers={}, activity=True, **kwargs):
json_type = 'activity+json' if activity else 'json'
headers.update({
'accept': f'application/{json_type}'
})
return self.request(*args, headers=headers, **kwargs).dict
def signed_request(self, privkey, keyid, *args, **kwargs):
request = RequestsRequest(self, *args, **kwargs)
self.__sign_request(request, privkey, keyid)
return RequestsResponse(request.send())
class RequestsRequest(object):
def __init__(self, client, url, data=None, headers={}, query={}, method='get'):
self.args = [url]
self.kwargs = {'params': query}
self.method = method.lower()
self.client = client
new_headers = client.headers.copy()
new_headers.update(headers)
parsed_headers = {k.lower(): v for k,v in new_headers.items()}
if not parsed_headers.get('user-agent'):
parsed_headers['user-agent'] = client.agent
self.kwargs['headers'] = parsed_headers
self.kwargs['data'] = data
if client.proxy.enabled:
self.kwargs['proxies'] = {client.proxy.ptype: f'{client.proxy.ptype}://{client.proxy.host}:{client.proxy.port}'}
def send(self):
func = getattr(requests, self.method)
return func(*self.args, **self.kwargs)
class RequestsResponse(object):
def __init__(self, response):
self.response = response
self.data = b''
self.headers = DefaultDict({k.lower(): v.lower() for k,v in response.headers.items()})
self.status = response.status_code
self.url = response.url
def chunks(self, size=256):
return self.response.iter_content(chunk_size=size)
@property
def body(self):
for chunk in self.chunks():
self.data += chunk
return self.data
@property
def text(self):
if not self.data:
return self.body.decode(self.response.encoding)
return self.data.decode(self.response.encoding)
@property
def dict(self):
try:
return DotDict(self.text)
except Exception as e:
return DotDict()
@property
def json(self):
return json.dumps(self.dict)
@property
def json_pretty(self, indent=4):
return json.dumps(self.dict, indent=indent)
def save(self, path, overwrite=True):
path = Path(path)
parent = path.parent()
if not parent.exists():
raise ValueError(f'Path does not exist: {parent}')
if overwrite and path.exists():
path.delete()
with path.open('wb') as fd:
for chunk in self.chunks():
fd.write(chunk)
def VerifyRequest(request: SanicRequest, actor: dict):
'''Verify a header signature from a sanic request
request: The request with the headers to verify
actor: A dictionary containing the activitypub actor and the link to the pubkey used for verification
'''
if not SanicRequest:
logging.error('Sanic request verification disabled')
return
body = request.body if request.body else None
return VerifyHeaders(request.headers, request.method, request.path, actor=actor, body=body)
def VerifyHeaders(headers: dict, method: str, path: str, actor: dict=None, body=None):
'''Verify a header signature
headers: A dictionary containing all the headers from a request
method: The HTTP method of the request
path: The path of the HTTP request
actor (optional): A dictionary containing the activitypub actor and the link to the pubkey used for verification
body (optional): The body of the request. Only needed if the signature includes the digest header
fail (optional): If set to True, raise an error instead of returning False if any step of the process fails
'''
if not crypto_enabled:
logging.error('Crypto functions disabled')
return
headers = {k.lower(): v for k,v in headers.items()}
headers['(request-target)'] = f'{method.lower()} {path}'
signature = ParseSig(headers.get('signature'))
digest = ParseBodyDigest(headers.get('digest'))
missing_headers = [k for k in ['date', 'host'] if headers.get(k) is None]
if not signature:
logging.verbose('Missing signature')
return False
if not actor:
actor = FetchActor(signature.keyid)
## Add digest header to missing headers list if it doesn't exist
if method.lower() == 'post' and not digest:
missing_headers.append('digest')
## Fail if missing date, host or digest (if POST) headers
if missing_headers:
logging.verbose('Missing headers:', missing_headers)
return False
## Fail if body verification fails
if digest and not VerifyString(body, digest.sig, digest.alg):
logging.verbose('Failed body digest verification')
return False
pubkey = actor.publicKey['publicKeyPem']
if PkcsHeaders(pubkey, {k:v for k,v in headers.items() if k in signature.headers}, sig=signature):
return True
logging.verbose('Failed header verification')
return False
def ParseBodyDigest(digest):
if not digest:
return
parsed = DotDict()
parts = digest.split('=', 1)
if len(parts) != 2:
return
parsed.sig = parts[1]
parsed.alg = parts[0].replace('-', '')
return parsed
def VerifyString(string, enc_string, alg='SHA256', fail=False):
if not crypto_enabled:
logging.error('Crypto functions disabled')
return
if type(string) != bytes:
string = string.encode('UTF-8')
body_hash = b64encode(SHA256.new(string).digest()).decode('UTF-8')
if body_hash == enc_string:
return True
if fail:
raise error.VerificationError()
else:
return False
def PkcsHeaders(key: str, headers: dict, sig=None):
if not crypto_enabled:
logging.error('Crypto functions disabled')
return
if sig:
head_items = [f'{item}: {headers[item]}' for item in sig.headers]
else:
head_items = [f'{k.lower()}: {v}' for k,v in headers.items()]
head_string = '\n'.join(head_items)
head_bytes = head_string.encode('UTF-8')
KEY = RSA.importKey(key)
pkcs = PKCS1_v1_5.new(KEY)
h = SHA256.new(head_bytes)
if sig:
return pkcs.verify(h, b64decode(sig.signature))
else:
return pkcs.sign(h)
def ParseSig(signature: str):
if not signature:
logging.verbose('Missing signature header')
return
split_sig = signature.split(',')
sig = DefaultDict({})
for part in split_sig:
key, value = part.split('=', 1)
sig[key.lower()] = value.replace('"', '')
if not sig.headers:
logging.verbose('Missing headers section in signature')
return
sig.headers = sig.headers.split()
return sig
def FetchActor(url):
if not Client:
logging.error('IzzyLib.http: Please set global client with "SetClient(client)"')
return {}
url = url.split('#')[0]
headers = {'Accept': 'application/activity+json'}
resp = Client.request(url, headers=headers)
if not resp.json():
logging.verbose('functions.FetchActor: Failed to fetch actor:', url)
logging.debug(f'Error {resp.status}: {resp.body}')
return {}
actor = resp.json()
actor.web_domain = urlparse(url).netloc
actor.shared_inbox = actor.inbox
actor.pubkey = None
actor.handle = actor.preferredUsername
if actor.get('endpoints'):
actor.shared_inbox = actor.endpoints.get('sharedInbox', actor.inbox)
if actor.get('publicKey'):
actor.pubkey = actor.publicKey.get('publicKeyPem')
return actor
@functools.lru_cache(maxsize=512)
def FetchWebfingerAcct(handle, domain):
if not Client:
logging.error('IzzyLib.http: Please set global client with "SetClient(client)"')
return {}
data = DefaultDict()
webfinger = Client.request(f'https://{domain}/.well-known/webfinger?resource=acct:{handle}@{domain}')
if not webfinger.body:
return
data.handle, data.domain = webfinger.json().subject.replace('acct:', '').split('@')
for link in webfinger.json().links:
if link['rel'] == 'self' and link['type'] == 'application/activity+json':
data.actor = link['href']
return data
def SetClient(client=None):
global Client
Client = client or HttpClient()
def GenRsaKey():
privkey = RSA.generate(2048)
key = DotDict({'PRIVKEY': privkey, 'PUBKEY': privkey.publickey()})
key.update({'privkey': key.PRIVKEY.export_key().decode(), 'pubkey': key.PUBKEY.export_key().decode()})
return key
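## Intended usage sketch (hedged; the key id and URLs below are placeholders, not real endpoints):
SetClient(HttpClient(appagent='MyApp/0.1'))
keys = GenRsaKey()   # requires pycryptodome
resp = Client.signed_request(keys.privkey, 'https://example.com/actor#main-key', 'https://example.com/inbox', data={'type': 'Follow'}, method='POST')
## On the receiving side, the signature can be checked against the actor's public key:
## valid = VerifyHeaders(headers, 'POST', '/inbox', actor=FetchActor(keyid), body=raw_body)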

View file

@ -1,221 +0,0 @@
'''Simple logging module'''
import sys
from os import environ as env
from datetime import datetime
stdout = sys.stdout
class Log():
def __init__(self, name=None, config=dict()):
'''setup the logger'''
if not isinstance(config, dict):
raise TypeError(f'config is not a dict')
self.levels = {
'CRIT': 60,
'ERROR': 50,
'WARN': 40,
'INFO': 30,
'VERB': 20,
'DEBUG': 10,
'MERP': 0
}
self.long_levels = {
'CRITICAL': 'CRIT',
'ERROR': 'ERROR',
'WARNING': 'WARN',
'INFO': 'INFO',
'VERBOSE': 'VERB',
'DEBUG': 'DEBUG',
'MERP': 'MERP'
}
self.name = name
self.config = {'windows': sys.executable.endswith('pythonw.exe')}
self.setConfig(self._parseConfig(config))
def _lvlCheck(self, level):
'''make sure the minimum logging level is an int'''
try:
value = int(level)
except ValueError:
level = self.long_levels.get(level.upper(), level)
value = self.levels.get(level)
if value not in self.levels.values():
raise InvalidLevel(f'Invalid logging level: {level}')
return value
def _getLevelName(self, level):
for name, num in self.levels.items():
if level == num:
return name
raise InvalidLevel(f'Invalid logging level: {level}')
def _parseConfig(self, config):
'''parse the new config and update the old values'''
date = config.get('date', self.config.get('date',True))
systemd = config.get('systemd', self.config.get('systemd', True))
windows = config.get('windows', self.config.get('windows', False))
if not isinstance(date, bool):
raise TypeError(f'value for "date" is not a boolean: {date}')
if not isinstance(systemd, bool):
raise TypeError(f'value for "systemd" is not a boolean: {date}')
level_num = self._lvlCheck(config.get('level', self.config.get('level', 'INFO')))
newconfig = {
'level': self._getLevelName(level_num),
'levelnum': level_num,
'datefmt': config.get('datefmt', self.config.get('datefmt', '%Y-%m-%d %H:%M:%S')),
'date': date,
'systemd': systemd,
'windows': windows,
'systemnotif': config.get('systemnotif', None)
}
return newconfig
def setConfig(self, config):
'''set the config'''
self.config = self._parseConfig(config)
def getConfig(self, key=None):
'''return the current config'''
if key:
if self.config.get(key):
return self.config.get(key)
else:
raise ValueError(f'Invalid config option: {key}')
return self.config
def printConfig(self):
for k,v in self.config.items():
stdout.write(f'{k}: {v}\n')
stdout.flush()
def setLevel(self, level):
value = self._lvlCheck(level)
self.config['levelnum'] = value
self.config['level'] = self._getLevelName(value)
def log(self, level, *msg):
'''log to the console'''
if self.config['windows']:
return
levelNum = self._lvlCheck(level)
if type(level) == int:
level = self._getLevelName(level)
if levelNum < self.config['levelnum']:
return
message = ' '.join([str(message) for message in msg])
if self.name:
output = f'[{self.name}] '
else:
output = ''
output += f'{level}: {message}\n'
if self.config['systemnotif']:
self.config['systemnotif'].New(level, message)
if self.config['date'] and not (self.config['systemd'] and env.get('INVOCATION_ID')):
'''only show date when not running in systemd and date var is True'''
date = datetime.now().strftime(self.config['datefmt'])
output = f'{date} {output}'
stdout.write(output)
stdout.flush()
def critical(self, *msg):
self.log('CRIT', *msg)
def error(self, *msg):
self.log('ERROR', *msg)
def warning(self, *msg):
self.log('WARN', *msg)
def info(self, *msg):
self.log('INFO', *msg)
def verbose(self, *msg):
self.log('VERB', *msg)
def debug(self, *msg):
self.log('DEBUG', *msg)
def merp(self, *msg):
self.log('MERP', *msg)
def getLogger(name):
'''get a logging instance and create one if it doesn't exist'''
log = logger.get(name.lower())
if not log:
raise InvalidLogger(f'logger "{name}" doesn\'t exist')
return log
def setLogger(name, config={}):
log = Log(name, config)
logger[name.lower()] = log
return log
class InvalidLevel(Exception):
'''Raise when an invalid logging level was specified'''
class InvalidLogger(Exception):
'''Raise when the specified logger doesn't exist'''
'''create a default logger'''
logger = {
'default': Log()
}
DefaultLog = logger['default']
'''aliases for default logger's log output functions'''
log = DefaultLog.log
critical = DefaultLog.critical
error = DefaultLog.error
warning = DefaultLog.warning
info = DefaultLog.info
verbose = DefaultLog.verbose
debug = DefaultLog.debug
merp = DefaultLog.merp
'''aliases for the default logger's config functions'''
setConfig = DefaultLog.setConfig
getConfig = DefaultLog.getConfig
setLevel = DefaultLog.setLevel
printConfig = DefaultLog.printConfig
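'''example usage of the default and named loggers (illustrative output only)'''
setConfig({'level': 'DEBUG'})
info('server started on port', 8080)
applog = setLogger('myapp', {'level': 'VERBOSE', 'date': False})
applog.verbose('this message is printed')
applog.debug('this message is filtered out because the level is VERBOSE')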

View file

@ -1,674 +0,0 @@
'''Miscellaneous functions'''
import hashlib, random, string, sys, os, json, statistics, socket, time, timeit
from os import environ as env
from datetime import datetime
from getpass import getpass
from importlib import util
from pathlib import Path as Pathlib
from shutil import copyfile, rmtree
from . import logging
try:
import argon2
except ImportError:
logging.verbose('argon2-cffi not installed. PasswordHasher class disabled')
argon2 = None
def Boolean(v, return_value=False):
if type(v) not in [str, bool, int, type(None)]:
raise ValueError(f'Value is not a string, boolean, int, or nonetype: {v}')
'''make the value lowercase if it's a string'''
value = v.lower() if isinstance(v, str) else v
if value in [1, True, 'on', 'y', 'yes', 'true', 'enable']:
'''convert string to True'''
return True
if value in [0, False, None, 'off', 'n', 'no', 'false', 'disable', '']:
'''convert string to False'''
return False
if return_value:
'''just return the value'''
return v
return True
def RandomGen(length=20, letters=True, digits=True, extra=None):
if not isinstance(length, int):
raise TypeError(f'Character length must be an integer, not {type(length)}')
characters = ''
if letters:
characters += string.ascii_letters
if digits:
characters += string.digits
if extra:
characters += extra
return ''.join(random.choices(characters, k=length))
def HashString(string, alg='blake2s'):
if alg not in hashlib.__always_supported:
logging.error('Unsupported hash algorithm:', alg)
logging.error('Supported algs:', ', '.join(hashlib.__always_supported))
return
string = string.encode('UTF-8') if type(string) != bytes else string
newhash = hashlib.new(alg)
newhash.update(string)
return newhash.hexdigest()
def Timestamp(dtobj=None, utc=False):
dtime = dtobj if dtobj else datetime
date = dtime.utcnow() if utc else dtime.now()
return date.timestamp()
def GetVarName(single=True, **kwargs):
keys = list(kwargs.keys())
return keys[0] if single else keys
def ApDate(date=None, alt=False):
if not date:
date = datetime.utcnow()
elif type(date) == int:
date = datetime.fromtimestamp(date)
elif type(date) != datetime:
raise TypeError(f'Unsupported object type for ApDate: {type(date)}')
return date.strftime('%a, %d %b %Y %H:%M:%S GMT' if alt else '%Y-%m-%dT%H:%M:%SZ')
def GetIp():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('10.255.255.255', 1))
data = s.getsockname()
ip = data[0]
except Exception:
ip = '127.0.0.1'
finally:
s.close()
return ip
def ImportFromPath(mod_path):
mod_path = Path(mod_path)
path = mod_path.join('__init__.py') if mod_path.isdir() else mod_path
name = path.name.replace('.py', '', -1)
spec = util.spec_from_file_location(name, path.str())
module = util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def Input(prompt, default=None, valtype=str, options=[], password=False):
input_func = getpass if password else input
if default != None:
prompt += ' [-redacted-]' if password else f' [{default}]'
prompt += '\n'
if options:
opt = '/'.join(options)
prompt += f'[{opt}]'
prompt += ': '
value = input_func(prompt)
while value and len(options) > 0 and value not in options:
print('Invalid value:', value)
value = input_func(prompt)
if not value or value == '':
return default
ret = valtype(value)
while valtype == Path and not ret.parent().exists():
print('Parent directory doesn\'t exist')
ret = Path(input(prompt))
return ret
def NfsCheck(path):
proc = Path('/proc/mounts')
path = Path(path).resolve()
if not proc.exists():
return True
with proc.open() as fd:
for line in fd:
line = line.split()
if line[2] == 'nfs' and line[1] in path.str():
return True
return False
def PortCheck(port, address='127.0.0.1', tcp=True):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM if tcp else socket.SOCK_DGRAM) as s:
try:
return not s.connect_ex((address, port)) == 0
except socket.error as e:
print(e)
return False
def PrintMethods(object, include_underscore=False):
for line in dir(object):
if line.startswith('_'):
if include_underscore:
print(line)
else:
print(line)
def TimeFunction(func, *args, passes=1, use_gc=True, **kwargs):
options = [
lambda: func(*args, **kwargs)
]
if use_gc:
options.append('gc.enable()')
timer = timeit.Timer(*options)
if passes > 1:
return timer.repeat(passes, 1)
return timer.timeit(1)
def TimeFunctionPPrint(func, *args, passes=5, use_gc=True, floatlen=3, **kwargs):
parse_time = lambda num: f'{round(num, floatlen)}s'
times = []
for idx in range(0, passes):
passtime = TimeFunction(func, *args, **kwargs, passes=1, use_gc=use_gc)
times.append(passtime)
print(f'Pass {idx+1}: {parse_time(passtime)}')
average = statistics.fmean(times)
print('-----------------')
print(f'Average: {parse_time(average)}')
print(f'Total: {parse_time(sum(times))}')
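## Example: time a single call and a multi-pass benchmark (numbers vary per machine):
elapsed = TimeFunction(sorted, list(range(100000)))          # one timed pass, returns seconds as a float
TimeFunctionPPrint(sorted, list(range(100000)), passes=3)    # prints per-pass and average timings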
def TimePassHasher(string='hecking heck', passes=3, iterations=[2,4,8,16,32,64,96]):
for iteration in iterations:
print('\nTesting hash iterations:', iteration)
hasher = PasswordHasher(iterations=iteration)
strhash = hasher.hash(string)
TimeFunctionPPrint(hasher.verify, strhash, string, passes=passes)
class Connection(socket.socket):
def __init__(self, address='127.0.0.1', port=8080, tcp=True):
super().__init__(socket.AF_INET, socket.SOCK_STREAM if tcp else socket.SOCK_DGRAM)
self.address = address
self.port = port
def __enter__(self):
self.connect((self.address, self.port))
return self
def __exit__(self, exctype, value, tb):
self.close()
def send(self, msg):
self.sendall(msg)
def recieve(self, size=8192):
return self.recv(size)
class DotDict(dict):
dict_ignore_types = ['basecache', 'lrucache', 'ttlcache']
def __init__(self, value=None, **kwargs):
'''Python dictionary, but variables can be set/get via attributes
value [str, bytes, dict]: JSON or dict of values to init with
kwargs: key/value pairs to set on init. Overrides identical keys set by 'value'
'''
super().__init__()
self.__setattr__ = self.__setitem__
## compatibility
self.toJson = self.to_json
self.fromJson = self.from_json
if isinstance(value, (str, bytes)):
self.from_json(value)
elif value.__class__.__name__.lower() not in self.dict_ignore_types and isinstance(value, dict):
self.update(value)
elif value:
raise TypeError(f'The value must be a JSON string, list, dict, or another DotDict object, not {value.__class__}')
if kwargs:
self.update(kwargs)
def __getattr__(self, k):
try:
return super().__getitem__(k)
except KeyError:
raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
def __setitem__(self, k, v):
if v.__class__.__name__.lower() not in self.dict_ignore_types and isinstance(v, dict):
v = DotDict(v)
super().__setitem__(k, v)
def __delattr__(self, k):
try:
dict.__delitem__(self, k)
except KeyError:
raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
def update(self, data):
for k,v in data.items():
self.__setitem__(k, v)
def to_json(self, indent=None, **kwargs):
if 'cls' not in kwargs:
kwargs['cls'] = JsonEncoder
return json.dumps(dict(self), indent=indent, **kwargs)
def from_json(self, string):
data = json.loads(string)
self.update(data)
def load_json(self, path: str=None):
self.update(Path(path).load_json())
def save_json(self, path: str, **kwargs):
with Path(path).open('w') as fd:
fd.write(self.to_json(**kwargs))
## This has to be reworked tbh
class DefaultDict(DotDict):
def __getattr__(self, key):
try:
val = super().__getattribute__(key)
except AttributeError:
val = self.get(key, DefaultDict())
return DotDict(val) if type(val) == dict else val
class LowerDotDict(DotDict):
def __getattr__(self, key):
return super().__getattr__(key.lower())
def __setattr__(self, key, value):
return super().__setattr__(key.lower(), value)
def update(self, data):
data = {k.lower(): v for k,v in data.items()}
return super().update(data)
class Path(object):
def __init__(self, path, exist=True, missing=True, parents=True):
self.__path = Pathlib(str(path))
if str(path).startswith('~'):
self.__path = self.__path.expanduser()
self.json = DotDict()
self.exist = exist
self.missing = missing
self.parents = parents
self.name = self.__path.name
self.stem = self.__path.stem
def __str__(self):
return str(self.__path)
def __repr__(self):
return f'Path({str(self.__path)})'
def str(self):
return self.__str__()
def __check_dir(self, path=None):
target = self if not path else Path(path)
if not self.parents and not target.parent().exists():
raise FileNotFoundError('Parent directories do not exist:', target.str())
if not self.exist and target.exists():
raise FileExistsError('File or directory already exists:', target.str())
def __parse_perm_octal(self, mode):
return mode if isinstance(mode, int) else int(str(mode), 8)
def append(self, text, new=True):
path = str(self.__path) + text
if new:
return Path(path)
self.__path = Pathlib(path)
return self
def size(self):
return self.__path.stat().st_size
def mtime(self):
return self.__path.stat().st_mtime
def mkdir(self, mode=0o755):
self.__path.mkdir(mode, self.parents, self.exist)
return True if self.__path.exists() else False
def new(self):
return Path(self.__path)
def parent(self, new=True):
path = Pathlib(self.__path).parent
if new:
return Path(path)
self.__path = path
return self
def copy(self, path, overwrite=False):
target = Path(path)
self.__check_dir(path)
if target.exists() and overwrite:
target.delete()
copyfile(self.str(), target.str())
def backup(self, ext='backup', overwrite=False):
target = f'{self.__path.parent}.{ext}'
self.copy(target, overwrite)
def move(self, path, overwrite=False):
self.copy(path, overwrite=overwrite)
self.delete()
def join(self, path, new=True):
new_path = self.__path.joinpath(path)
if new:
return Path(new_path)
self.__path = new_path
return self
def home(self, path=None, new=True):
new_path = Pathlib.home()
if path:
new_path = new_path.joinpath(path)
if new:
return Path(new_path)
self.__path = new_path
return self
def isdir(self):
return self.__path.is_dir()
def isfile(self):
return self.__path.is_file()
def islink(self):
return self.__path.is_symlink()
def listdir(self, recursive=True):
paths = self.__path.iterdir() if recursive else os.listdir(self.__path)
return [Path(path) for path in paths]
def exists(self):
return self.__path.exists()
def mtime(self):
return os.path.getmtime(self.str())
def size(self):
return self.__path.stat().st_size
def link(self, path):
target = Path(path)
self.__check_dir(path)
if target.exists():
target.delete()
self.__path.symlink_to(path, target.isdir())
def resolve(self, new=True):
path = self.__path.resolve()
if new:
return Path(path)
self.__path = path
return self
def chmod(self, mode=None):
octal = self.__parse_perm_octal(mode)
self.__path.chmod(octal)
def touch(self, mode=0o666):
octal = self.__parse_perm_octal(mode)
self.__path.touch(octal, self.exist)
return self.exists()
def mkdir(self):
self.__path.mkdir(parents=self.parents, exist_ok=self.exist)
return self.exists()
def load_json(self):
self.json = DotDict(self.read())
return self.json
def save_json(self, indent=None):
with self.__path.open('w') as fp:
fp.write(json.dumps(dict(self.json), indent=indent, cls=JsonEncoder))
def update_json(self, data={}):
if type(data) == str:
data = json.loads(data)
self.json.update(data)
def delete(self):
if self.isdir():
rmtree(self.__path)
else:
self.__path.unlink()
return not self.exists()
def open(self, *args):
return self.__path.open(*args)
def read(self, *args):
return self.open().read(*args)
def readlines(self):
return self.open().readlines()
class JsonEncoder(json.JSONEncoder):
def default(self, obj):
if not isinstance(obj, (str, int, float, dict)):
return str(obj)
return json.JSONEncoder.default(self, obj)
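## Example usage of the Path wrapper above (paths here are placeholders):
config = Path('~/.config/myapp/config.json')   # '~' is expanded automatically
config.parent().mkdir()                        # parent dirs are created since parents=True by default
config.update_json({'theme': 'dark'})
config.save_json(indent=4)
print(config.load_json().theme)                # -> 'dark'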
class PasswordHasher(DotDict):
## The defaults can usually be used, except for `iterations`. That should be tweaked on each machine
## You can use the TimeFunctionPPrint command above to test this
aliases = {
'iterations': 'time_cost',
'memory': 'memory_cost',
'threads': 'parallelism'
}
def __init__(self, **kwargs):
if not argon2:
raise ValueError('password hashing disabled')
super().__init__({
'time_cost': 16,
'memory_cost': 100 * 1024,
'parallelism': os.cpu_count(),
'encoding': 'utf-8',
'type': argon2.Type.ID,
})
self.hasher = None
self.update(kwargs)
self.setup()
def get_config(self, key):
key = self.aliases.get(key, key)
self[key]
return self.get(key) / 1024 if key == 'memory_cost' else self.get(key)
def set_config(self, key, value):
key = self.aliases.get(key, key)
self[key] = value * 1024 if key == 'memory_cost' else value
self.setup()
def setup(self):
self.hasher = argon2.PasswordHasher(**self)
def hash(self, password: str):
return self.hasher.hash(password)
def verify(self, passhash: str, password: str):
try:
return self.hasher.verify(passhash, password)
except argon2.exceptions.VerifyMismatchError:
return False
def iteration_test(self, string='hecking heck', passes=3, iterations=[8,16,24,32,40,48,56,64]):
original_iter = self.get_config('iterations')
for iteration in iterations:
self.set_config('iterations', iteration)
print('\nTesting hash iterations:', iteration)
TimeFunctionPPrint(self.verify, self.hash(string), string, passes=passes)
self.set_config('iterations', original_iter)
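## Example usage (requires argon2-cffi; tune 'iterations' per machine as noted above):
hasher = PasswordHasher()
hasher.set_config('iterations', 8)                              # alias for argon2's time_cost
stored = hasher.hash('correct horse battery staple')
print(hasher.verify(stored, 'correct horse battery staple'))    # True
print(hasher.verify(stored, 'wrong password'))                  # False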

LICENSE
View file

@ -1,7 +1,7 @@
IzzyLib
Copyright Zoey Mae 2020
Copyright Zoey Mae 2021
COOPERATIVE NON-VIOLENT PUBLIC LICENSE v4
COOPERATIVE NON-VIOLENT PUBLIC LICENSE v6
Preamble
@ -19,7 +19,7 @@ Official Webpage: https://thufie.lain.haus/NPL.html
Terms and Conditions
THE WORK (AS DEFINED BELOW) IS PROVIDED UNDER THE TERMS OF THIS
COOPERATIVE NON-VIOLENT PUBLIC LICENSE v4 ("LICENSE"). THE WORK IS
COOPERATIVE NON-VIOLENT PUBLIC LICENSE v5 ("LICENSE"). THE WORK IS
PROTECTED BY COPYRIGHT AND ALL OTHER APPLICABLE LAWS. ANY USE OF THE
WORK OTHER THAN AS AUTHORIZED UNDER THIS LICENSE OR COPYRIGHT LAW IS
PROHIBITED. BY EXERCISING ANY RIGHTS TO THE WORK PROVIDED IN THIS
@ -51,11 +51,14 @@ TO BE BOUND BY THE TERMS AND CONDITIONS OF THIS LICENSE.
For the avoidance of doubt, where the Work is a musical work,
performance or phonogram, the synchronization of the Work in
timed-relation with a moving image ("synching") will be
considered an Adaptation for the purpose of this License.
considered an Adaptation for the purpose of this License. In
addition, where the Work is designed to output a neural network
the output of the neural network will be considered an
Adaptation for the purpose of this license.
c. "Bodily Harm" means any physical hurt or injury to a person that
interferes with the health or comfort of the person and that is more
more than merely transient or trifling in nature.
than merely transient or trifling in nature.
d. "Collection" means a collection of literary or artistic
works, such as encyclopedias and anthologies, or performances,
@ -75,9 +78,13 @@ TO BE BOUND BY THE TERMS AND CONDITIONS OF THIS LICENSE.
through sale, gift or any other transfer of possession or
ownership.
f. "Incarceration" means confinement in a jail, prison, or any
other place where individuals of any kind are held against
either their will or the will of their legal guardians.
f. "Incarceration" means confinement in a jail, prison, or
any other place where individuals of any kind are held against
either their will or (if their will cannot be determined) the
will of their legal guardian or guardians. In the case of a
conflict between the will of the individual and the will of
their legal guardian or guardians, the will of the
individual will take precedence.
g. "Licensor" means the individual, individuals, entity or
entities that offer(s) the Work under the terms of this License.
@ -153,7 +160,7 @@ TO BE BOUND BY THE TERMS AND CONDITIONS OF THIS LICENSE.
overtly or covertly observe and record persons and or their
activities.
p. "Web Service" means the use of a piece of Software to
p. "Network Service" means the use of a piece of Software to
interpret or modify information that is subsequently and directly
served to users over the Internet.
@ -166,6 +173,11 @@ TO BE BOUND BY THE TERMS AND CONDITIONS OF THIS LICENSE.
for some group or advocating a form of Discrimination
(to Discriminate per definition in (q)) between humans.
s. "Coercion" means leveraging of the threat of force or use of force
to intimidate a person in order to gain compliance, or to offer
large incentives which aim to entice a person to act against their
will.
2. FAIR DEALING RIGHTS
Nothing in this License is intended to reduce, limit, or restrict any
@ -251,7 +263,7 @@ TO BE BOUND BY THE TERMS AND CONDITIONS OF THIS LICENSE.
form, or You provide a URI for the corresponding Source Code of
the Work, to any recipients upon request.
d. If the Work is used as or for a Web Service, You may exercise
d. If the Work is used as or for a Network Service, You may exercise
the rights granted in Section 3 only if You provide a copy of the
corresponding Source Code from which the Work was derived in digital
form, or You provide a URI for the corresponding Source Code to the
@ -259,11 +271,11 @@ TO BE BOUND BY THE TERMS AND CONDITIONS OF THIS LICENSE.
Service.
e. You may exercise the rights granted in Section 3 for
commercial purposes only if you satisfy any of the following:
commercial purposes only if:
i. You are a worker-owned business or worker-owned
collective; and
ii. after tax, all financial gain, surplus, profits and
ii. after tax, all financial gain, surplus, profits and
benefits produced by the business or collective are
distributed among the worker-owners unless a set amount
is to be allocated towards community projects as decided
@ -285,15 +297,19 @@ TO BE BOUND BY THE TERMS AND CONDITIONS OF THIS LICENSE.
i. You do not use the Work for the purpose of inflicting
Bodily Harm on human beings (subject to criminal
prosecution or otherwise) outside of providing medical aid.
prosecution or otherwise) outside of providing medical aid
or undergoing a voluntary procedure under no form of
Coercion.
ii.You do not use the Work for the purpose of Surveilling
or tracking individuals for financial gain.
iii. You do not use the Work in an Act of War.
iv. You do not use the Work for the purpose of supporting
or profiting from an Act of War.
v. You do not use the Work for the purpose of Incarceration.
vi. You do not use the Work for the purpose of extracting
oil, gas, or coal.
vi. You do not use the Work for the purpose of extracting,
processing, or refining, oil, gas, or coal. Or to in any other
way to deliberately pollute the environment as a byproduct
of manufacturing or irresponsible disposal of hazardous materials.
vii. You do not use the Work for the purpose of
expediting, coordinating, or facilitating paid work
undertaken by individuals under the age of 12 years.
@ -310,11 +326,11 @@ TO BE BOUND BY THE TERMS AND CONDITIONS OF THIS LICENSE.
pseudonym, if applicable) if supplied, and/or if the Original
Author and/or Licensor designate another party or parties (e.g.,
a sponsor institute, publishing entity, journal) for attribution
("Attribution Parties") in Licensor!s copyright notice, terms of
("Attribution Parties") in Licensor's copyright notice, terms of
service or by other reasonable means, the name of such party or
parties; (ii) the title of the Work if supplied; (iii) to the
extent reasonably practicable, the URI, if any, that Licensor
specifies to be associated with the Work, unless such URI does
to be associated with the Work, unless such URI does
not refer to the copyright notice or licensing information for
the Work; and, (iv) consistent with Section 3(b), in the case of
an Adaptation, a credit identifying the use of the Work in the
@ -464,7 +480,7 @@ TO BE BOUND BY THE TERMS AND CONDITIONS OF THIS LICENSE.
a copy and/or URI of the corresponding Source Code on the same
terms and conditions as the license granted to You under this License.
d. If the Work is used as a Web Service, each time You Distribute
d. If the Work is used as a Network Service, each time You Distribute
or Publicly Perform an Adaptation, or serve data derived from the
Software, the Licensor offers to any recipients of the data a copy
and/or URI of the corresponding Source Code on the same terms and

View file

@ -1,5 +1,32 @@
# IzzyLib
These are just a number of functions I keep reusing over and over again in most of my projects
These are just a number of functions I keep reusing over and over again in most of my projects. It's okay to use them if you also find them useful
Note: not in a stable state yet. Expect major changes
## Installation
You only need to install the base and whatever sub-modules you want to use
### From Git
$(venv)/bin/python -m pip install -e 'git+https://git.barkshark.xyz/izaliamae/izzylib.git#egg=izzylib-base&subdirectory=base'
$(venv)/bin/python -m pip install -e 'git+https://git.barkshark.xyz/izaliamae/izzylib.git#egg=izzylib-http-server&subdirectory=http_server'
$(venv)/bin/python -m pip install -e 'git+https://git.barkshark.xyz/izaliamae/izzylib.git#egg=izzylib-http-requests-client&subdirectory=requests_client'
$(venv)/bin/python -m pip install -e 'git+https://git.barkshark.xyz/izaliamae/izzylib.git#egg=izzylib-sql&subdirectory=sql'
$(venv)/bin/python -m pip install -e 'git+https://git.barkshark.xyz/izaliamae/izzylib.git#egg=izzylib-templates&subdirectory=template'
$(venv)/bin/python -m pip install -e 'git+https://git.barkshark.xyz/izaliamae/izzylib.git#egg=izzylib-tinydb&subdirectory=tinydb'
### From Source
$(venv)/bin/python -m pip install ./base ./http_server ./requests_client ./sql ./template ./tinydb
## Documentation
### Importing
Most useful classes and functions are imported in the module root, so you don't need to do any multi-level imports. For example, just do `from izzylib import SqlDatabase` instead of `from izzylib.sql.generic import SqlDatabase`. Or even simply do `import izzylib` and use `izzylib.SqlDatabase()`.
### Usage
All classes and functions will have docstrings. Either look through the code or run `help()` on an object
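For example (a quick sketch):

    from izzylib import DotDict
    config = DotDict('{"theme": "dark"}')
    print(config.theme)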
# NOTE!
not in a stable state yet. Expect major changes

base/izzylib/__init__.py Normal file
View file

@ -0,0 +1,56 @@
'''
IzzyLib by Zoey Mae
Licensed under the CNPL: https://git.pixie.town/thufie/CNPL
https://git.barkshark.xyz/izaliamae/izzylib
'''
import sys, traceback
assert sys.version_info >= (3, 7)
__version_tpl__ = (0, 6, 0)
__version__ = '.'.join([str(v) for v in __version_tpl__])
from . import logging
izzylog = logging.get_logger('IzzyLib')
from .dotdict import DotDict, LowerDotDict, DefaultDotDict, JsonEncoder
from .misc import *
from .cache import LruCache, TtlCache
from .connection import Connection
from .path import Path
from .http_urllib_client import HttpUrllibClient, HttpUrllibResponse
def log_import_error(*message):
izzylog.verbose(*message)
if izzylog.get_config('level') == logging.Levels.DEBUG:
traceback.print_exc()
try:
from izzylib.sql import Column, CustomRows, Session, SqlDatabase, Tables, SqliteClient, SqliteColumn, SqliteServer, SqliteSession
except ImportError:
log_import_error('Failed to import SQL classes. Connecting to SQL databases is disabled')
try:
from izzylib.tinydb import TinyDatabase, TinyRow, TinyRows
except ImportError:
log_import_error('Failed to import TinyDB classes. TinyDB database is disabled')
try:
from izzylib.template import Template, Color
except ImportError:
log_import_error('Failed to import http template classes. Jinja and HAML templates disabled')
try:
from izzylib.http_requests_client import HttpRequestsClient, HttpRequestsRequest, HttpRequestsResponse
except ImportError:
log_import_error('Failed to import Requests http client classes. Requests http client is disabled.')
try:
from izzylib.http_server import PasswordHasher, HttpServer, HttpServerRequest, HttpServerResponse
except ImportError:
log_import_error('Failed to import HTTP server classes. The HTTP server will be disabled')

View file

@ -5,7 +5,7 @@ import re
from datetime import datetime
from collections import OrderedDict
from .misc import DotDict
from . import DotDict
def parse_ttl(ttl):
@ -113,11 +113,11 @@ class BaseCache(OrderedDict):
return self[key].data
class TTLCache(BaseCache):
class TtlCache(BaseCache):
def __init__(self, maxsize=1024, ttl='1h'):
super().__init__(maxsize, ttl)
class LRUCache(BaseCache):
class LruCache(BaseCache):
def __init__(self, maxsize=1024):
super().__init__(maxsize)

View file

@ -0,0 +1,25 @@
import socket
class Connection(socket.socket):
def __init__(self, address='127.0.0.1', port=8080, tcp=True):
super().__init__(socket.AF_INET, socket.SOCK_STREAM if tcp else socket.SOCK_DGRAM)
self.address = address
self.port = port
def __enter__(self):
self.connect((self.address, self.port))
return self
def __exit__(self, exctype, value, tb):
self.close()
def send(self, msg):
self.sendall(msg)
def recieve(self, size=8192):
return self.recv(size)
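## Example usage (hedged sketch; the address, port, and payload are placeholders).
## The class is also a context manager, so it can be used with a `with` block.
conn = Connection('127.0.0.1', 8080)
conn.connect((conn.address, conn.port))
conn.send(b'ping')
reply = conn.recieve()   # spelling matches the method defined above
conn.close()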

base/izzylib/dotdict.py Normal file
View file

@ -0,0 +1,115 @@
import json
class DotDict(dict):
dict_ignore_types = ['basecache', 'lrucache', 'ttlcache']
def __init__(self, value=None, **kwargs):
'''Python dictionary, but variables can be set/get via attributes
value [str, bytes, dict]: JSON or dict of values to init with
kwargs: key/value pairs to set on init. Overrides identical keys set by 'value'
'''
super().__init__()
self.__setattr__ = self.__setitem__
## compatibility
self.toJson = self.to_json
self.fromJson = self.from_json
if isinstance(value, (str, bytes)):
self.from_json(value)
elif value.__class__.__name__.lower() not in self.dict_ignore_types and isinstance(value, dict):
self.update(value)
elif value:
raise TypeError(f'The value must be a JSON string, list, dict, or another DotDict object, not {value.__class__}')
if kwargs:
self.update(kwargs)
def __getattr__(self, k):
try:
return super().__getitem__(k)
except KeyError:
raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
def __setitem__(self, k, v):
if v.__class__.__name__.lower() not in self.dict_ignore_types and isinstance(v, dict):
v = DotDict(v)
super().__setitem__(k, v)
def __delattr__(self, k):
try:
dict.__delitem__(self, k)
except KeyError:
raise AttributeError(f'{self.__class__.__name__} object has no attribute {k}') from None
def update(self, data):
for k,v in data.items():
self.__setitem__(k, v)
def to_json(self, indent=None, **kwargs):
if 'cls' not in kwargs:
kwargs['cls'] = JsonEncoder
return json.dumps(dict(self), indent=indent, **kwargs)
def from_json(self, string):
data = json.loads(string)
self.update(data)
def load_json(self, path: str=None):
from .path import Path   ## imported here to avoid a circular import with path.py
self.update(Path(path).load_json())
def save_json(self, path: str, **kwargs):
from .path import Path
with Path(path).open('w') as fd:
fd.write(self.to_json(**kwargs))
## This has to be reworked tbh
class DefaultDotDict(DotDict):
def __getattr__(self, key):
try:
val = super().__getattribute__(key)
except AttributeError:
val = self.get(key, DefaultDotDict())
return DotDict(val) if type(val) == dict else val
class LowerDotDict(DotDict):
def __getattr__(self, key):
return super().__getattr__(key.lower())
def __setattr__(self, key, value):
return super().__setattr__(key.lower(), value)
def update(self, data):
data = {k.lower(): v for k,v in data.items()}
return super().update(data)
class JsonEncoder(json.JSONEncoder):
def default(self, obj):
if not isinstance(obj, (str, int, float, dict)):
return str(obj)
return json.JSONEncoder.default(self, obj)
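A short usage sketch for DotDict; the keys and values are arbitrary examples, and it assumes attribute assignment maps to item assignment as the docstring above intends:
# illustrative only; attribute access and item access refer to the same data
config = DotDict({'name': 'izzy'}, debug=True)
config.port = 8080
print(config['port'])           # 8080
print(config.to_json(indent=2))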

View file

@ -0,0 +1,2 @@
class HttpFileDownloadedError(Exception):
'Raised when a download fails for any reason'

View file

@ -0,0 +1,163 @@
import functools, json, sys
from base64 import b64decode, b64encode
from datetime import datetime
from functools import cached_property
from io import BytesIO
from ssl import SSLCertVerificationError
from urllib.error import HTTPError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from . import DefaultDotDict, DotDict, Path, exceptions, izzylog, __version__
try:
from PIL import Image
except ImportError:
izzylog.verbose('Pillow module not found. Image downloading is disabled')
Image = False
methods = ['connect', 'delete', 'get', 'head', 'options', 'patch', 'post', 'put', 'trace']
class HttpUrllibClient:
def __init__(self, headers={}, useragent=f'IzzyLib/{__version__}', appagent=None, proxy_type='https', proxy_host=None, proxy_port=None):
proxy_ports = {
'http': 80,
'https': 443
}
if proxy_type not in ['http', 'https']:
raise ValueError(f'Not a valid proxy type: {proxy_type}')
self.headers=headers
self.agent = f'{useragent} ({appagent})' if appagent else useragent
self.proxy = DotDict({
'enabled': True if proxy_host else False,
'ptype': proxy_type,
'host': proxy_host,
'port': proxy_ports[proxy_type] if not proxy_port else proxy_port
})
def __build_request(self, url, data=None, headers={}, method='GET'):
new_headers = self.headers.copy()
new_headers.update(headers)
parsed_headers = {k.lower(): v for k,v in new_headers.items()}
if not parsed_headers.get('user-agent'):
parsed_headers['user-agent'] = self.agent
if isinstance(data, dict):
data = json.dumps(data)
if isinstance(data, str):
data = data.encode('UTF-8')
request = Request(url, data=data, headers=parsed_headers, method=method)
if self.proxy.enabled:
request.set_proxy(f'{self.proxy.host}:{self.proxy.port}', self.proxy.ptype)
return request
def request(self, *args, **kwargs):
request = self.__build_request(*args, **kwargs)
try:
response = urlopen(request)
except HTTPError as e:
response = e.fp
return HttpUrllibResponse(response)
def file(self, url, filepath, *args, filename=None, size=2048, create_dirs=True, **kwargs):
filepath = Path(filepath)
path = filepath.parent
if not path.exists and not create_dirs:
raise FileNotFoundError(f'Path does not exist: {path}')
path.mkdir()
if filepath.exists:
kwargs.setdefault('headers', {})['range'] = f'bytes={filepath.size}-'
resp = self.request(url, *args, **kwargs)
if not resp.headers.get('content-length'):
raise exceptions.HttpFileDownloadedError('File already downloaded fully')
if resp.status not in (200, 206):
raise exceptions.HttpFileDownloadedError(f'Failed to download {url}: {resp.status}, body: {resp.body}')
with filepath.open('ab') as fd:
fd.write(resp.body)
return True
def image(self, url, filepath, *args, filename=None, ext='png', dimensions=(50, 50), create_dirs=True, **kwargs):
if not Image:
raise ValueError('Pillow module is not installed')
filepath = Path(filepath)
path = filepath.parent
if not path.exists and not create_dirs:
raise FileNotFoundError(f'Path does not exist: {path}')
path.mkdir()
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
raise exceptions.HttpFileDownloadedError(f'Failed to download {url}: {resp.status}, body: {resp.body}')
if not filename:
filename = Path(url).stem
byte = BytesIO()
image = Image.open(BytesIO(resp.body))
image.thumbnail(dimensions)
image.save(byte, format=ext.upper())
with path.join(filename).open('wb') as fd:
fd.write(byte.getvalue())
def json(self, *args, headers={}, activity=True, **kwargs):
json_type = 'activity+json' if activity else 'json'
headers.update({
'accept': f'application/{json_type}'
})
return self.request(*args, headers=headers, **kwargs)
class HttpUrllibResponse(object):
def __init__(self, response):
self.body = response.read()
self.headers = DefaultDotDict({k.lower(): v.lower() for k,v in response.headers.items()})
self.status = response.status
self.url = response.url
@cached_property
def text(self):
return self.body.decode('UTF-8')
@cached_property
def json(self):
return DotDict(self.text)
def json_pretty(self, indent=4):
return json.dumps(self.json, indent=indent)
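A minimal usage sketch for the urllib client above; the URL and user agent are placeholders:
# example only; example.com stands in for a real endpoint
client = HttpUrllibClient(appagent='MyApp/0.1')
resp = client.request('https://example.com', method='GET')
print(resp.status)
print(resp.headers.get('content-type'))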

base/izzylib/logging.py Normal file
View file

@ -0,0 +1,129 @@
import sys
from os import getppid, environ as env
from datetime import datetime
from enum import Enum
class Levels(Enum):
CRITICAL = 60
ERROR = 50
WARNING = 40
INFO = 30
VERBOSE = 20
DEBUG = 10
MERP = 0
class Log:
__slots__ = [
'name', 'level', 'date', 'format',
'critical', 'error', 'warning',
'info', 'verbose', 'debug', 'merp'
]
def __init__(self, name, **config):
self.name = name
self.level = Levels.INFO
self.date = True
self.format = '%Y-%m-%d %H:%M:%S'
self.update_config(**config)
for level in Levels:
self._set_log_function(level)
def _set_log_function(self, level):
setattr(self, level.name.lower(), lambda *args: self.log(level, *args))
def print(self, *args):
sys.stdout.write(' '.join([str(arg) for arg in args]))
sys.stdout.flush()
def parse_level(self, level):
try:
return Levels(int(level))
except ValueError:
return getattr(Levels, level.upper())
def update_config(self, **data):
for key, value in data.items():
self.set_config(key, value)
def set_config(self, key, value):
if key == 'level' and type(value) == str:
value = self.parse_level(value)
setattr(self, key, value)
def get_config(self, key):
return getattr(self, key)
def print_config(self):
self.print(*(f'{key}: {getattr(self, key)}\n' for key in ('name', 'level', 'date', 'format')))
def log(self, level, *msg):
if level.value < self.level.value:
return
default = self.name == 'Default'
options = [
level.name + ':',
' '.join([str(message) for message in msg]),
'\n'
]
if self.date and not getppid() == 1:
options.insert(0, datetime.now().strftime(self.format))
if not default:
options.insert(0 if not self.date else 1, f'[{self.name}]')
self.print(*options)
def get_logger(name, **config):
try:
return logger[name.lower()]
except KeyError:
log = Log(name, **config)
logger[name.lower()] = log
return log
'''create a default logger'''
logger = {
'default': Log('Default'),
'IzzyLib': Log('IzzyLib')
}
DefaultLog = logger['default']
'''aliases for default logger's log output functions'''
critical = DefaultLog.critical
error = DefaultLog.error
warning = DefaultLog.warning
info = DefaultLog.info
verbose = DefaultLog.verbose
debug = DefaultLog.debug
merp = DefaultLog.merp
'''aliases for the default logger's config functions'''
update_config = DefaultLog.update_config
set_config = DefaultLog.set_config
get_config = DefaultLog.get_config
print_config = DefaultLog.print_config
try:
logger['IzzyLib'].set_config('level', env['LOG_LEVEL'])
except KeyError:
pass
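A small sketch of the intended logger usage; the logger name and messages are examples:
# get_logger reuses an existing logger of the same name or creates a new one
log = get_logger('MyApp', level='debug')
log.info('server started on port', 8080)
log.debug('only shown because the level was lowered to debug')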

base/izzylib/misc.py Normal file
View file

@ -0,0 +1,411 @@
'''Miscellaneous functions'''
import hashlib, platform, random, socket, statistics, string, time, timeit
from datetime import datetime
from getpass import getpass
from importlib import util
from pathlib import Path
from . import izzylog
__all__ = [
'ap_date',
'boolean',
'get_ip',
'hasher',
'import_from_path',
'nfs_check',
'port_check',
'print_methods',
'prompt',
'random_gen',
'time_function',
'time_function_pprint',
'timestamp',
'var_name'
]
def ap_date(date=None, alt=False):
'''
Takes a datetime object and returns it as an ActivityPub-friendly string
Arguments:
date (datetime): The datetime object to be converted. If not set, a new datetime object with the current date and time is created
alt (bool): If True, the returned string will be in the Mastodon API format
Return:
str: The date in an ActivityPub-friendly format
'''
if not date:
date = datetime.utcnow()
elif type(date) == int:
date = datetime.fromtimestamp(date)
elif type(date) != datetime:
raise TypeError(f'Unsupported object type for ap_date: {type(date)}')
return date.strftime('%a, %d %b %Y %H:%M:%S GMT' if alt else '%Y-%m-%dT%H:%M:%SZ')
def boolean(v, return_value=False):
'''
Convert a str, bool, int or None object into a boolean.
Arguments:
v (str, bool, int, None): The value to be checked
return_value (bool): If True, return v instead of True if it can't be converted
Return:
various: A boolean or the value itself
'''
if type(v) not in [str, bool, int, type(None)]:
raise ValueError(f'Value is not a string, boolean, int, or nonetype: {v}')
value = v.lower() if isinstance(v, str) else v
if value in [1, True, 'on', 'y', 'yes', 'true', 'enable']:
return True
if value in [0, False, None, 'off', 'n', 'no', 'false', 'disable', '']:
return False
if return_value:
return v
return True
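A few conversions the helper above should produce, for illustration:
# expected behaviour on sample inputs
assert boolean('yes') is True
assert boolean('off') is False
assert boolean(None) is False
assert boolean('maybe', return_value=True) == 'maybe'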
def catch_kb_interrupt(function, *args, **kwargs):
'''
Run a function and catch the KeyboardInterrupt exception
Parameters:
function (function): The function to be ran
*args, **kwargs: The arguments and keyword arguments to pass to the function
Return:
None
'''
try:
function(*args, **kwargs)
except KeyboardInterrupt:
izzylog.verbose('Bye! UvU')
def get_ip():
'''
Get the IP address of the machine
Return:
str: An IP address
'''
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('10.255.255.255', 1))
data = s.getsockname()
ip = data[0]
except Exception:
ip = '127.0.0.1'
finally:
s.close()
return ip
def hasher(string, alg='blake2s'):
'''
Hash a string and return the digest in hex format as a string
Arguments:
string (str, bytes): A string or bytes object to be hashed
alg (str): The name of algorithm to use for hashing. Check hashlib.__always_supported for valid hash algs
Return:
str: The hashed string in hex format as a string
'''
if alg not in hashlib.__always_supported:
raise TypeError(f'Unsupported hash algorithm: {alg}. Supported algs: {", ".join(hashlib.__always_supported)}')
string = string.encode('UTF-8') if type(string) != bytes else string
newhash = hashlib.new(alg)
newhash.update(string)
return newhash.hexdigest()
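For example, hashing the same string with the default and an explicit algorithm (names come from hashlib.__always_supported):
print(hasher('merp'))                # blake2s hex digest
print(hasher('merp', alg='sha256'))  # sha256 hex digest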
def import_from_path(mod_path):
'''
Import a module from a directory
Arguments:
mod_path (str, Path): Py file or directory to import
Return:
module: A module object
'''
mod_path = Path(mod_path)
if mod_path.is_dir():
path = mod_path.joinpath('__init__.py')
name = mod_path.name
else:
path = mod_path
name = path.stem
spec = util.spec_from_file_location(name, str(path))
module = util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def nfs_check(path):
'''
Check if a file or directory is on an NFS share. Only tested on Linux
Arguments:
path (str, Path): Path to a file or directory
Return:
bool: True if the path is on an nfs share. False if not
'''
if platform.system() == 'Windows':
izzylog.verbose('Refusing to check unix mounts on a non-unix system')
return
proc = Path('/proc/mounts')
path = Path(path).resolve()
if not proc.exists():
return True
with proc.open() as fd:
for line in fd:
line = line.split()
if line[2] == 'nfs' and line[1] in str(path):
return True
return False
def port_check(port, address='127.0.0.1', tcp=True):
'''
Checks if a TCP or UDP port is open or not
Arguments:
port (int): The port number to check
address (str): The address to connect to to check
tcp (bool): Use TCP if True, else use UDP
Return:
bool: True if nothing is listening on the port (it is free). False if it is in use
'''
with socket.socket(socket.AF_INET, socket.SOCK_STREAM if tcp else socket.SOCK_DGRAM) as s:
try:
return not s.connect_ex((address, port)) == 0
except socket.error as e:
return False
def print_methods(object, include_underscore=False):
'''
Prints each method of an object on a new line
Arguments:
object (object): The object to work with
include_underscore (bool): If True, also include methods that start with '_'
Return:
None (The methods are printed to stdout)
'''
for line in dir(object):
if line.startswith('_') and not include_underscore:
continue
print(line)
def prompt(prompt, default=None, valtype=str, options=[], password=False):
'''An upgraded `input`
Arguments:
prompt (str): The string to display to the user
default (various): The value that should be returned if there is no user input
valtype (str): The type the value should be returned as
options (list(str)): If set, these are the only values the user can select
password (bool): If set to True, the input will be treated like a password and not show the user's input on screen
Return:
various: The value typed by the user (and converted if necessary)
'''
input_func = getpass if password else input
if default != None:
prompt += ' [-redacted-]' if password else f' [{default}]'
prompt += '\n'
if options:
opt = '/'.join(options)
prompt += f'[{opt}]'
prompt += ': '
value = input_func(prompt)
while value and len(options) > 0 and value not in options:
print('Invalid value:', value)
value = input_func(prompt)
if not value or value == '':
return default
ret = valtype(value)
while valtype == Path and not ret.parent.exists():
print('Parent directory doesn\'t exist')
ret = Path(input_func(prompt))
return ret
def random_gen(length=20, letters=True, numbers=True, extra=None):
'''Return a randomly generated string
Arguments:
length (int): The length of the returned string
letters (bool): If True, include all upper and lowercase letters
numbers (bool): if True, include all numbers
extra (str): A string of any extra characters to include
Return:
str: A random string of characters
'''
if not isinstance(length, int):
raise TypeError(f'Character length must be an integer, not {type(length)}')
characters = ''
if letters:
characters += string.ascii_letters
if numbers:
characters += string.digits
if extra:
characters += extra
return ''.join(random.choices(characters, k=length))
def time_function(func, *args, passes=1, use_gc=True, **kwargs):
'''Run a function and return the time it took
Arguments:
func (function): The command to be timed
args (list(various)): The arguments to be passed to the timed function
kwargs (dict(str:various)): The keyword arguments to be passed to the timed function
passes (int): How many times the timed function should be run
use_gc (bool): If True, keep garbage collection enabled
Return:
float: The time it took to run the function in seconds (a list of times if passes > 1)
'''
options = [
lambda: func(*args, **kwargs)
]
if use_gc:
options.append('gc.enable()')
timer = timeit.Timer(*options)
if passes > 1:
return timer.repeat(passes, 1)
return timer.timeit(1)
def time_function_pprint(func, *args, passes=5, use_gc=True, floatlen=3, **kwargs):
'''Run a function and print out the time it took for each pass, the average and total
Arguments:
func (function): The command to be timed
args (list(various)): The arguments to be passed to the timed function
kwargs (dict(str:various)): The keyword arguments to be passed to the timed function
passes (int): How many times the timed function should be run
use_gc (bool): If True, keep garbage collection enabled
floatlen (int): The amount of decimal places each result should have
Return:
None: The data gets printed to stdout
'''
parse_time = lambda num: f'{round(num, floatlen)}s'
times = []
for idx in range(0, passes):
passtime = time_function(func, *args, **kwargs, passes=1, use_gc=use_gc)
times.append(passtime)
print(f'Pass {idx+1}: {parse_time(passtime)}')
average = statistics.fmean(times)
print('-----------------')
print(f'Average: {parse_time(average)}')
print(f'Total: {parse_time(sum(times))}')
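A usage sketch; the printed numbers below only illustrate the shape of the output, they are not real measurements:
# time a trivial function three times
time_function_pprint(lambda: sum(range(100000)), passes=3)
# Pass 1: 0.003s
# Pass 2: 0.003s
# Pass 3: 0.003s
# -----------------
# Average: 0.003s
# Total: 0.009s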
def timestamp(dtobj=None, utc=False):
'''
Turn a datetime object into a unix timestamp
Arguments:
dtobj (datetime): The datetime object to be converted
utc (bool): If True, use UTC instead of local time for new objects
Return:
int: The timestamp version of a datetime object
'''
dtime = dtobj if dtobj else datetime
date = dtime.utcnow() if utc else dtime.now()
return date.timestamp()
def var_name(single=True, **kwargs):
'''
Return a variable name as a string
Arguments:
kwargs (dict(str:variable)): Variables and their values
single (bool): If True, only return the first variable name
Return:
str or list(str): The variable name as a str (or names in a list if not Single)
'''
keys = list(kwargs.keys())
return keys[0] if single else keys

base/izzylib/path.py Normal file
View file

@ -0,0 +1,170 @@
import json, os, pathlib
from shutil import copyfile, rmtree
from . import DotDict, JsonEncoder
class Path(pathlib.Path):
def __init__(self, path, exist=True, missing=True, parents=True):
if str(path).startswith('~'):
path = pathlib.Path(path).expanduser()
super().__init__(path)
self.config = {
'missing': missing,
'parents': parents,
'exist': exist
}
def __check_dir(self, path=None):
target = self if not path else Path(path)
if not self.config['parents'] and not target.parent.exists:
raise FileNotFoundError(f'Parent directories do not exist: {target}')
if not self.config['exist'] and target.exists:
raise FileExistsError(f'File or directory already exists: {target}')
def __parse_perm_octal(self, mode):
return mode if isinstance(mode, int) else int(str(mode), 8)
def append(self, text):
return Path(str(self) + text)
def backup(self, ext='backup', overwrite=False):
target = f'{self}.{ext}'
self.copy(target, overwrite)
def chmod(self, mode=None):
octal = self.__parse_perm_octal(mode)
super().chmod(octal)
def copy(self, path, overwrite=False):
target = Path(path)
self.__check_dir(path)
if target.exists and overwrite:
target.delete
copyfile(self, target)
def join(self, path):
return Path(self.joinpath(path))
def json_load(self):
return DotDict(self.read)
def json_save(self, data, indent=None):
with self.open('w') as fp:
fp.write(json.dumps(data, indent=indent, cls=JsonEncoder))
def link(self, path):
target = Path(path)
self.__check_dir(path)
if target.exists():
target.delete()
self.symlink_to(target, target.isdir)
def mkdir(self, mode=0o755):
super().mkdir(mode, parents=self.config['parents'], exist_ok=self.config['exist'])
return self.exists
def move(self, path, overwrite=False):
self.copy(path, overwrite=overwrite)
self.delete()
def touch(self, mode=0o666):
octal = self.__parse_perm_octal(mode)
super().touch(octal, exist_ok=self.config['exist'])
return self.exists
@property
def delete(self):
if self.isdir:
rmtree(self)
else:
self.unlink()
return not self.exists
@property
def exists(self):
return super().exists()
@property
def home(self):
return Path(pathlib.Path.home())
@property
def isdir(self):
return self.is_dir()
@property
def isfile(self):
return self.is_file()
@property
def islink(self):
return self.is_symlink()
@property
def listdir(self, recursive=True):
paths = self.iterdir() if recursive else os.listdir(self)
return [Path(path) for path in paths]
@property
def mtime(self):
return os.path.getmtime(self.str())
@property
def parent(self):
return Path(super().parent)
@property
def read(self):
return self.open().read()
@property
def readlines(self):
return self.open().readlines()
@property
def resolve(self):
return Path(super().resolve())
@property
def size(self):
return self.stat().st_size

base/setup.py Executable file
View file

@ -0,0 +1,36 @@
#!/usr/bin/env python3
from setuptools import setup
setup(
name="IzzyLib Base",
version='0.6.0',
packages=['izzylib'],
python_requires='>=3.7.0',
include_package_data=False,
author='Zoey Mae',
author_email='admin@barkshark.xyz',
description='a collection of often-used functions and classes',
keywords='web http client',
url='https://git.barkshark.xyz/izaliamae/izzylib',
project_urls={
'Bug Tracker': 'https://git.barkshark.xyz/izaliamae/izzylib/issues',
'Documentation': 'https://git.barkshark.xyz/izaliamae/izzylib/wiki',
'Source Code': 'https://git.barkshark.xyz/izaliamae/izzylib'
},
classifiers=[
'License :: Co-operative Non-violent Public License (CNPL 6+)',
'Development Status :: 3 - Alpha',
'Intended Audience :: Information Technology',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Operating System :: POSIX',
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft :: Windows',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

View file

@ -0,0 +1,2 @@
from .hasher import PasswordHasher
from .server import HttpServer, HttpServerRequest, HttpServerResponse

View file

@ -0,0 +1,78 @@
import argon2, os
from izzylib import time_function_pprint
class PasswordHasher:
'''
Argon2 password hasher and validator
Attributes:
config (dict): The settings used for the hasher
Methods:
get_config(key): Get the value of a config options
set_config(key, value): Set a config option
hash(password): hash a password and return the digest as a hex string
verify(hash, password): verify a password and the password hash match
iteration_test(string, passes, iterations): Time the hashing functionality
'''
aliases = {
'iterations': 'time_cost',
'memory': 'memory_cost',
'threads': 'parallelism'
}
def __init__(self, iterations=16, memory=100, threads=os.cpu_count(), type=argon2.Type.ID):
if not argon2:
raise ValueError('password hashing disabled')
self.config = {
'time_cost': iterations,
'memory_cost': memory * 1024,
'parallelism': threads,
'encoding': 'utf-8',
'type': type,
}
self.hasher = argon2.PasswordHasher(**self.config)
def get_config(self, key):
key = self.aliases.get(key, key)
value = self.config[key]
return value / 1024 if key == 'memory_cost' else value
def set_config(self, key, value):
key = self.aliases.get(key, key)
self.config[key] = value * 1024 if key == 'memory_cost' else value
self.hasher = argon2.PasswordHasher(**self.config)
def hash(self, password: str):
return self.hasher.hash(password)
def verify(self, passhash: str, password: str):
try:
return self.hasher.verify(passhash, password)
except argon2.exceptions.VerifyMismatchError:
return False
def iteration_test(self, string='hecking heck', passes=3, iterations=[8,16,24,32,40,48,56,64]):
original_iter = self.get_config('iterations')
for iteration in iterations:
self.set_config('iterations', iteration)
print('\nTesting hash iterations:', iteration)
time_function_pprint(self.verify, self.hash(string), string, passes=passes)
self.set_config('iterations', original_iter)
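A sketch of the intended hash/verify round trip; the password is just an example value:
# hash a password, then check a correct and an incorrect guess
pwhasher = PasswordHasher(iterations=16)
digest = pwhasher.hash('correct horse battery staple')
print(pwhasher.verify(digest, 'correct horse battery staple'))  # True
print(pwhasher.verify(digest, 'wrong password'))                # False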

View file

@ -1,3 +1,4 @@
# probably gonna remove this since I'll be using my asgi framework
import multiprocessing, sanic, signal, traceback
import logging as pylog
@ -7,7 +8,7 @@ from multiprocessing import cpu_count, current_process
from sanic.views import HTTPMethodView
from urllib.parse import parse_qsl, urlparse
from . import http, logging
from . import http, izzylog
from .misc import DotDict, DefaultDict, LowerDotDict
from .template import Template
@ -32,7 +33,7 @@ class HttpServer(sanic.Sanic):
self.workers = int(kwargs.get('workers', cpu_count()))
self.sig_handler = kwargs.get('sig_handler')
super().__init__(name, request_class=kwargs.get('request_class', HttpRequest))
super().__init__(name, request_class=kwargs.get('request_class', HttpServerRequest))
for log in ['sanic.root', 'sanic.access']:
pylog.getLogger(log).setLevel(pylog.ERROR)
@ -85,7 +86,7 @@ class HttpServer(sanic.Sanic):
if self.workers > 1:
msg += f' with {self.workers} workers'
logging.info(msg)
izzylog.info(msg)
self.run(**options)
@ -95,16 +96,16 @@ class HttpServer(sanic.Sanic):
print('stopping.....')
self.stop()
logging.info('Bye! :3')
izzylog.info('Bye! :3')
sys.exit()
class HttpRequest(sanic.request.Request):
class HttpServerRequest(sanic.request.Request):
def __init__(self, url_bytes, headers, version, method, transport, app):
super().__init__(url_bytes, headers, version, method, transport, app)
self.Headers = Headers(headers)
self.Data = Data(self)
self.Headers = HttpHeaders(headers)
self.Data = HttpData(self)
self.template = self.app.template
self.__setup_defaults()
self.__parse_path()
@ -178,7 +179,34 @@ class HttpRequest(sanic.request.Request):
return False
class Headers(LowerDotDict):
class HttpServerResponse:
Text = sanic.response.text
Html = sanic.response.html
Json = sanic.response.json
Redir = sanic.response.redirect
def Css(*args, headers={}, **kwargs):
ReplaceHeader(headers, 'content-type', 'text/css')
return sanic.response.text(*args, headers=headers, **kwargs)
def Js(*args, headers={}, **kwargs):
ReplaceHeader(headers, 'content-type', 'application/javascript')
return sanic.response.text(*args, headers=headers, **kwargs)
def Ap(*args, headers={}, **kwargs):
ReplaceHeader(headers, 'content-type', 'application/activity+json')
return sanic.response.json(*args, headers=headers, **kwargs)
def Jrd(*args, headers={}, **kwargs):
ReplaceHeader(headers, 'content-type', 'application/jrd+json')
return sanic.response.json(*args, headers=headers, **kwargs)
class HttpHeaders(LowerDotDict):
def __init__(self, headers):
super().__init__()
@ -202,7 +230,7 @@ class Headers(LowerDotDict):
return self.get(key.lower(), default)
class Data(object):
class HttpData(object):
def __init__(self, request):
self.request = request
@ -235,8 +263,8 @@ class Data(object):
try:
return self.request.body
except Exception as e:
logging.verbose('IzzyLib.http_server.Data.raw: failed to get body')
logging.debug(f'{e.__class__.__name__}: {e}')
izzylog.verbose('IzzyLib.http_server.Data.raw: failed to get body')
izzylog.debug(f'{e.__class__.__name__}: {e}')
return b''
@ -245,8 +273,8 @@ class Data(object):
try:
return self.raw.decode()
except Exception as e:
logging.verbose('IzzyLib.http_server.Data.text: failed to get body')
logging.debug(f'{e.__class__.__name__}: {e}')
izzylog.verbose('IzzyLib.http_server.Data.text: failed to get body')
izzylog.debug(f'{e.__class__.__name__}: {e}')
return ''
@ -255,8 +283,8 @@ class Data(object):
try:
return DotDict(self.text)
except Exception as e:
logging.verbose('IzzyLib.http_server.Data.json: failed to get body')
logging.debug(f'{e.__class__.__name__}: {e}')
izzylog.verbose('IzzyLib.http_server.Data.json: failed to get body')
izzylog.debug(f'{e.__class__.__name__}: {e}')
data = '{}'
return {}
@ -268,7 +296,7 @@ async def MiddlewareAccessLog(request, response):
uagent = request.headers.get('user-agent')
address = request.headers.get('x-real-ip', request.forwarded.get('for', request.remote_addr))
logging.info(f'({multiprocessing.current_process().name}) {address} {request.method} {request.path} {response.status} "{uagent}"')
izzylog.info(f'({multiprocessing.current_process().name}) {address} {request.method} {request.path} {response.status} "{uagent}"')
def GenericError(request, exception):
@ -293,7 +321,7 @@ def GenericError(request, exception):
def NoTemplateError(request, exception):
logging.error('TEMPLATE_ERROR:', f'{exception.__class__.__name__}: {str(exception)}')
izzylog.error('TEMPLATE_ERROR:', f'{exception.__class__.__name__}: {str(exception)}')
return sanic.response.html('I\'m a dumbass and forgot to create a template for this page', 500)
@ -301,33 +329,3 @@ def ReplaceHeader(headers, key, value):
for k,v in headers.items():
if k.lower() == key.lower():
del headers[k]
class Response:
Text = sanic.response.text
Html = sanic.response.html
Json = sanic.response.json
Redir = sanic.response.redirect
def Css(*args, headers={}, **kwargs):
ReplaceHeader(headers, 'content-type', 'text/css')
return sanic.response.text(*args, headers=headers, **kwargs)
def Js(*args, headers={}, **kwargs):
ReplaceHeader(headers, 'content-type', 'application/javascript')
return sanic.response.text(*args, headers=headers, **kwargs)
def Ap(*args, headers={}, **kwargs):
ReplaceHeader(headers, 'content-type', 'application/activity+json')
return sanic.response.json(*args, headers=headers, **kwargs)
def Jrd(*args, headers={}, **kwargs):
ReplaceHeader(headers, 'content-type', 'application/jrd+json')
return sanic.response.json(*args, headers=headers, **kwargs)
Resp = Response

http_server/setup.py Normal file
View file

@ -0,0 +1,53 @@
#!/usr/bin/env python3
from setuptools import setup, find_namespace_packages
requires = [
'argon2-cffi==20.1.0',
'colour==0.1.5',
'Hamlish-Jinja==0.3.3',
'Jinja2==2.11.2',
'Markdown==3.3.3',
'pillow==8.2.0',
'pycryptodome==3.9.9',
'requests==2.25.1',
'sanic==20.12.1',
'Sanic-Cors==0.10.0.post3'
]
setup(
name="IzzyLib HTTP Server",
version='0.6.0',
packages=['izzylib.http_server'],
python_requires='>=3.7.0',
install_requires=requires,
include_package_data=False,
author='Zoey Mae',
author_email='admin@barkshark.xyz',
description='a collection of often-used functions and classes',
keywords='web http server templates argon2 jinja haml',
url='https://git.barkshark.xyz/izaliamae/izzylib',
project_urls={
'Bug Tracker': 'https://git.barkshark.xyz/izaliamae/izzylib/issues',
'Documentation': 'https://git.barkshark.xyz/izaliamae/izzylib/wiki',
'Source Code': 'https://git.barkshark.xyz/izaliamae/izzylib'
},
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Information Technology',
'License :: Co-operative Non-violent Public License (CNPL 6+)',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Operating System :: POSIX',
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft :: Windows',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Topic :: Internet :: WWW/HTTP :: HTTP Servers',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

View file

@ -0,0 +1,393 @@
import json, requests, sys
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from PIL import Image
from base64 import b64decode, b64encode
from datetime import datetime
from functools import cached_property, lru_cache
from io import BytesIO
from izzylib import DefaultDotDict, DotDict, Path, izzylog, __version__
from ssl import SSLCertVerificationError
from urllib.error import HTTPError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
methods = ['connect', 'delete', 'get', 'head', 'options', 'patch', 'post', 'put', 'trace']
class RequestsClient(object):
def __init__(self, headers={}, useragent=f'IzzyLib/{__version__}', appagent=None, proxy_type='https', proxy_host=None, proxy_port=None):
proxy_ports = {
'http': 80,
'https': 443
}
if proxy_type not in ['http', 'https']:
raise ValueError(f'Not a valid proxy type: {proxy_type}')
self.headers=headers
self.agent = f'{useragent} ({appagent})' if appagent else useragent
self.proxy = DotDict({
'enabled': True if proxy_host else False,
'ptype': proxy_type,
'host': proxy_host,
'port': proxy_ports[proxy_type] if not proxy_port else proxy_port
})
self.SetGlobal = set_requests_client
def __sign_request(self, request, privkey, keyid):
request.add_header('(request-target)', f'{request.method.lower()} {request.path}')
request.add_header('host', request.host)
request.add_header('date', datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT'))
if request.body:
body_hash = b64encode(SHA256.new(request.body).digest()).decode("UTF-8")
request.add_header('digest', f'SHA-256={body_hash}')
request.add_header('content-length', len(request.body))
sig = {
'keyId': keyid,
'algorithm': 'rsa-sha256',
'headers': ' '.join([k.lower() for k in request.headers.keys()]),
'signature': b64encode(sign_pkcs_headers(privkey, request.headers)).decode('UTF-8')
}
sig_items = [f'{k}="{v}"' for k,v in sig.items()]
sig_string = ','.join(sig_items)
request.add_header('signature', sig_string)
request.remove_header('(request-target)')
request.remove_header('host')
def request(self, *args, method='get', **kwargs):
if method.lower() not in methods:
raise ValueError(f'Invalid method: {method}')
request = RequestsRequest(self, *args, method=method.lower(), **kwargs)
return RequestsResponse(request.send())
def file(self, url, filepath, *args, filename=None, **kwargs):
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
izzylog.error(f'Failed to download {url}:', resp.status, resp.body)
return False
return resp.save(filepath)
def image(self, url, filepath, *args, filename=None, ext='png', dimensions=(50, 50), **kwargs):
if not Image:
izzylog.error('Pillow module is not installed')
return
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
izzylog.error(f'Failed to download {url}:', resp.status, resp.body)
return False
if not filename:
filename = Path(url).stem
path = Path(filepath)
if not path.exists:
izzylog.error('Path does not exist:', path)
return False
byte = BytesIO()
image = Image.open(BytesIO(resp.body))
image.thumbnail(dimensions)
image.save(byte, format=ext.upper())
with path.join(filename).open('wb') as fd:
fd.write(byte.getvalue())
def json(self, *args, headers={}, activity=True, **kwargs):
json_type = 'activity+json' if activity else 'json'
headers.update({
'accept': f'application/{json_type}'
})
return self.request(*args, headers=headers, **kwargs)
def signed_request(self, privkey, keyid, *args, **kwargs):
request = RequestsRequest(self, *args, **kwargs)
self.__sign_request(request, privkey, keyid)
return RequestsResponse(request.send())
class RequestsRequest(object):
def __init__(self, client, url, data=None, headers={}, query={}, method='get'):
self.args = [url]
self.kwargs = {'params': query}
self.method = method.lower()
self.client = client
new_headers = client.headers.copy()
new_headers.update(headers)
parsed_headers = {k.lower(): v for k,v in new_headers.items()}
if not parsed_headers.get('user-agent'):
parsed_headers['user-agent'] = client.agent
self.kwargs['headers'] = parsed_headers
self.kwargs['data'] = data
if client.proxy.enabled:
self.kwargs['proxies'] = {client.proxy.ptype: f'{client.proxy.ptype}://{client.proxy.host}:{client.proxy.port}'}
def send(self):
func = getattr(requests, self.method)
return func(*self.args, **self.kwargs)
class RequestsResponse(object):
def __init__(self, response):
self.response = response
self.data = b''
self.headers = DefaultDotDict({k.lower(): v.lower() for k,v in response.headers.items()})
self.status = response.status_code
self.url = response.url
def chunks(self, size=256):
return self.response.iter_content(chunk_size=256)
@property
def body(self):
for chunk in self.chunks():
self.data += chunk
return self.data
@cached_property
def text(self):
return self.body.decode(self.response.encoding or 'UTF-8')
@cached_property
def json(self):
return DotDict(self.text)
def json_pretty(self, indent=4):
return json.dumps(self.json, indent=indent)
def save(self, path, overwrite=True):
path = Path(path)
parent = path.parent
if not parent.exists():
raise ValueError(f'Path does not exist: {parent}')
if overwrite and path.exists():
path.delete()
with path.open('wb') as fd:
for chunk in self.chunks():
fd.write(chunk)
def verify_request(request, actor: dict):
'''Verify a header signature from a SimpleASGI request
request: The request with the headers to verify
actor: A dictionary containing the activitypub actor and the link to the pubkey used for verification
'''
body = request.body if request.body else None
return verify_headers(request.headers, request.method, request.path, actor=actor, body=body)
def verify_headers(headers: dict, method: str, path: str, actor: dict=None, body=None):
'''Verify a header signature
headers: A dictionary containing all the headers from a request
method: The HTTP method of the request
path: The path of the HTTP request
actor (optional): A dictionary containing the activitypub actor and the link to the pubkey used for verification
body (optional): The body of the request. Only needed if the signature includes the digest header
fail (optional): If set to True, raise an error instead of returning False if any step of the process fails
'''
headers = {k.lower(): v for k,v in headers.items()}
headers['(request-target)'] = f'{method.lower()} {path}'
signature = parse_signature(headers.get('signature'))
digest = parse_body_digest(headers['digest']) if headers.get('digest') else None
missing_headers = [k for k in ('date', 'host') if headers.get(k) is None]
if not signature:
raise ValueError('Missing signature')
if not actor:
actor = fetch_actor(signature.keyid)
## Add digest header to missing headers list if it doesn't exist
if method.lower() == 'post' and not digest:
missing_headers.append('digest')
## Fail if missing date, host or digest (if POST) headers
if missing_headers:
raise KeyError(f'Missing headers: {missing_headers}')
## Fail if body verification fails
if digest and not verify_string(body, digest.sig, digest.alg):
raise ValueError('Failed body digest verification')
pubkey = actor.publicKey['publicKeyPem']
return sign_pkcs_headers(pubkey, {k:v for k,v in headers.items() if k in signature.headers}, sig=signature)
def parse_body_digest(digest):
if not digest:
raise ValueError('Empty digest')
parsed = DotDict()
alg, sig = digest.split('=', 1)
parsed.sig = sig
parsed.alg = alg.replace('-', '')
return parsed
def verify_string(string, enc_string, alg='SHA256', fail=False):
if type(string) != bytes:
string = string.encode('UTF-8')
body_hash = b64encode(SHA256.new(string).digest()).decode('UTF-8')
if body_hash == enc_string:
return True
if fail:
raise ValueError('String failed validation')
else:
return False
def sign_pkcs_headers(key: str, headers: dict, sig=None):
if sig:
head_items = [f'{item}: {headers[item]}' for item in sig.headers]
else:
head_items = [f'{k.lower()}: {v}' for k,v in headers.items()]
head_string = '\n'.join(head_items)
head_bytes = head_string.encode('UTF-8')
KEY = RSA.importKey(key)
pkcs = PKCS1_v1_5.new(KEY)
h = SHA256.new(head_bytes)
if sig:
return pkcs.verify(h, b64decode(sig.signature))
else:
return pkcs.sign(h)
def parse_signature(signature: str):
if not signature:
raise ValueError('Missing signature header')
split_sig = signature.split(',')
sig = DefaultDict({})
for part in split_sig:
key, value = part.split('=', 1)
sig[key.lower()] = value.replace('"', '')
sig.headers = sig.headers.split()
return sig
@lru_cache(maxsize=512)
def fetch_actor(url):
if not Client:
raise ValueError('Please set global client with "SetRequestsClient(client)"')
url = url.split('#')[0]
headers = {'Accept': 'application/activity+json'}
resp = Client.request(url, headers=headers)
try:
actor = resp.json
except Exception as e:
izzylog.debug(f'HTTP {resp.status}: {resp.body}')
raise e from None
actor.web_domain = urlparse(url).netloc
actor.shared_inbox = actor.inbox
actor.pubkey = None
actor.handle = actor.preferredUsername
if actor.get('endpoints'):
actor.shared_inbox = actor.endpoints.get('sharedInbox', actor.inbox)
if actor.get('publicKey'):
actor.pubkey = actor.publicKey.get('publicKeyPem')
return actor
@lru_cache(maxsize=512)
def fetch_webfinger_account(handle, domain):
if not Client:
izzylog.error('IzzyLib.http: Please set global client with "SetClient(client)"')
return {}
data = DefaultDict()
webfinger = Client.request(f'https://{domain}/.well-known/webfinger?resource=acct:{handle}@{domain}')
if not webfinger.body:
raise ValueError('Webfinger body empty')
data.handle, data.domain = webfinger.json.subject.replace('acct:', '').split('@')
for link in webfinger.json.links:
if link['rel'] == 'self' and link['type'] == 'application/activity+json':
data.actor = link['href']
return data
def set_requests_client(client=None):
global Client
Client = client or RequestsClient()
def generate_rsa_key():
privkey = RSA.generate(2048)
key = DotDict({'PRIVKEY': privkey, 'PUBKEY': privkey.publickey()})
key.update({'privkey': key.PRIVKEY.export_key().decode(), 'pubkey': key.PUBKEY.export_key().decode()})
return key
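A rough sketch of how the key generation and header-signing helpers above fit together; the header values are placeholders and the snippet relies on this module's own imports (b64encode, DotDict):
# sign a set of headers with a freshly generated key, then verify the signature
key = generate_rsa_key()
headers = {'date': 'Mon, 07 Jun 2021 20:00:00 GMT', 'host': 'example.com'}
raw_sig = sign_pkcs_headers(key.privkey, headers)
sig = DotDict({'headers': list(headers), 'signature': b64encode(raw_sig).decode()})
print(sign_pkcs_headers(key.pubkey, headers, sig=sig))  # True when the signature matches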

requests_client/setup.py Normal file
View file

@ -0,0 +1,44 @@
#!/usr/bin/env python3
from setuptools import setup, find_namespace_packages
requires = [
'pillow==8.2.0',
'pycryptodome==3.9.9',
'requests==2.25.1',
]
setup(
name="IzzyLib Requests Client",
version='0.6.0',
packages=['izzylib.http_requests_client'],
python_requires='>=3.7.0',
install_requires=requires,
include_package_data=False,
author='Zoey Mae',
author_email='admin@barkshark.xyz',
description='A Requests client with support for http header signing and verifying',
keywords='web http client',
url='https://git.barkshark.xyz/izaliamae/izzylib',
project_urls={
'Bug Tracker': 'https://git.barkshark.xyz/izaliamae/izzylib/issues',
'Documentation': 'https://git.barkshark.xyz/izaliamae/izzylib/wiki',
'Source Code': 'https://git.barkshark.xyz/izaliamae/izzylib'
},
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Information Technology',
'License :: Co-operative Non-violent Public License (CNPL 6+)',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Operating System :: POSIX',
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft :: Windows',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

View file

@ -1,10 +0,0 @@
colour>=0.1.5
envbash>=1.2.0
Hamlish-Jinja==0.3.3
Jinja2>=2.10.1
jinja2-markdown>=0.0.3
Mastodon.py>=1.5.0
pycryptodome>=3.9.1
python-magic>=0.4.18
sanic>=19.12.2
watchdog>=0.8.3

setup.py Executable file → Normal file
View file

@ -1,40 +1,94 @@
#!/usr/bin/env python3
from setuptools import setup
import sys
from IzzyLib import __version__ as v
import subprocess, sys
version = '.'.join([str(i) for i in v])
submodules = [
'http_server',
'requests_client',
'sql',
'template',
'tinydb'
]
submodule_names = [
'IzzyLib-Base',
'IzzyLib-Database',
'IzzyLib-HTTP-Server',
'IzzyLib-Requests-Client',
'IzzyLib-SQL',
'IzzyLib-Templates',
'IzzyLib-TinyDB',
]
setup(
name="IzzyLib",
version=version,
packages=['IzzyLib'],
python_requires='>=3.6.0',
install_requires=[req.replace('\n', '') for req in open('requirements.txt').readlines()],
include_package_data=False,
author='Zoey Mae',
author_email='admin@barkshark.xyz',
description='a collection of often-used functions and classes',
keywords='web http server database postgresql',
url='https://git.barkshark.xyz/izaliamae/izzylib',
project_urls={
'Bug Tracker': 'https://git.barkshark.xyz/izaliamae/izzylib/issues',
'Documentation': 'https://git.barkshark.xyz/izaliamae/izzylib/wiki',
'Source Code': 'https://git.barkshark.xyz/izaliamae/izzylib'
},
def main(*args):
if not len(args):
print('Missing command')
return cmd_help()
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Information Technology',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Operating System :: POSIX',
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft :: Windows',
'Topic :: Internet :: WWW/HTTP'
]
)
command = args[0]
if command not in ['install', 'uninstall', 'dev']:
print('Invalid command:', command)
return cmd_help()
if command == 'uninstall':
subprocess.run([sys.executable, '-m', 'pip', 'uninstall', '-y', *submodule_names])
return
submodule_args = []
if len(args) > 1:
if args[1] == 'all':
submodule_args = [f'./{module}' for module in submodules]
else:
for mod in args[1:]:
if mod not in submodules:
print('Invalid submodule:', mod)
return cmd_help()
submodule_args = [f'./{module}' for module in args[1:]]
options = ['-m', 'pip', 'install', './base', *submodule_args]
if command == 'dev':
options.insert(3, '--use-feature=in-tree-build')
print(options)
subprocess.run([sys.executable, *options])
def cmd_help(*args):
text = f'''IzzyLib manager
Commands:
install [submodules: space-separated list or 'all']:
Install the IzzyLib base package along with any listed submodules
dev [submodules]:
Same as install, but uses pip's in-tree build for local development
uninstall [submodules]:
Uninstall IzzyLib and any installed submodules
Valid Submodules:
{', '.join(submodules)}
'''
print(text)
if __name__ == '__main__':
try:
args = sys.argv[1:]
except IndexError:
args = []
try:
if not args:
cmd_help()
else:
main(*args)
except KeyboardInterrupt:
print('Bye! UvU')

View file

@ -0,0 +1,2 @@
from .generic import Column, CustomRows, Session, SqlDatabase, Tables
from .sqlite_server import SqliteClient, SqliteColumn, SqliteServer, SqliteSession

View file

@ -7,9 +7,14 @@ from sqlalchemy import Column as SqlColumn, types as Types
from sqlalchemy.exc import OperationalError, ProgrammingError
from sqlalchemy.orm import scoped_session, sessionmaker
from .. import logging
from ..cache import LRUCache
from ..misc import DotDict, RandomGen, NfsCheck, PrintMethods, Path
from izzylib import (
LruCache,
DotDict,
Path,
random_gen,
nfs_check,
izzylog
)
SqlTypes = DotDict({t.lower(): getattr(Types, t) for t in dir(Types) if not t.startswith('_')})
@ -36,13 +41,13 @@ class SqlDatabase:
engine_kwargs = {}
if not kwargs.get('database'):
raise MissingDatabaseError('Database not set')
raise KeyError('Database argument is not set')
engine_string = dbtype + '://'
if dbtype == 'sqlite':
if NfsCheck(kwargs.get('database')):
logging.error('Database file is on an NFS share which does not support locking. Any writes to the database will fail')
if nfs_check(kwargs.get('database')):
izzylog.error('Database file is on an NFS share which does not support locking. Any writes to the database will fail')
engine_string += '/' + str(kwargs.get('database'))
engine_kwargs['connect_args'] = {'check_same_thread': False}
@ -80,7 +85,7 @@ class SqlDatabase:
def SetupCache(self):
self.cache = DotDict({table: LRUCache() for table in self.table_names})
self.cache = DotDict({table: LruCache() for table in self.table_names})
def CreateTables(self, *tables):
@ -156,7 +161,7 @@ class Session(object):
def open(self):
self.sessionid = RandomGen(10)
self.sessionid = random_gen(10)
self.db.sessions[self.sessionid] = self
@ -271,13 +276,13 @@ class Session(object):
column = getattr(table.c, col)
except AttributeError:
logging.error(f'Table "{tbl}" does not have column "{col}"')
izzylog.error(f'Table "{tbl}" does not have column "{col}"')
return
columns = [row[1] for row in self.execute(f'PRAGMA table_info({tbl})')]
if col in columns:
logging.info(f'Column "{col}" already exists')
izzylog.info(f'Column "{col}" already exists')
return
sql = f'ALTER TABLE {tbl} ADD COLUMN {col} {column.type}'
@ -300,7 +305,7 @@ class Session(object):
columns = [row[1] for row in self.execute(f'PRAGMA table_info({tbl})')]
if col not in columns:
logging.info(f'Column "{col}" already exists')
izzylog.info(f'Column "{col}" already exists')
return
columns.remove(col)
@ -430,7 +435,3 @@ def Column(name, stype=None, fkey=None, **kwargs):
options.append(ForeignKey(fkey))
return SqlColumn(*options, **kwargs)
class MissingDatabaseError(Exception):
'''raise when the "database" kwarg is not set'''

View file

@ -1,8 +1,6 @@
import asyncio, json, socket, sqlite3, ssl, time, traceback
from . import SqlDatabase
from .sql import CustomRows
from .. import logging, misc
from izzylib import CustomRows, DotDict, Path, JsonEncoder, SqlDatabase, izzylog
commands = [
@ -119,7 +117,7 @@ class SqliteSession(socket.socket):
login = self.send('login', self.data.password)
if not login.get('message') == 'OK':
logging.error('Server error:', login.error)
izzylog.error('Server error:', login.error)
return
self.connected = True
@ -143,7 +141,7 @@ class SqliteSession(socket.socket):
pass
def Column(*args, **kwargs):
def SqliteColumn(*args, **kwargs):
return {'args': list(args), 'kwargs': dict(kwargs)}
@ -160,9 +158,9 @@ class SqliteServer(misc.DotDict):
self.metadata_layout = {
'databases': [
Column('id'),
Column('name', 'text', nullable=False),
Column('layout', 'text', nullable=False)
SqliteColumn('id'),
SqliteColumn('name', 'text', nullable=False),
SqliteColumn('layout', 'text', nullable=False)
]
}
@ -190,7 +188,7 @@ class SqliteServer(misc.DotDict):
row = s.fetch('databases', name=database)
if not row:
logging.error('Database not found:', database)
izzylog.error('Database not found:', database)
return
db.SetupTables(row.layout)
@ -226,11 +224,11 @@ class SqliteServer(misc.DotDict):
loop.run_until_complete(self.asyncio_run())
try:
logging.info('Starting Sqlite Server')
izzylog.info('Starting Sqlite Server')
loop.run_forever()
except KeyboardInterrupt:
print()
logging.info('Closing...')
izzylog.info('Closing...')
return
@ -238,9 +236,9 @@ class SqliteServer(misc.DotDict):
meta = self.open('metadata')
tables = {
'databases': [
Column('id'),
Column('name', 'text', nullable=False),
Column('layout', 'text', nullable=False)
SqliteColumn('id'),
SqliteColumn('name', 'text', nullable=False),
SqliteColumn('layout', 'text', nullable=False)
]
}
@ -300,7 +298,7 @@ class SqliteServer(misc.DotDict):
writer.write(json.dumps(response or {'message': 'OK'}, cls=JsonEncoder).encode('utf8'))
await writer.drain()
logging.info(f'{writer.get_extra_info("peername")[0]}: [{database}] {data.command} {data.args} {data.kwargs}')
izzylog.info(f'{writer.get_extra_info("peername")[0]}: [{database}] {data.command} {data.args} {data.kwargs}')
if data.command == 'delete':
writer.close()

sql/setup.py Normal file
View file

@ -0,0 +1,47 @@
#!/usr/bin/env python3
from setuptools import setup
requires = [
'SQLAlchemy==1.3.22',
'SQLAlchemy-Paginator==0.2',
'tinydb==4.4.0',
'tinydb-serialization==2.1.0',
'tinydb-smartcache==2.0.0',
'tinyrecord==0.2.0'
]
setup(
name="IzzyLib SQL",
version='0.6.0',
packages=['izzylib.sql'],
python_requires='>=3.7.0',
install_requires=requires,
include_package_data=False,
author='Zoey Mae',
author_email='admin@barkshark.xyz',
description='a collection of often-used functions and classes',
keywords='database postgresql sqlite tinydb',
url='https://git.barkshark.xyz/izaliamae/izzylib',
project_urls={
'Bug Tracker': 'https://git.barkshark.xyz/izaliamae/izzylib/issues',
'Documentation': 'https://git.barkshark.xyz/izaliamae/izzylib/wiki',
'Source Code': 'https://git.barkshark.xyz/izaliamae/izzylib'
},
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Information Technology',
'License :: Co-operative Non-violent Public License (CNPL 6+)',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Operating System :: POSIX',
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft :: Windows',
'Topic :: Database',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

View file

@ -1,11 +1,11 @@
'''functions for web template management and rendering'''
import codecs, traceback, os, json, xml
from colour import Color as Colour
from hamlish_jinja import HamlishExtension
from izzylib import izzylog, DotDict, Path
from jinja2 import Environment, FileSystemLoader, ChoiceLoader, select_autoescape, Markup
from os import listdir, makedirs
from os.path import isfile, isdir, getmtime, abspath
from jinja2 import Environment, FileSystemLoader, ChoiceLoader, select_autoescape, Markup
from hamlish_jinja import HamlishExtension
from xml.dom import minidom
try:
@ -14,10 +14,6 @@ try:
except ModuleNotFoundError:
Response = None
from . import logging
from .color import *
from .misc import Path, DotDict
class Template(Environment):
def __init__(self, search=[], global_vars={}, context=None, autoescape=True):
@ -43,11 +39,7 @@ class Template(Environment):
self.globals.update({
'markup': Markup,
'cleanhtml': lambda text: ''.join(xml.etree.ElementTree.fromstring(text).itertext()),
'lighten': lighten,
'darken': darken,
'saturate': saturate,
'desaturate': desaturate,
'rgba': rgba
'color': Color
})
self.globals.update(global_vars)
@ -64,11 +56,11 @@ class Template(Environment):
def setContext(self, context):
if not hasattr(context, '__call__'):
logging.error('Context is not callable')
izzylog.error('Context is not callable')
return
if not isinstance(context({}, {}), dict):
logging.error('Context does not return a dict or dict-like object')
izzylog.error('Context does not return a dict or dict-like object')
return
self.func_context = context
@ -135,3 +127,47 @@ class Template(Environment):
html = self.render(tpl, request=request, **kwargs)
return Response.HTTPResponse(body=html, status=status, content_type=ctype, headers=kwargs.get('headers', {}))
class Color(Colour):
def __init__(self, color):
color = str(color)
super().__init__(color if color.startswith('#') else f'#{color}')
def multi(self, multiplier):
if multiplier >= 1:
return 1
elif multiplier <= 0:
return 0
return multiplier
def lighten(self, multiplier):
return self.alter('lighten', multiplier)
def darken(self, multiplier):
return self.alter('darken', multiplier)
def saturate(self, multiplier):
return self.alter('saturate', multiplier)
def desaturate(self, multiplier):
return self.alter('desaturate', multiplier)
def rgba(self, multiplier):
return self.alter('rgba', multiplier)
def alter(self, action, multiplier):
if action == 'lighten':
self.luminance += ((1 - self.luminance) * self.multi(multiplier))
elif action == 'darken':
self.luminance -= (self.luminance * self.multi(multiplier))
elif action == 'saturate':
self.saturation += ((1 - self.saturation) * self.multi(multiplier))
elif action == 'desaturate':
self.saturation -= (self.saturation * self.multi(multiplier))
elif action == 'rgba':
red = self.red * 255
green = self.green * 255
blue = self.blue * 255
trans = self.multi(multiplier)
return f'rgba({red:0.2f}, {green:0.2f}, {blue:0.2f}, {trans:0.2f})'
return self.hex_l
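A usage sketch for the template Color helper; the hex value is arbitrary:
# lighten a colour and build an rgba() string for CSS
accent = Color('e69fbf')
print(accent.lighten(0.25))  # lighter hex string
print(accent.rgba(0.8))      # 'rgba(r, g, b, 0.80)' style string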

template/setup.py Normal file
View file

@ -0,0 +1,46 @@
#!/usr/bin/env python3
from setuptools import setup
requires = [
'colour==0.1.5',
'Hamlish-Jinja==0.3.3',
'Jinja2==2.11.2',
'Markdown==3.3.3',
]
setup(
name="IzzyLib Templates",
version='0.6.0',
packages=['izzylib.template'],
python_requires='>=3.7.0',
install_requires=requires,
include_package_data=False,
author='Zoey Mae',
author_email='admin@barkshark.xyz',
description='A template engine based on Jinja2 and HAMLish-Jinja',
keywords='web http templates jinja haml',
url='https://git.barkshark.xyz/izaliamae/izzylib',
project_urls={
'Bug Tracker': 'https://git.barkshark.xyz/izaliamae/izzylib/issues',
'Documentation': 'https://git.barkshark.xyz/izaliamae/izzylib/wiki',
'Source Code': 'https://git.barkshark.xyz/izaliamae/izzylib'
},
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Information Technology',
'License :: Co-operative Non-violent Public License (CNPL 6+)',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Operating System :: POSIX',
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft :: Windows',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)

View file

@ -0,0 +1 @@
from .tiny import TinyDatabase, TinyRow, TinyRows

View file

@ -1,3 +1,4 @@
# incomplete
import base64
import json
import operator
@ -8,7 +9,8 @@ import time
import tinydb
import tinydb_serialization
from .. import misc
from concurrent.futures import ThreadPoolExecutor, as_completed
from izzylib import Path, random_gen
class AwaitingResult(object):
@ -16,10 +18,9 @@ class AwaitingResult(object):
class TinyDatabase(tinydb.TinyDB):
def __init__(self, dbfile: misc.Path, queue_limit: int=64, serializers: list=[]):
def __init__(self, dbfile: Path, queue_limit: int=64, serializers: list=[]):
options = {
'indent': 2,
'separators': (',', ': '),
'indent': 2
}
serialization = tinydb_serialization.SerializationMiddleware(ThreadSupport)
@ -89,7 +90,7 @@ def TinyRows(db, rows):
return [TinyRow(db, row) for row in rows]
class TinyRow(misc.DotDict):
class TinyRow(DotDict):
def __init__(self, db, row):
super().__init__({'id': row.doc_id})
super().update({k: v for k,v in row.items()})
@ -108,15 +109,7 @@ class ThreadSupport(tinydb.storages.JSONStorage):
def __init__(self, filename, *args, **kwargs):
super().__init__(filename, *args, **kwargs)
self._thread_event = threading.Event()
self._shutdown = False
self._results = {}
self._queue = queue.Queue()
self._lock = threading.Lock()
self._thread = threading.Thread(target=self.process_queue)
self._thread.daemon = True
self._thread.start()
self.pool = ThreadPoolExecutor(max_workers=8)
## send all storage commands to the queue
def read(self):
@ -125,80 +118,12 @@ class ThreadSupport(tinydb.storages.JSONStorage):
def write(self, data):
self.queue_put('write', data)
self.pool.submit(super().write, data)
def close(self):
self.queue_put('close')
def get_action(self, action):
return getattr(super(), action)
def get_result(self, qid):
with self._lock:
return self._results[qid]
def set_result(self, qid, data=AwaitingResult):
with self._lock:
self._results[qid] = data
def pop_result(self, qid):
with self._lock:
return self._result.pop(qid)
## queue
def process_queue(self):
while not self._thread_event.is_set():
if not self._queue.empty():
qid, action, args, kwargs = self._queue.get(block=False)
if qid not in self._results:
self.set_result(qid)
if action == 'close':
self._shutdown = True
func = self.get_action(action)
if action == 'read':
self.set_result(qid, func(*args, **kwargs))
else:
time.sleep(0.1)
if self._shutdown:
self.get_action('close')()
return
def queue_put(self, func, *args, **kwargs):
if self._shutdown:
logging.error('Storage has been closed. Refusing to send more commands')
return
qid = misc.RandomGen()
self._queue.put((qid, func, args, kwargs))
if func != 'read':
return
sleep_time = 0.0
while self.get_result(qid) == AwaitingResult:
time.sleep(0.1)
sleep_time += 0.1
if sleep_time >= 5.0:
raise TimeoutError(f'Timeout on "{func}" with args: {args}, {kwargs}')
result = self.pop_result(qid)
print(result)
return result
self.pool.shutdown(wait=False)
class ByteSerializer(tinydb_serialization.Serializer):
@ -226,7 +151,7 @@ class DictSerializer(tinydb_serialization.Serializer):
class DotDictSerialize(tinydb_serialization.Serializer):
OBJ_CLASS = misc.DotDict
OBJ_CLASS = DotDict
def encode(self, obj):
#print('encode', self.__class__.__name__, obj)
@ -234,11 +159,11 @@ class DotDictSerialize(tinydb_serialization.Serializer):
def decode(self, obj):
#print('decode', self.__class__.__name__, obj)
return misc.DotDict(obj)
return DotDict(obj)
class PathSerialize(tinydb_serialization.Serializer):
OBJ_CLASS = misc.Path
OBJ_CLASS = Path
def encode(self, obj):
#print('encode', self.__class__.__name__, obj)
@ -246,4 +171,4 @@ class PathSerialize(tinydb_serialization.Serializer):
def decode(self, obj):
#print('decode', self.__class__.__name__, obj)
return misc.Path(obj)
return Path(obj)

tinydb/setup.py Normal file
View file

@ -0,0 +1,45 @@
#!/usr/bin/env python3
from setuptools import setup, find_namespace_packages
requires = [
'tinydb==4.4.0',
'tinydb-serialization==2.1.0',
'tinydb-smartcache==2.0.0',
'tinyrecord==0.2.0'
]
setup(
name="IzzyLib TinyDB",
version='0.6.0',
packages=['izzylib.tinydb'],
python_requires='>=3.7.0',
install_requires=requires,
include_package_data=False,
author='Zoey Mae',
author_email='admin@barkshark.xyz',
description='a collection of often-used functions and classes',
keywords='database postgresql sqlite tinydb',
url='https://git.barkshark.xyz/izaliamae/izzylib',
project_urls={
'Bug Tracker': 'https://git.barkshark.xyz/izaliamae/izzylib/issues',
'Documentation': 'https://git.barkshark.xyz/izaliamae/izzylib/wiki',
'Source Code': 'https://git.barkshark.xyz/izaliamae/izzylib'
},
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Information Technology',
'License :: Co-operative Non-violent Public License (CNPL 6+)',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Operating System :: POSIX',
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft :: Windows',
'Topic :: Database',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules'
]
)