This repository has been archived on 2023-02-02. You can view files and clone it, but cannot push or open issues or pull requests.
barkshark-web/barkshark_web/database/base.py
2022-04-16 06:23:04 -04:00

532 lines
13 KiB
Python

import json
from datetime import datetime, timedelta
from hashlib import sha256
from izzylib_sql import Column, Session
from ..config import files, var
# Database schema: maps each table name to the list of Column definitions
# that make up that table.
tables = {
    'config': [
        Column('id'),
        Column('key', 'text', nullable=False),
        Column('value', 'text'),
        Column('type', 'text', default='str'),
    ],
    'bookmarks': [
        Column('id'),
        Column('name', 'text', nullable=False),
        Column('url', 'text', nullable=False, unique=True),
        Column('description', 'text'),
        Column('category', 'text', default='Default'),
        Column('lastupdate', 'datetime'),
    ],
    'links': [
        Column('id'),
        Column('bookmarkid', 'int', foreign_key='bookmarks.id'),
    ],
    # Per-domain options; the boolean defaults here mirror `default_permissions`.
    'siteoptions': [
        Column('id'),
        Column('domain', 'text', nullable=False, unique=True),
        Column('instance', 'boolean', default=None),
        Column('notification', 'boolean', default=False),
        Column('microphone', 'boolean', default=False),
        Column('camera', 'boolean', default=False),
        Column('location', 'boolean', default=False),
        Column('fullscreen', 'boolean', default=True),
        Column('javascript', 'boolean', default=True),
        Column('images', 'boolean', default=True),
        Column('adblock', 'boolean', default=True),
        Column('dialog', 'boolean', default=True),
        Column('autoplay', 'boolean', default=False),
    ],
    'search': [
        Column('id'),
        Column('name', 'text', nullable=False),
        Column('keyword', 'text', nullable=False),
        Column('url', 'text', nullable=False),
    ],
    'extensions': [
        Column('id'),
        Column('digest', 'text', nullable=False),
        Column('manifest', 'json', nullable=False),
        Column('enabled', 'boolean', default=True),
        Column('system', 'boolean', default=False),
        Column('config', 'json', default={}),
        Column('installed', 'datetime'),
    ],
    'tabs': [
        Column('id'),
        Column('tabid', 'text', nullable=False),
        Column('state', 'blob', nullable=False),
        Column('title', 'text'),
        Column('url', 'text'),
        Column('active', 'boolean', default=False),
        Column('order', 'integer'),
    ],
    'ftpcreds': [
        Column('id'),
        Column('domain', 'text'),
        Column('username', 'text'),
        Column('password', 'text'),
    ],
    'accounts': [
        Column('id'),
        Column('handle', 'text', nullable=False),
        Column('domain', 'text', nullable=False),
        Column('username', 'text'),
        Column('token', 'text', nullable=False, unique=True),
        Column('data', 'json'),
        Column('toot_limit', 'integer'),
    ],
    'history': [
        Column('id'),
        Column('domain', 'text', nullable=False),
        Column('url', 'text', nullable=False, unique=True),
        Column('title', 'text'),
        Column('post', 'boolean'),
        Column('count', 'integer', nullable=False, default=0),
        Column('last_access', 'datetime'),
    ],
    'passfields': [
        Column('id'),
        Column('domain', 'text', nullable=False),
        Column('url', 'text', nullable=False),
        Column('userfield', 'text', nullable=False),
        Column('passfield', 'text', nullable=False),
    ],
}
# Maps the type name stored in a config row's `type` column to the callable
# that converts the row's stored text value back into a Python value
# (looked up in CustomSession.get_config; unknown names fall back to `str`).
subtypes = {
    'str': str,
    'int': int,
    'float': float,
    'dict': DotDict,
    'bool': convert_to_boolean,
    'datetime': int,
    'path': Path,
}
# Known config options: key -> (default value, subtype name).
# The subtype name is a key of `subtypes`; only keys listed here are accepted
# by CustomSession.get_config / put_config.
default_config = {
    'version': (0, 'int'),
    'active_acct': (0, 'int'),
    'adblock': (True, 'bool'),
    'ap_check': (True, 'bool'),
    'allow_local_unsigned': (False, 'bool'),
    'closed_tabs_limit': (100, 'int'),
    # Evaluated once, when this module is imported.
    'dbus_token': (random_gen(), 'str'),
    'default_search': ('ddg', 'str'),
    'detach_inspector': (False, 'bool'),
    'download_dir': (Path('~/Downloads').expanduser(), 'path'),
    'enable_autocomplete': (False, 'bool'),
    'fullscreen': (False, 'bool'),
    'homepage': (var.local + '/', 'str'),
    'https_force': (True, 'bool'),
    'load_switch': (True, 'bool'),
    'load_tabs': (True, 'bool'),
    'local_underline_links': (False, 'bool'),
    'maximized': (True, 'bool'),
    'post_check': (False, 'bool'),
    'search': ('ddg', 'str'),
    'scale': (1.0, 'float'),
    'size': (DotDict({'width': 1024, 'height': 600}), 'dict'),
    'location': (DotDict({'x': None, 'y': None}), 'dict'),
    'tab_after_current': (True, 'bool'),
    'tab_side': ('TOP', 'str'),
    'theme': ('default', 'str'),
}
# Permission values reported for a domain that has no `siteoptions` row
# (see CustomSession.get_permission).
default_permissions = {
    'instance': None,
    'notification': False,
    'microphone': False,
    'camera': False,
    'location': False,
    'fullscreen': True,
    'adblock': True,
    'dialog': True,
    #'javascript': True,
    #'images': True,
    'autoplay': False,
    #'insecure': False
}
# Built-in search engines: keyword -> (display name, URL template).
# Each template contains the '{q}' placeholder that put_search() requires.
# NOTE(review): presumably used to seed the `search` table -- confirm at the call site.
default_searches = {
    'ddg': ('DuckDuckGo', 'https://ddg.gg/?q={q}'),
    'yh': ('Yahoo', 'https://search.yahoo.com/search?p={q}'),
    'bing': ('Bing', 'https://www.bing.com/search?q={q}'),
    'gg': ('Google', 'https://google.com/search?q={q}'),
    'pypi': ('Python Package Index', 'https://pypi.org/search/?q={q}'),
    'wiki': ('Wikipedia', 'https://www.wikipedia.org/search-redirect.php?search={q}'),
    'man': ('Linux Manpages', 'https://www.die.net/search/?q={q}'),
    'bl': ('Lootlemon', 'https://lootlemon.com/search?query={q}'),
    'fa': ('Furaffinity', 'https://www.furaffinity.net/search/?q={q}'),
    'ws': ('Weasyl', 'https://www.weasyl.com/search?rated=g&q={q}'),
    'wsu': ('Weasyl Unsafe', 'https://www.weasyl.com/search?rated=g&rated=a&rated=p&q={q}'),
    'e926': ('E926', 'https://e926.net/posts?tags={q}'),
}
class CustomSession(Session):
    """Session subclass with cached, table-specific get/put/del helpers.

    Rows are cached per table in ``self.cache[table]``; the generic
    ``*_cached`` methods keep that cache in sync with the database and the
    table-specific methods build on top of them.
    """

    # ------------------------------------------------------------------
    # Generic cache helpers
    # ------------------------------------------------------------------

    def _evict_cached(self, table, cache_key):
        """Drop ``cache_key`` from the cache of ``table``, ignoring a miss."""
        try:
            self.cache[table].remove(cache_key)
        except KeyError:
            pass

    def get_cached(self, table, cache_key, default_value=None, one=True, **kwargs):
        """Return the value cached under ``cache_key``, querying on a miss.

        On a cache miss ``table`` is fetched with ``kwargs`` as the filter.
        With ``one`` true a single row is returned, otherwise all matching
        rows. A non-empty result is stored in the cache before it is
        returned; an empty result yields ``default_value`` and is not cached.
        """
        cached = self.cache[table].get(cache_key)
        if cached:
            return cached

        result = self.fetch(table, **kwargs)
        found = result.one() if one else result.all()
        if not found:
            return default_value

        self.cache[table].set(cache_key, found)
        return found

    def put_cached(self, table, cache_key, row=None, **kwargs):
        """Update ``row`` (or insert a new row) and cache the result.

        When ``row`` is given it is updated with ``kwargs``; otherwise a new
        row is inserted into ``table``. The resulting row is cached under
        ``cache_key`` and returned. If the insert result cannot be indexed,
        any stale cache entry is dropped and the raw insert result is
        returned instead.
        """
        if row:
            new_row = self.update_row(row, **kwargs)
        else:
            rows = self.insert(table, **kwargs)
            try:
                new_row = rows[0]
            except (IndexError, TypeError):
                self.cache[table].pop(cache_key, None)
                return rows

        self.cache[table].set(cache_key, new_row)
        return new_row

    def del_cached(self, table, cache_key, **kwargs):
        """Delete the row matching ``kwargs`` and evict ``cache_key``.

        Returns the deleted row. Raises ``KeyError`` if no row matches.
        """
        if not (row := self.fetch(table, **kwargs).one()):
            raise KeyError('Cannot find row')

        self.remove_row(row)
        self._evict_cached(table, cache_key)
        return row

    # ------------------------------------------------------------------
    # Getters
    # ------------------------------------------------------------------

    def get_account(self, name=None, domain=None):
        """Return an account row.

        With neither ``name`` nor ``domain`` the account whose id equals the
        ``active_acct`` config value is returned (uncached). Otherwise the
        row is looked up by the ``username`` and/or ``domain`` columns.

        Raises ``KeyError`` when a filtered lookup finds nothing.
        """
        if not name and not domain:
            return self.fetch('accounts', id=self.get_config('active_acct')).one()

        options = {}
        if name:
            options['username'] = name
        if domain:
            options['domain'] = domain

        row = self.get_cached('accounts', f'{name}{domain}', **options)
        if not row:
            raise KeyError(f'Cannot find account: {name}@{domain}')
        return row

    def get_bookmark(self, url):
        """Return the bookmark row for ``url`` (uncached)."""
        return self.fetch('bookmarks', url=url).one()

    def get_categories(self):
        """Return the distinct bookmark categories as a list."""
        return [row.category for row in self.execute('SELECT DISTINCT category FROM bookmarks')]

    def get_config(self, key=None, default=None):
        """Return one config value, or all of them when ``key`` is falsy.

        The stored text is converted with the callable registered in
        ``subtypes`` for the row's type (``str`` if the type is unknown).
        When no row exists, ``default`` is returned if it is not ``None``,
        else the built-in default. A stored value that cannot be converted
        also yields the built-in default.

        Raises ``ValueError`` if ``key`` is not in ``default_config``.
        """
        if not key:
            data = DotDict()
            for name in default_config:
                data[name] = self.get_config(name)
            return data

        if key not in default_config:
            raise ValueError('Not a valid config option')

        row = self.get_cached('config', key, key=key)
        if not row:
            return default if default is not None else default_config[key][0]

        try:
            return subtypes.get(row.type, str)(row.value)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass, so this still
            # covers broken JSON while also handling e.g. int('abc').
            return default_config[key][0]

    def get_extension(self, digest):
        """Return the extension row with the given ``digest`` or ``None``."""
        return self.get_cached('extensions', digest, digest=digest)

    def get_history(self, url):
        """Return the history row for ``url`` or ``None``."""
        return self.get_cached('history', url, url=url)

    def get_passfield(self, url):
        """Return the login-field row for ``url`` with its query stripped."""
        return self.get_cached('passfields', url.without_query, url=url.without_query)

    def get_permission(self, domain, cache=True):
        """Return the site options for ``domain``.

        Falls back to a ``DotDict`` built from ``default_permissions`` (with
        ``id=None``) when the domain has no row. ``cache`` is accepted for
        interface compatibility but is not used.
        """
        row = self.get_cached('siteoptions', domain, domain=domain)
        if not row:
            return DotDict(id=None, domain=domain, **default_permissions)
        return row

    def get_search(self, keyword=None, default=True):
        """Return the search row for ``keyword``.

        A falsy ``keyword`` selects the configured ``default_search``. When
        the row is missing and ``default`` is true, the default search is
        tried once; ``None`` is returned if that is missing as well.
        """
        if not keyword:
            keyword = self.get_config('default_search')

        row = self.get_cached('search', keyword, keyword=keyword)
        if row:
            return row

        if default:
            # Disable the fallback on the retry; otherwise a missing default
            # search row would recurse until RecursionError.
            return self.get_search(default=False)
        return None

    def get_tab(self, tabid):
        """Return the tab row with the given ``tabid`` or ``None``."""
        return self.get_cached('tabs', tabid, tabid=tabid)

    # ------------------------------------------------------------------
    # Setters
    # ------------------------------------------------------------------

    def put_account(self, domain, token, user_data, toot_limit=500):
        """Insert a new account row built from ``user_data``.

        Raises ``ValueError`` if ``user_data`` is empty and ``KeyError`` if
        the account already exists.
        """
        if not user_data:
            raise ValueError('session.put.account: Missing user data')

        # get_account() raises KeyError for an unknown account, which is the
        # expected case here and must not abort the insert.
        # NOTE(review): get_account() filters on the `username` column while
        # user_data.username is stored in `handle` below -- confirm which
        # column the duplicate check should use.
        try:
            self.get_account(user_data.username, domain)
        except KeyError:
            pass
        else:
            raise KeyError('session.put.account: Account already exists')

        return self.insert(
            'accounts',
            handle=user_data.username,
            domain=domain,
            username=user_data.display_name,
            token=token,
            data=user_data,
            toot_limit=toot_limit
        ).one()

    def put_bookmark(self, name, url, description=None, category=None):
        """Insert a bookmark; ``category`` defaults to ``'Misc'``.

        Raises ``ValueError`` if ``name`` or ``url`` is empty.
        """
        if not name or not url:
            raise ValueError('Missing name or url parameters')

        data = DotDict(
            name=name,
            url=url,
            category=category or 'Misc',
            lastupdate=datetime.now()
        )
        if description:
            data['description'] = description

        return self.insert('bookmarks', **data)

    def put_bookmark_row(self, row, **kwargs):
        """Update an existing bookmark ``row`` and refresh ``lastupdate``.

        Only ``name``, ``url``, ``description`` and ``category`` may be
        changed; any other key raises ``KeyError``.
        """
        allowed = {'name', 'url', 'description', 'category'}
        for key in kwargs:
            if key not in allowed:
                raise KeyError(f'Invalid bookmark property: {key}')

        kwargs['lastupdate'] = datetime.now()
        return self.update_row(row, **kwargs)

    def put_config(self, key, value=None, subtype=None):
        """Store a config value as text and cache the row.

        ``value`` of ``None`` stores the built-in default and a falsy
        ``subtype`` uses the one from ``default_config``. Values of subtype
        ``'dict'`` are serialised as JSON, everything else with ``str()``.
        An unknown ``key`` is logged and ignored (returns ``None``).
        """
        if key not in default_config:
            logging.error(f'put.config: Not a valid config key: {key}')
            return None

        if not subtype:
            subtype = default_config[key][1]
        if value is None:
            value = default_config[key][0]

        return self.put_cached(
            'config', key,
            row=self.fetch('config', key=key).one(),
            key=key,
            value=DotDict(value).to_json() if subtype == 'dict' else str(value),
            type=subtype
        )

    def put_extension(self, extension):
        """Insert a new extension row.

        The digest must equal the SHA-256 hex digest of the manifest's
        ``author + shortname``. Raises ``KeyError`` if the extension already
        exists and ``ValueError`` on a digest mismatch.
        """
        if self.get_extension(extension.digest):
            raise KeyError('Extension already Exists')

        identity = extension.manifest.author + extension.manifest.shortname
        if extension.digest != sha256(identity.encode('UTF-8')).hexdigest():
            raise ValueError('Digest does not match')

        return self.put_cached(
            'extensions', extension.digest,
            digest=extension.digest,
            manifest=extension.manifest.to_json(),
            system=not extension.path,
            installed=datetime.now()
        )

    def put_extstate(self, digest, enabled):
        """Set the ``enabled`` flag of the extension identified by ``digest``.

        Raises ``KeyError`` if no such extension exists.
        """
        row = self.get_extension(digest)
        if not row:
            raise KeyError(f'Cannot find extension: {digest}')

        # Go through put_cached so the cached row reflects the update.
        self.put_cached('extensions', digest, row=row, enabled=enabled)

    def put_history(self, url, title, post=None):
        """Record a visit to ``url``.

        Returns ``(created, row)`` where ``created`` is true when the URL had
        no history row before. An empty URL or one without a domain is
        logged and skipped (returns ``None``). An empty ``title`` is stored
        as ``'Untitled'``. For an existing row the visit count is incremented
        and ``post`` keeps its stored value unless a new one is given.

        Raises ``ValueError`` if ``post`` is not ``True``, ``False`` or ``None``.
        """
        # Explicit check instead of `assert`, which is stripped under -O.
        if post not in [True, False, None]:
            raise ValueError('post must be True, False, or None')

        if not url:
            logging.verbose('Url empty')
            return None
        if not url.domain:
            logging.verbose('Not a valid url')
            return None

        if not title:
            title = 'Untitled'

        row = self.get_history(url)
        new_data = dict(
            title=title,
            url=url,
            last_access=datetime.now(),
            domain=url.domain
        )

        if row:
            new_data['post'] = post if post is not None else row.post
            new_data['count'] = row.count + 1
        else:
            new_data['post'] = post
            new_data['count'] = 1

        return not row, self.put_cached('history', url, row=row, **new_data)

    def put_history_from_tab(self, tab):
        """Record a visit using the url, title and fedi_post flag of ``tab``."""
        return self.put_history(tab.url, tab.title, bool(tab.fedi_post))

    def put_passfield(self, url, userfield, passfield):
        """Store the login field names for ``url`` (query stripped).

        Nothing is stored (and ``None`` is returned) when either field name
        is ``None``.
        """
        if None in [userfield, passfield]:
            logging.verbose('Not inserting empty login fields')
            return None

        data = {
            'url': url.without_query,
            'domain': url.domain,
            'userfield': userfield,
            'passfield': passfield
        }
        return self.put_cached('passfields', url.without_query, self.get_passfield(url), **data)

    def put_permission(self, domain, key, value):
        """Set site option ``key`` to ``value`` for ``domain``, creating the row if needed."""
        row = self.fetch('siteoptions', domain=domain).one()
        return self.put_cached('siteoptions', domain, row, domain=domain, **{key: value})

    def put_search(self, name, keyword, url):
        """Insert or update a search engine; the keyword is stored lower-cased.

        Missing arguments are logged and ignored (returns ``None``).
        Raises ``ValueError`` if ``url`` lacks the ``{q}`` placeholder.
        """
        # Validate presence first: the checks below dereference the values.
        if None in (name, keyword, url):
            logging.error('put.search: Missing name, keyword, or url')
            return None

        if '{q}' not in url:
            raise ValueError('The query variable "{q}" must be in the url')

        # Use the normalised keyword for lookup and caching as well, so it
        # matches what is actually stored.
        keyword = keyword.lower()
        data = {'name': name, 'keyword': keyword, 'url': url}
        row = self.fetch('search', keyword=keyword).one()
        return self.put_cached('search', keyword, row=row, **data)

    def put_tab(self, tabid, title, url, state, active, order):
        """Insert or update the stored state of the tab identified by ``tabid``."""
        row = self.get_tab(tabid)
        data = {
            'title': title,
            'url': url,
            'state': state,
            'active': active,
            'order': order,
            'tabid': tabid
        }
        return self.put_cached('tabs', tabid, row, **data)

    # ------------------------------------------------------------------
    # Deleters
    # ------------------------------------------------------------------

    def del_bookmark(self, url):
        """Remove the bookmark with the given ``url``."""
        self.remove('bookmarks', url=url)

    def del_config(self, key):
        """Reset config ``key`` by storing its built-in default value."""
        self.put_config(key)

    def del_extension(self, digest):
        """Delete the extension row for ``digest``; ``KeyError`` if missing."""
        self.del_cached('extensions', digest, digest=digest)

    def del_history(self, url):
        """Delete the history row for ``url``; ``KeyError`` if missing."""
        self.del_cached('history', url, url=url)

    def del_history_days(self, days):
        """Delete history rows last accessed more than ``days`` days ago.

        Returns the list of removed rows.
        """
        cutoff = datetime.now() - timedelta(days=days)
        rows_affected = []

        for row in self.fetch('history'):
            if row.last_access < cutoff:
                self.remove_row(row)
                # Most rows will not be cached; a miss must not abort the purge.
                self._evict_cached('history', row.url)
                rows_affected.append(row)

        return rows_affected

    def del_passfield(self, domain):
        """Delete one login-field row for ``domain``; ``KeyError`` if missing."""
        row = self.del_cached('passfields', domain, domain=domain)
        # get_passfield/put_passfield cache rows under the query-less URL, so
        # evict that key too or the deleted row would remain cached.
        self._evict_cached('passfields', row.url)

    def del_permission(self, domain):
        """Delete the site options row for ``domain``; ``KeyError`` if missing."""
        self.del_cached('siteoptions', domain, domain=domain)

    def del_search(self, keyword):
        """Delete the search engine ``keyword``; ``KeyError`` if missing.

        If it was the configured default search, the first remaining search
        row (if any) becomes the new default.
        """
        self.del_cached('search', keyword, keyword=keyword)

        if keyword == self.get_config('default_search'):
            if not (row := self.fetch('search').one()):
                return
            self.put_config('default_search', row.keyword)