This repository has been archived on 2023-02-02. You can view files and clone it, but cannot push or open issues or pull requests.
barkshark-web/barkshark_web/protocol/local.py
2022-10-05 13:30:39 -04:00

712 lines
18 KiB
Python

import objgraph
import mimetypes
import traceback
from jinja2.exceptions import TemplateNotFound
from izzylib.color import Color
from izzylib.dotdict import DotDict
from izzylib.enums import LogLevel
from izzylib.exceptions import HttpClientError
from izzylib.misc import class_name, fuzzy_string_match
from threading import Thread
from . import ProtocolHandler, ProtocolRequest, handle_remote_file, list_directory
from .functions import error, finish_request, finish_request_error, redirect
from .. import var
from ..functions import TimeoutCallback, get_app, run_in_gui_thread
from ..passwords import PASSWORD_STORAGE
class LocalRequest(ProtocolRequest):
def ok_or_redirect(self, *message, level='INFO'):
if redir := self.query.get('redir'):
return self.redirect(redir, ' '.join(message), level)
self.window.notification(' '.join(message), level=level)
return self.response('ok')
def handle_refresh_account(self, row):
row.refresh()
self.ok_or_redirect(f'Refreshed account info: {row.fullhandle}')
def handle_return_account(self, row):
html = self._render_template('account_row.haml', {'row': row})
return self.response(html)
Local = ProtocolHandler('pyweb', LocalRequest)
@Local.route('/')
def home(handler, request):
context = DotDict(
links = []
)
with handler.db.session as s:
for row in s.fetch('links'):
if bookmark := s.fetch('bookmarks', id=row.bookmarkid):
context.links.append(bookmark)
context.search = s.get_search()
return request.page('home', context)
@Local.route('/search-web')
def search_web(handler, request):
keyword = request.query.get('keyword')
try:
text = request.query['text']
except KeyError:
request.error('No search text provided', 400)
with handler.db.session as s:
row = s.get_search(keyword)
return request.redirect(row.compile(text))
@Local.route('/favicon.ico')
def favicon(handler, request):
return icon(handler, request, 'app.png')
@Local.route('/about')
def about(handler, request):
return request.page('about')
### Actions ###
@Local.route('/action/clear/{item}')
def action_clear(handler, request, item):
if item == 'cache':
handler.app.context.clear_cache()
msg = 'Cleared cache'
elif item in ['history', 'siteoptions']:
with handler.db.session as s:
s.truncate(item)
handler.db.cache[item].clear()
if item == 'history':
msg = 'Cleared history'
elif item == 'siteoptions':
msg = 'Cleared site options'
return request.ok_or_redirect(msg)
### Bookmarks ###
@Local.route('/bookmarks')
def bookmarks_home(handler, request):
bookmarks = DotDict()
with handler.db.session as s:
for row in s.fetch('bookmarks'):
category = row.category
if category not in bookmarks.keys():
bookmarks[category] = []
row.name = row.name.replace('<', '&lt;').replace('>', '&gt;')
bookmarks[category].append(row)
for category in bookmarks:
bookmarks[category] = sorted(bookmarks[category], key=lambda x:x['name'])
return request.page('bookmarks', dict(bookmarks = {key: bookmarks[key] for key in sorted(bookmarks)}))
@Local.route('/bookmarks/edit/{markid:int}')
def bookmarks_edit(handler, request, markid):
with handler.db.session as s:
if markid:
if not (row := s.fetch('bookmarks', id=markid).one()):
return request.error(f'Invalid bookmark ID: {markid}', 404)
else:
row = None
context = {
'bookmark': row,
'categories': s.get_categories()
}
return request.page('bookmark-edit', context)
@Local.route('/bookmarks/update/{markid:int}')
def bookmarks_update(handler, request, markid):
with handler.db.session as s:
if markid > 0:
if not (row := s.fetch('bookmarks', id=markid).one()):
return request.error(f'Invalid bookmark ID: {markid}', 404)
s.put_bookmark_row(row, **request.query)
msg = 'Updated bookmark'
else:
s.put_bookmark(**request.query)
msg = 'Created new bookmark'
return request.redirect('/bookmarks', msg)
@Local.route('/bookmarks/delete/{markid:int}')
def bookmarks_delete(handler, request, markid):
with handler.db.session as s:
if not (row := s.fetch('bookmarks', id=markid).one()):
return request.error(f'Invalid bookmark ID: {markid}', 404)
s.del_bookmark_row(row)
return request.redirect('/bookmarks', 'Deleted bookmark')
### Downloads ###
@Local.route('/downloads')
def downloads_home(handler, request):
return request.error('Not implemented yet', 500)
### Extensions ###
@Local.route('/extensions')
def extensions_home(handler, request):
return request.error('Not implemented yet', 500)
### Fediverse ###
@Local.route('/fediverse')
def fediverse_home(handler, request):
accounts = ','.join([str(acct.id) for acct in handler.app.accounts])
context = DotDict(
accounts = f'[{accounts}]'
)
return request.page('fediverse', context)
@Local.route('/fediverse/avatar/{acctid:int}')
def fediverse_avatar(handler, request, acctid):
try:
row = handler.app.get_account_by_id(acctid)
except IndexError:
return request.error(f'Account with ID not found: {acctid}', 404)
try:
return request.file(row.avatar)
except FileNotFoundError:
return request.error(f'Avatar for account does not exist: @{row.handle}@{row.domain}', 404)
@Local.route('/fediverse/refresh/{acctid:int}')
def fediverse_refresh(handler, request, acctid):
try:
row = handler.app.get_account_by_id(acctid)
except IndexError:
return request.error(f'Account with ID not found: {acctid}', 404)
return request.finish_thread(request.handle_refresh_account, row)
@Local.route('/fediverse/set_active/{acctid:int}')
def fediverse_set_active(handler, request, acctid):
try:
row = handler.app.get_account_by_id(acctid)
except IndexError:
return request.error(f'Account with ID not found: {acctid}', 404)
row.set_active()
return request.ok_or_redirect(f'Account set as active: {row.fullhandle}')
@Local.route('/fediverse/logout/{acctid:int}')
def fediverse_logout(handler, request, acctid):
try:
row = handler.app.get_account_by_id(acctid)
except IndexError:
return request.error(f'Account with ID not found: {acctid}', 404)
with handler.db.session as s:
s.remove_row(row)
handler.app.accounts.remove(row)
return request.ok_or_redirect(f'Logged out of {row.fullhandle}')
@Local.route('/fediverse/acct_info/{acctid:int}')
def fediverse_acct_info(handler, request, acctid):
try:
row = handler.app.get_account_by_id(acctid)
except IndexError:
return request.error(f'Account with ID not found: {acctid}', 404)
return request.finish_thread(request.handle_return_account, row)
### Help ###
@Local.route('/help')
def help_home(handler, request):
keybinds = {}
for description, keydata in handler.app.keybinds.items():
keybinds[description] = [bind.replace('<','').replace('>','+') for bind in keydata[1]]
return request.page('help', dict(keybinds = keybinds))
### History ###
@Local.route('/history')
def history_home(handler, request):
context = DotDict(
history = {}
)
with handler.db.session as s:
for row in s.fetch('history', orderby='domain'):
if (search_text := request.query.get('text')) and True not in fuzzy_string_match(search_text, row.domain, row.url, row.title):
continue
if row.domain not in context.history:
context.history[row.domain] = []
row.title = row.title.replace('<', '&lt;').replace('>', '&gt;')
context.history[row.domain].append(row)
return request.page('history', context)
@Local.route('/history/delete/{rowid:int}')
def history_delete(handler, request, rowid):
with handler.db.session as s:
s.remove('history', id=rowid)
return request.ok_or_redirect('Removed history item')
@Local.route('/history/delete_host/{host}')
def history_delete_host(handler, request, host):
with handler.db.session as s:
s.remove('history', domain=host)
return request.ok_or_redirect('Removed history items for', host)
@Local.route('/history/delete_days/{days:int}')
def history_delete_days(handler, request, days):
assert days > 0
with handler.db.session as s:
rows = s.del_history_days(days)
return request.ok_or_redirect(f'Cleared history {len(rows)} item(s) older than', str(days), 'days' if days > 1 else 'day')
@Local.route('/history/delete_all')
def history_clear(handler, request):
with handler.db.session as s:
s.execute('DELETE FROM history')
s.cache['history'].clear()
return request.ok_or_redirect('History cleared')
### Passwords ###
@Local.route('/passwords')
def passwords_home(handler,request):
domain = request.query.get('domain')
if handler.password.storage_type == 'bitwarden':
if not handler.password.account:
return request.error('Not logged into Bitwarden', 400)
if not handler.password.active:
return request.error('Bitwarden session key invalid', 400)
elif not handler.password.session_key:
return request.error('No Bitwarden session key', 400)
if (search_text := request.query.get('text')):
items = []
for row in handler.password.fetch():
if True in fuzzy_string_match(search_text, row.domain, row.url, row.username, row.note, row.label):
items.append(row)
elif domain:
items = handler.password.fetch(domain=domain).all()
else:
items = handler.password.fetch().all()
return request.page('passwords', {'items': items})
@Local.route('/passwords/edit', '/passwords/edit/{rowid}')
def passwords_edit(handler, request, rowid=None):
if not rowid:
return request.page('password-edit', {'password': None})
if not (row := handler.password.fetch(id=rowid)):
return request.error(f'Cannot find password with row ID: {rowid}', 404)
return request.page('password-edit', {'password': row})
@Local.route('/passwords/update', '/passwords/update/{rowid}')
def passwords_update(handler, request, rowid=None):
query = request.query.copy()
query.pop('redir', None)
if not query.get('label'):
query['label'] = f'{query.username} @ {query.domain}'
for key, value in list(query.items()):
if isinstance(value, str) and not value:
del query[key]
if not rowid:
row = handler.password.insert(**query)
return request.ok_or_redirect(f'Created new password: {row.label}')
if not all(map(query.get, ('username', 'password', 'url'))):
return request.error(f'Missing username, password or url', 400)
row = handler.password.update(rowid, **query)
return request.ok_or_redirect(f'Updated password: {row.label}')
@Local.route('/passwords/copy/{rowid}')
def passwords_copy(handler, request, rowid):
if not (row := handler.password.fetch(id=rowid)):
return request.error(f'Cannot find password with ID: {rowid}', 404)
row.copy_password()
return request.ok_or_redirect(f'Copied password for 60 seconds')
@Local.route('/passwords/delete/{rowid}')
def passwords_update(handler, request, rowid):
if not (row := handler.password.fetch(id=rowid)):
return request.error(f'Cannot find password with ID: {rowid}', 404)
label = row.label
handler.password.remove(rowid)
return request.ok_or_redirect(f'Deleted password: {label}')
#return request.error(f'Cannot find password: {rowid}', 404)
### Preferences ###
@Local.route('/preferences')
def preferences_home(handler, request):
context = DotDict(
bitwarden = DotDict(),
pass_backends = tuple(PASSWORD_STORAGE.keys()),
pass_type = handler.password.storage_type,
log_levels = tuple(value.name for value in LogLevel),
settings = {
'font': 'sans undertale',
'font-size': '14'
}
)
if context.pass_type == 'bitwarden':
context.bitwarden.email = handler.password.account
try:
context.bitwarden.active = handler.password.active
context.bitwarden.locked = not handler.password.session_key
except ConnectionError:
context.bitwarden.active = False
context.bitwarden.locked = True
else:
context.bitwarden = dict(
email = None,
active = None,
locked = None
)
context['themes'] = handler.window.themes
return request.page('preferences', context)
@Local.route('/preferences/update')
def preferences_update(handler, request):
if not len([key for key in request.query.keys() if key != 'redir']):
return request.error('No key/value pairs provided', 400)
with handler.db.session as s:
for key, value in request.query.items():
if key == 'redir':
continue
elif key == 'pass_type':
handler.app.setup_password_storage(value)
elif key == 'log_level':
logging.set_config('level', value)
elif key == 'zoom':
value = int(value) / 100
for tab in handler.window.tabdata.values():
if tab.zoom == s.get_config('zoom'):
tab.page_action('zoom', zoom=value)
s.put_config(key, value)
logging.verbose(f'Updated config: {key} = {value}')
return request.ok_or_redirect('Updated preferences')
@Local.route('/preferences/reset/{key}')
def preferences_reset(handler, request, key):
try:
with handler.db.session as s:
s.put_config(key)
except KeyError:
return request.redirect('/preferences', f'Invalid config key: {key}', 'INFO')
return request.redirect('/preferences', f'Reset config to default: {key}')
@Local.route('/preferences/bitwarden/login/{username}/{password}')
def preferences_bw_login(handler, request, username, password):
handler.password.stop()
if not (skey := handler.password.login(username, password, force=True)):
return request.error(f'Failed to login to Bitwarden', 401)
with handler.db.session as s:
s.put_config('bw_session_key', skey)
handler.password.start()
return request.ok_or_redirect('Logged into Bitwarden')
@Local.route('/preferences/bitwarden/logout')
def preferences_bw_logout(handler, request):
handler.password.stop()
handler.password.logout()
return request.ok_or_redirect('Logged out of Bitwarden')
@Local.route('/preferences/bitwarden/lock')
def preferences_bw_lock(handler, request):
handler.password.stop(lock=True)
with handler.db.session as s:
s.put_config('bw_session_key', None)
return request.redirect('/preferences', 'Locked password vault')
@Local.route('/preferences/bitwarden/unlock/{password}')
def preferences_bw_unlock(handler, request, password):
request.query.redir = '/preferences'
handler.password.stop()
if not (skey := handler.password.unlock(password)):
return request.redirect('/preferences', f'Failed to unlock password vault', 'ERROR')
handler.password.start()
with handler.db.session as s:
s.put_config('bw_session_key', skey)
return request.ok_or_redirect('Unlocked password vault')
@Local.route('/preferences/theme/delete/{hash}')
def preferences_theme_delete(handler, request, hash):
themes = handler.window.themes
try:
theme = themes.user[hash]
except KeyError as e:
request.error(e, 404)
theme.path.delete()
return request.ok_or_redirect(f'Deleted theme: {theme.name}')
@Local.route('/preferences/theme/set/{hash}')
def preferences_theme_set(handler, request, hash):
themes = handler.window.themes
if hash == 'default':
themes.set('default')
message = 'Set theme to system'
else:
try:
theme = themes.set(hash)
message = f'Set theme to {theme.name}'
except KeyError:
return request.error(f'Cannot find theme with hash: {hash}', 404)
return request.ok_or_redirect(message)
### Search ###
@Local.route('/search')
def search_home(handler, request):
with handler.db.session as s:
context = {'searches': sorted(s.fetch('search').all(), key=lambda x:x['name'])}
return request.page('search', context)
@Local.route('/search/edit/{searchid:int}')
def search_edit(handler, request, searchid):
context = DotDict(
search = None
)
if searchid:
with handler.db.session as s:
if not (row := s.fetch('search', id=searchid).one()):
return request.error(f'Invalid search engine ID: {searchid}', 404)
context.search = row
return request.page('search-edit', context)
@Local.route('/search/update/{searchid:int}')
def search_update(handler, request, searchid):
if searchid > 0:
msg = 'Updated search engine'
else:
msg = 'Created new search engine'
with handler.db.session as s:
if not s.fetch('search', id=searchid):
return request.error(f'Invalid search engine ID: {searchid}', 404)
s.put_search(**request.query)
return request.redirect('/search', msg)
@Local.route('/search/delete/{searchid:int}')
def search_delete(handler, request, searchid):
with handler.db.session as s:
if not (row := s.fetch('search', id=searchid).one()):
return request.error(f'Invalid search engine ID: {searchid}', 404)
s.remove_row(row)
return request.ok_or_redirect('Removed search:', searchid)
@Local.route('/search/default/{searchid:int}')
def search_default(handler, request, searchid):
with handler.db.session as s:
if not (row := s.fetch('search', id=searchid).one()):
return request.error(f'Invalid search engine ID: {searchid}', 404)
s.put_config('default_search', row.keyword)
return request.ok_or_redirect('Set default search')
## Debug
@Local.route('/debug')
def debug_home(handler, request):
return request.page('debug')
@Local.route('/debug/objects')
def debug_objects(handler, request):
objects = objgraph.most_common_types(
limit = int(request.query.get('minimum', 100)),
shortnames = False
)
context = dict(objects = objects)
return request.page('debug', context)
@Local.route('/debug/leaking')
def debug_leaking(handler, request):
objects = []
for obj in objgraph.get_leaking_objects():
try:
objects.append((class_name(obj), str(obj)))
except:
pass
context = dict(
objects = objects
)
return request.page('debug', context)
### Resources ###
@Local.route('/css/{filename}')
def css(handler, request, filename):
context = DotDict(
primary = Color('#e7a'),
secondary = Color('#e7a'),
background = Color('#191919'),
positive = Color('#ada'),
negative = Color('#daa'),
speed = 350
)
return request.template(f'css/{filename}', context)
@Local.route('/js/{filename}')
def javascript(handler, request, filename):
with handler.app.path.localweb.join('js', filename).open() as fd:
return request.response(fd.read(), 'application/javascript')
@Local.route('/font/{name}/{filename}')
def font(handler, request, name, filename):
with handler.app.path.localweb.join('fonts', name, filename).open('rb') as fd:
return request.response(fd.read(), request.mimetype)
@Local.route('/icon/{name}')
def icon(handler, request, name):
if name == 'app.png':
path = handler.app.path.resources.join('icon.png')
mime = 'image/png'
else:
path = handler.app.path.resources.join('icons', name)
mime = 'image/svg+xml'
with path.open('rb') as fd:
return request.response(fd.read(), mime)