This repository has been archived on 2023-02-02. You can view files and clone it, but cannot push or open issues or pull requests.
barkshark-web/barkshark_web/dbus.py
2022-09-24 06:57:21 -04:00

256 lines
5 KiB
Python

import traceback
from dasbus.connection import SessionMessageBus
from dasbus.error import DBusError
from dasbus.identifier import DBusServiceIdentifier
from dasbus.server.interface import dbus_interface
from dasbus.server.publishable import Publishable
from dasbus.server.template import InterfaceTemplate
from dasbus.typing import Bool, List, Str
from . import var
from .functions import get_app
NAMESPACE = 'xyz.barkshark.Web'
def SessionBusIdent(base='Browser'):
data = {
'namespace': NAMESPACE.split('.'),
'message_bus': SessionMessageBus()
}
return data
class Client():
def __init__(self):
self.bus = None
self.proxy = None
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.disconnect()
def __del__(self):
self.disconnect()
@property
def connected(self):
return True if self.proxy else False
def connect(self):
if self.bus:
return True
self.bus = SessionMessageBus()
self.proxy = self.bus.get_proxy(
NAMESPACE,
'/xyz/barkshark/Web'
)
if not self.Merp():
logging.verbose(f'Failed to connect to dbus')
self.proxy = None
return False
def disconnect(self):
if not self.bus:
return True
self.bus.disconnect()
self.bus = None
self.proxy = None
def run_cmd(self, cmd, *args, **kwargs):
try:
func = getattr(self.proxy, cmd)
except DBusError:
logging.verbose('Could not find DBus interface: xyz.barkshark.pyweb')
self.disconnect()
return False
except AttributeError:
if not self.bus:
logging.verbose('Not connected to DBus')
return False
except Exception as e:
traceback.print_exc()
logging.error('Failed to connect to DBus because of program error')
return False
return func(*args, **kwargs)
def Introspect(self) -> dict:
return self.run_cmd('Introspect')
def Merp(self) -> Str:
return self.run_cmd('Merp')
def NewTab(self, url: str, switch: Bool):
return self.run_cmd('NewTab', url, switch)
def CloseTab(self, tabid: Str):
return self.run_cmd('CloseTab', tabid)
def GetTabs(self) -> List:
return self.run_cmd('GetTabs')
def Internal(self, data: str) -> str:
return self.run_command('Internal', data)
def Present(self):
return self.run_cmd('Present')
def ExtFunc(self, func, *args, **kwargs):
data = DotDict(
func = func,
args = list(args),
kwargs = dict(kwargs)
)
resp = DotDict(self.run_cmd('ExtFunc', data.to_json()))
error = resp.get('error')
msg = resp.get('message')
if error:
raise DBusError(error)
if msg:
logging.debug('DBusClient.ExtFunc:', msg)
return None
if resp.get('data') == None:
return
return resp.data
def Database(self, func, *args, **kwargs):
return self.ExtFunc(f'db.session.{func}', *args, **kwargs)
@dbus_interface(NAMESPACE)
class ServerInterface(InterfaceTemplate):
def Merp(self) -> Str:
return self.implementation.handle_merp()
def NewTab(self, url: str, switch: Bool):
return self.implementation.handle_new_tab(url, switch=switch)
def CloseTab(self, tabid: Str):
return self.implementation.handle_close_tab(tabid)
def GetTabs(self) -> List:
return self.implementation.handle_get_tabs()
def Present(self):
return self.implementation.handle_present()
def ExtFunc(self, data: Str) -> Str:
return self.implementation.handle_json_function(data)
class Server(Publishable):
def __init__(self, window, *args, **kwargs):
super().__init__(*args, **kwargs)
self.window = window
self.ident = DBusServiceIdentifier(**SessionBusIdent())
self.bus = self.ident.message_bus
self.bus.publish_object(self.ident.object_path, self.for_publication())
self.bus.register_service(self.ident.service_name)
def disconnect(self):
self.bus.disconnect()
def for_publication(self):
return ServerInterface(self)
def handle_merp(self):
logging.verbose('DBus: run command Merp')
return 'UvU'
def handle_new_tab(self, url, switch):
logging.verbose(f'DBus: run command NewTab with args: {url}, {switch}')
self.window.new_tab(url, switch=switch)
def handle_close_tab(self, tabid):
logging.verbose(f'DBus: run command CloseTab with args: {tabid}')
self.window.close_tab(tabid)
def handle_get_tabs(self):
logging.verbose('DBus: run command GetTabs')
data = []
for tabid, widget in self.window.tabdata.items():
if not widget.webview:
continue
data.append((tabid, widget.title, widget.url))
return data
def handle_present(self):
logging.verbose('DBus: run command Present')
self.window.present()
def handle_json_function(self, data):
data = DotDict(data)
func = data.get('func')
args = data.get('args', [])
kwargs = data.get('kwargs', {})
logging.verbose(f'DBus: run ExtFunc command "{func}" with args: {args}, {kwargs}')
if not func:
return DotDict(error = 'Missing function').to_json()
resp = None
if func == 'notification':
self.window.notification(*args, **kwargs)
elif func.startswith('db.session.'):
func = func.replace('db.session.', '')
with getapp().db.session as s:
resp = getattr(s, func)(*args, **kwargs)
return DotDict(data = resp).to_json()