From 820a5b4772fab8696cde35990eb7e251ab7aadf1 Mon Sep 17 00:00:00 2001
From: Izalia Mae
Date: Sun, 2 May 2021 14:15:11 -0400
Subject: [PATCH] add incomplete json-based database classes

---
 IzzyLib/database_pysondb.py | 353 ++++++++++++++++++++++++++++++++++++
 IzzyLib/database_tiny.py    | 249 +++++++++++++++++++++++++
 2 files changed, 602 insertions(+)
 create mode 100644 IzzyLib/database_pysondb.py
 create mode 100644 IzzyLib/database_tiny.py

diff --git a/IzzyLib/database_pysondb.py b/IzzyLib/database_pysondb.py
new file mode 100644
index 0000000..c601edb
--- /dev/null
+++ b/IzzyLib/database_pysondb.py
@@ -0,0 +1,353 @@
+import asyncio
+import base64
+import datetime
+import filelock
+import json
+import logging
+import multiprocessing
+import operator
+import random
+
+from pysondb.db import JsonDatabase, IdNotFoundError
+
+from . import misc
+
+
+class Database(multiprocessing.Process):
+	def __init__(self, dbpath: misc.Path, tables: dict=None):
+		multiprocessing.Process.__init__(self, daemon=True)
+
+		self.dbpath = dbpath
+		self.tables = tables
+		self._shutdown = False
+		self.port = self._setup_port()
+		self.token = misc.RandomGen()
+
+		self.fetch = lambda *args, **kwargs: self.send_message('fetch', *args, **kwargs)
+		self.search = lambda *args, **kwargs: self.send_message('search', *args, **kwargs)
+		self.insert = lambda *args, **kwargs: self.send_message('insert', *args, **kwargs)
+		self.remove = lambda *args, **kwargs: self.send_message('remove', *args, **kwargs)
+
+		self.start()
+
+
+	def run(self):
+		self.db = DatabaseProcess(self.dbpath, self.tables)
+
+		loop = asyncio.new_event_loop()
+		asyncio.set_event_loop(loop)
+
+		loop.run_until_complete(asyncio.start_server(self.process_queue, '127.0.0.1', self.port))
+		loop.run_forever()
+
+
+	def close(self):
+		self.terminate()
+		self.join(timeout=5)
+
+
+	def _setup_port(self):
+		while True:
+			port = random.randint(8096, 16394)
+
+			if misc.PortCheck(port):
+				return port
+
+
+	def get_action(self, action):
+		return getattr(self.db, action)
+
+
+	def send_message(self, action, table, *args, **kwargs):
+		data = {
+			'token': self.token,
+			'action': action,
+			'table': table,
+			'args': args,
+			'kwargs': kwargs
+		}
+
+		with self.socket as s:
+			s.send(json.dumps(data))
+			return s.recieve(16 * 1024 * 1024)
+
+
+	@property
+	def socket(self):
+		return misc.Connection(port=self.port)
+
+
+	async def process_queue(self, reader, writer):
+		data = misc.DotDict(json.loads(await reader.read(16 * 1024 * 1024)))
+
+		if data.token != self.token:
+			return
+
+		if data.action == 'close':
+			self._shutdown = True
+			self.db.close()
+			return
+
+		new_data = self.get_action(data.action)(data.table, *data.args, **data.kwargs)
+
+		if isinstance(new_data, dict):
+			writer.write(json.dumps(new_data).encode('utf-8'))
+			await writer.drain()
+
+		writer.close()
+
+
+	async def pipe_listener(self):
+		pass
+
+
+class DatabaseProcess(misc.DotDict):
+	def __init__(self, dbpath: misc.Path, tables: dict=None):
+		super().__init__()
+
+		self.path = misc.Path(dbpath).resolve()
+		self.metadata = misc.DotDict(**{
+			'path': self.path.join('metadata.json'),
+			'lock': self.path.join('metadata.json.lock'),
+			'version': 0
+		})
+
+		self._closed = False
+		self.__setup_database(tables)
+
+
+	def __setup_database(self, tables):
+		self.path.mkdir()
+		self.load_meta()
+
+		for name, columns in (tables or {}).items():
+			self[name] = columns if isinstance(columns, Table) else Table(name, columns)
+
+			if not self[name].db:
+				self[name].setup(self)
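+
+
+	# On-disk layout this sets up (a sketch based on __setup_database() and
+	# Table.setup() below; 'user' is a hypothetical table name):
+	#
+	#   <dbpath>/
+	#     metadata.json        version info, guarded by metadata.json.lock
+	#     table_user.json      one pysondb file per table, shaped {"data": [...]}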
+
+
+	def load_meta(self):
+		if self.metadata.path.exists():
+			with filelock.FileLock(self.metadata.lock.str()):
+				data = self.metadata.path.load_json()
+				self.metadata.update(data)
+
+
+	def save_meta(self):
+		with filelock.FileLock(self.metadata.lock.str()):
+			data = self.metadata.copy()
+			data.pop('path')
+			data.pop('lock')
+
+			self.metadata.path.update_json(data)
+			self.metadata.path.save_json()
+
+
+	def close(self):
+		self.save_meta()
+		self._closed = True
+
+
+	def fetch(self, table, *args, **kwargs):
+		return self[table].fetch(*args, **kwargs)
+
+
+	def search(self, table, *args, **kwargs):
+		return self[table].search(*args, **kwargs)
+
+
+	def insert(self, table, *args, **kwargs):
+		return self[table].insert(*args, **kwargs)
+
+
+	def remove(self, table, *args, **kwargs):
+		return self[table].remove(*args, **kwargs)
+
+
+	def migrate(self, table=None):
+		## re-insert every row so stored values get rewritten with the current columns
+		tables = [self[table]] if table else [value for value in self.values() if isinstance(value, Table)]
+
+		for tbl in tables:
+			for row in tbl.search():
+				tbl.insert(rowid=row.id, **{name: row[name] for name in tbl.columns})
+
+
+class Table(JsonDatabase):
+	def __init__(self, name: str, columns: dict=None):
+		self.db = None
+		self.name = name
+		self.columns = {}
+		self.add_column('id')
+
+		for colname, col in (columns or {}).items():
+			if colname != 'id':
+				self.add_column(colname, *col)
+
+
+	def setup(self, db):
+		self.db = db
+		tablefile = db.path.join(f'table_{self.name}.json')
+
+		if not tablefile.exists():
+			tablefile.touch(mode=0o644)
+
+			with tablefile.open('w') as fd:
+				fd.write('{"data": []}')
+
+		super().__init__(tablefile.str())
+
+
+	def add_column(self, name: str, coltype: str='str', default=None, nullable: bool=True, primary_key: bool=False):
+		if name == 'id':
+			coltype = 'int'
+			nullable = False
+			primary_key = True
+
+		self.columns[name] = misc.DotDict({
+			'default': default,
+			'type': coltype,
+			'primary_key': primary_key,
+			'nullable': nullable
+		})
+
+
+	def fetch(self, single=True, orderby=None, reverse=False, **kwargs):
+		if self.db._closed:
+			return logging.error('Database closed')
+
+		if not kwargs:
+			rows = DBRows(self, self.getAll())
+			single = False
+
+		else:
+			rows = DBRows(self, self.getBy(kwargs))
+
+		if single:
+			return rows[0] if rows else None
+
+		return rows if not orderby else sorted(rows, key=operator.itemgetter(orderby), reverse=reverse)
+
+
+	def search(self, orderby=None, reverse=False, **kwargs):
+		return self.fetch(single=False, orderby=orderby, reverse=reverse, **kwargs)
+
+
+	def insert(self, row=None, rowid=None, **kwargs):
+		if self.db._closed:
+			return logging.error('Database closed')
+
+		new_data = {}
+
+		for name, col in self.columns.items():
+			## pysondb generates the id value itself
+			if name == 'id':
+				continue
+
+			raw_value = kwargs.get(name, col.default)
+			value = serialize(raw_value, col.type)
+
+			if value is None and not col.nullable:
+				raise ValueError(f'Column "{name}" cannot be empty')
+
+			new_data[name] = value
+
+		if row:
+			rowid = row.id
+
+		if rowid:
+			return self.update({'id': rowid}, new_data)
+
+		return self.add(new_data)
+
+
+	def delete(self, rowid):
+		with self.lock:
+			with open(self.filename, 'r+') as db_file:
+				db_data = self._get_load_function()(db_file)
+				result = []
+				found = False
+
+				for row in db_data['data']:
+					if row.get('id') == rowid:
+						found = True
+
+					else:
+						result.append(row)
+
+				if not found:
+					raise IdNotFoundError(rowid)
+
+				db_data['data'] = result
+				db_file.seek(0)
+				db_file.truncate()
+				self._get_dump_function()(db_data, db_file)
+
+		return True
+
+
+	def remove(self, row=None, rowid=None, **kwargs):
+		if self.db._closed:
+			return logging.error('Database closed')
+
+		if row or rowid:
+			return self.delete(rowid or row.id)
+
+		row = self.fetch(**kwargs)
+		return self.delete(row.id) if row else False
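+
+
+	# What a serialized row looks like on disk (an illustration; the values
+	# are made up): with columns created=('datetime',) and key=('bytes',),
+	# insert(created=..., key=b'izzy') is stored by pysondb as
+	#
+	#   {"created": 1619978111.0, "key": "aXp6eQ==", "id": 169352}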
+
+
+	def _get_dump_function(self):
+		return lambda *args, **kwargs: json.dump(*args, indent=2, **kwargs)
+
+
+def serialize(data, dtype):
+	types = {
+		'datetime': lambda arg: arg.timestamp(),
+		'dotdict': lambda arg: arg.toDict(),
+		'bytes': lambda arg: base64.b64encode(arg).decode('ascii'),
+		'bool': misc.Boolean,
+		'int': int,
+		'path': lambda arg: arg.str()
+	}
+
+	if data is None:
+		return None
+
+	serial_type = types.get(dtype)
+	return serial_type(data) if serial_type else data
+
+
+def deserialize(data, dtype):
+	types = {
+		'datetime': datetime.datetime.fromtimestamp,
+		'dotdict': misc.DotDict,
+		'dict': misc.DotDict,
+		'bytes': base64.b64decode,
+		'path': misc.Path
+	}
+
+	if data is None:
+		return None
+
+	deserial_type = types.get(dtype)
+	return deserial_type(data) if deserial_type else data
+
+
+def DBRows(table, rows):
+	return [DBRow(table, row) for row in rows]
+
+
+class DBRow(misc.DotDict):
+	def __init__(self, table, row):
+		super().__init__({name: deserialize(row.get(name), col.type) for name, col in table.columns.items()})
+		self.table = table
+
+
+	def __str__(self):
+		data = ', '.join(f'{k}={v}' for k, v in self.items())
+		return f'DBRow({data})'
+
+
+	def update(self, data=None):
+		super().update(data or {})
+		self.table.insert(rowid=self.id, **{name: self[name] for name in self.table.columns})
+
+
+	def remove(self):
+		self.table.remove(rowid=self.id)
diff --git a/IzzyLib/database_tiny.py b/IzzyLib/database_tiny.py
new file mode 100644
index 0000000..258b465
--- /dev/null
+++ b/IzzyLib/database_tiny.py
@@ -0,0 +1,249 @@
+import base64
+import json
+import logging
+import operator
+import queue
+import threading
+import time
+
+import tinydb
+import tinydb_serialization
+
+from . import misc
+
+
+class AwaitingResult(object):
+	pass
+
+
+class DataBase(tinydb.TinyDB):
+	def __init__(self, dbfile: misc.Path, queue_limit: int=64, serializers: list=[]):
+		options = {
+			'indent': 2,
+			'separators': (',', ': '),
+		}
+
+		serialization = tinydb_serialization.SerializationMiddleware(ThreadSupport)
+		serialization.register_serializer(tinydb_serialization.serializers.DateTimeSerializer(), 'TinyDate')
+		serialization.register_serializer(ByteSerializer(), 'TinyBytes')
+		serialization.register_serializer(PathSerialize(), 'IzzyLibPath')
+		serialization.register_serializer(DotDictSerialize(), 'IzzyLibDotDict')
+
+		for serializer in serializers:
+			serialization.register_serializer(serializer(), serializer.__name__)
+
+		options['storage'] = serialization
+
+		super().__init__(dbfile, **options)
+
+		self.dbfile = dbfile
+		self.queue = queue.Queue(maxsize=queue_limit)
+		self.sessions = {}
+
+
+	@property
+	def session(self):
+		## TinySession doesn't exist yet
+		return TinySession(self)
+
+
+	def fetch(self, table, single=True, orderby=None, reverse=False, **kwargs):
+		query = tinydb.Query()
+
+		if not kwargs:
+			rows = TinyRows(self, table, self.table(table).all())
+			single = False
+
+		else:
+			rows = TinyRows(self, table, self.table(table).search(query.fragment(kwargs)))
+
+		if single:
+			return rows[0] if rows else None
+
+		return rows if not orderby else sorted(rows, key=operator.itemgetter(orderby), reverse=reverse)
+
+
+	def insert(self, table, row=None, rowid=None, **kwargs):
+		if row:
+			rowid = row.id
+
+		elif not rowid:
+			row = self.fetch(table, **kwargs)
+			rowid = row.id if row else None
+
+		if rowid:
+			return self.table(table).update(kwargs, doc_ids=[rowid])
+
+		return self.table(table).insert(kwargs)
+
+
+	def remove(self, table, row=None, rowid=None, **kwargs):
+		query = tinydb.Query()
+
+		if row or rowid:
+			return self.table(table).remove(doc_ids=[rowid or row.id])
+
+		return self.table(table).remove(query.fragment(kwargs))
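+
+
+# Rough usage sketch (assumes the API defined above; misc.Path comes from
+# IzzyLib.misc):
+#
+#   db = DataBase(misc.Path('db.json'))
+#   db.insert('user', name='izzy')
+#   row = db.fetch('user', name='izzy')
+#   row.update({'name': 'mae'})
+#   row.remove()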
+
+
+def TinyRows(db, table, rows):
+	return [TinyRow(db, table, row) for row in rows]
+
+
+class TinyRow(misc.DotDict):
+	def __init__(self, db, table, row):
+		## bypass our own update() so nothing gets written back to the db here
+		super().__init__({'id': row.doc_id})
+		super().update({k: v for k, v in row.items()})
+
+		self.db = db
+		self.table_name = table
+
+
+	def update(self, data=None):
+		data = data or {}
+		self.db.insert(self.table_name, rowid=self.id, **data)
+		super().update(data)
+
+
+	def remove(self):
+		self.db.remove(self.table_name, rowid=self.id)
+
+
+class ThreadSupport(tinydb.storages.JSONStorage):
+	def __init__(self, filename, *args, **kwargs):
+		super().__init__(filename, *args, **kwargs)
+
+		self._thread_event = threading.Event()
+		self._shutdown = False
+		self._results = {}
+		self._queue = queue.Queue()
+
+		self._lock = threading.Lock()
+		self._thread = threading.Thread(target=self.process_queue)
+		self._thread.daemon = True
+		self._thread.start()
+
+
+	## send all storage commands to the queue
+	def read(self):
+		return self.queue_put('read')
+
+
+	def write(self, data):
+		self.queue_put('write', data)
+
+
+	def close(self):
+		self.queue_put('close')
+
+
+	def get_action(self, action):
+		return getattr(super(), action)
+
+
+	def get_result(self, qid):
+		with self._lock:
+			return self._results[qid]
+
+
+	def set_result(self, qid, data=AwaitingResult):
+		with self._lock:
+			self._results[qid] = data
+
+
+	def pop_result(self, qid):
+		with self._lock:
+			return self._results.pop(qid)
+
+
+	## queue
+	def process_queue(self):
+		while not self._thread_event.is_set():
+			if not self._queue.empty():
+				qid, action, args, kwargs = self._queue.get(block=False)
+
+				if action == 'close':
+					self._shutdown = True
+
+				func = self.get_action(action)
+
+				if action == 'read':
+					self.set_result(qid, func(*args, **kwargs))
+
+				elif action != 'close':
+					func(*args, **kwargs)
+
+			else:
+				time.sleep(0.1)
+
+			if self._shutdown:
+				self.get_action('close')()
+				return
+
+
+	def queue_put(self, func, *args, **kwargs):
+		if self._shutdown:
+			logging.error('Storage has been closed. Refusing to send more commands')
+			return
+
+		qid = misc.RandomGen()
+
+		## set the placeholder before queueing so get_result() can't raise a KeyError
+		if func == 'read':
+			self.set_result(qid)
+
+		self._queue.put((qid, func, args, kwargs))
+
+		if func != 'read':
+			return
+
+		sleep_time = 0.0
+
+		while self.get_result(qid) is AwaitingResult:
+			time.sleep(0.1)
+			sleep_time += 0.1
+
+			if sleep_time >= 5.0:
+				raise TimeoutError(f'Timeout on "{func}" with args: {args}, {kwargs}')
+
+		return self.pop_result(qid)
+
+
+class ByteSerializer(tinydb_serialization.Serializer):
+	OBJ_CLASS = bytes
+
+	def encode(self, obj):
+		return base64.b64encode(obj).decode('ascii')
+
+	def decode(self, obj):
+		return base64.b64decode(obj)
+
+
+class DictSerializer(tinydb_serialization.Serializer):
+	OBJ_CLASS = dict
+
+	def encode(self, obj):
+		return json.dumps(obj)
+
+	def decode(self, obj):
+		return json.loads(obj)
+
+
+class DotDictSerialize(tinydb_serialization.Serializer):
+	OBJ_CLASS = misc.DotDict
+
+	def encode(self, obj):
+		return obj.toJson()
+
+	def decode(self, obj):
+		return misc.DotDict(obj)
+
+
+class PathSerialize(tinydb_serialization.Serializer):
+	OBJ_CLASS = misc.Path
+
+	def encode(self, obj):
+		return obj.str()
+
+	def decode(self, obj):
+		return misc.Path(obj)
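+
+
+# Example of plugging in a custom serializer (a sketch; SetSerializer is
+# hypothetical). DataBase instantiates each class in the serializers list
+# and registers it under its class name:
+#
+#   class SetSerializer(tinydb_serialization.Serializer):
+#       OBJ_CLASS = set
+#
+#       def encode(self, obj):
+#           return json.dumps(sorted(obj))
+#
+#       def decode(self, obj):
+#           return set(json.loads(obj))
+#
+#   db = DataBase(misc.Path('db.json'), serializers=[SetSerializer])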