This repository has been archived on 2023-02-02. You can view files and clone it, but cannot push or open issues or pull requests.
barkshark-web/barkshark_web/functions.py
2022-09-24 06:57:21 -04:00

334 lines
7.8 KiB
Python

import cairo
import multiprocessing
import random
import os
import socket
import sys
import threading
import time
import traceback
from configobj import ConfigObj
from ctypes import cdll, create_string_buffer, byref
from dns.resolver import NXDOMAIN, resolve
from izzylib.http_client import HttpClient
from json.decoder import JSONDecodeError
from platform import system
from urllib.parse import unquote
from . import __version__, var
# Cache used by load_js_file() so each script is only read from disk once.
js_cache = LruCache()

# Value of the APPIMAGE environment variable, or None when it is unset.
appimage_exec = os.getenv('APPIMAGE')

# Options dict carrying the application's user agent string.
# NOTE(review): presumably passed to HttpClient -- the consumer is not in this section.
client_options = dict(appagent=f'PyWeb/{__version__}')

# Python type -> GLib.Variant constructor, used by convert_to_variant().
glib_types = {
    bytes: GLib.Variant.new_byte,
    str: GLib.Variant.new_string,
    bool: GLib.Variant.new_boolean,
    int: GLib.Variant.new_int64,
    tuple: GLib.Variant.new_tuple,
    list: GLib.Variant.new_array,
}
# Values joined with ';' into the "Categories" key of the desktop file.
categories = ['Network', 'WebBrowser']

# Values joined with ';' into the "MimeType" key of the desktop file.
# Fix: a comma was missing after 'application/x-extension-xml', so Python's
# implicit string concatenation merged it with 'text/html' into one bogus
# entry and neither type was actually registered.
mimetypes = [
    'application/x-extension-htm', 'application/x-extension-html',
    'application/x-extension-shtml', 'application/x-extension-xht',
    'application/x-extension-xhtml', 'application/x-extension-xml',
    'text/html', 'text/xml',
    'application/xhtml+xml', 'application/xml',
    'x-scheme-handler/http', 'x-scheme-handler/https',
    'x-scheme-handler/ftp', 'x-scheme-handler/file',
]
## Haven't implemented proxy support yet
#if var.proxy:
#client_options.update({
#'proxy_type': var.proxy.scheme,
#'proxy_host': var.proxy.host,
#'proxy_port': var.proxy.port
#})
def connect(widget, signal, callback, *args, original_args=False, **kwargs):
    """Connect ``callback`` to ``signal`` on ``widget`` with pre-bound arguments.

    By default the arguments emitted with the signal are dropped and the
    callback is invoked as ``callback(*args, **kwargs)``. With
    ``original_args=True`` the signal's own arguments are passed first,
    followed by ``args`` and ``kwargs``.

    Returns whatever ``widget.connect`` returns.

    Raises:
        TypeError: if ``widget`` is falsy.
    """
    if not widget:
        logging.debug(f'Signal: {signal}, Callback: {callback}, Args: {args}, Kwargs: {kwargs}')
        raise TypeError('Missing widget')

    def with_signal_args(*sigarg):
        return callback(*sigarg, *args, **kwargs)

    def without_signal_args(*sigarg):
        return callback(*args, **kwargs)

    handler = with_signal_args if original_args else without_signal_args
    return widget.connect(signal, handler)
# Not used atm, but keeping just in case
def convert_to_variant(arg):
    """Convert a Python value into a ``GLib.Variant``.

    * ``dict``: serialised to JSON and wrapped in a string variant.
    * ``tuple`` / ``list``: every item is converted with the constructor
      registered for its exact type in ``glib_types`` and the results are
      handed to the tuple constructor.
    * anything else: converted with the constructor for its exact type.

    Values whose type has no entry in ``glib_types`` are stringified and
    stored as string variants.
    """
    string_variant = glib_types[str]

    if isinstance(arg, dict):
        return GLib.Variant('s', json.dumps(arg))

    if isinstance(arg, (tuple, list)):
        values = []

        for value in arg:
            func = glib_types.get(type(value), string_variant)

            if func == string_variant:
                value = str(value)

            values.append(func(value))

        # NOTE(review): the converted items are passed as a single list here;
        # confirm that matches the signature of GLib.Variant.new_tuple.
        return glib_types[tuple](values)

    func = glib_types.get(type(arg), string_variant)

    if func == string_variant:
        # Fix: the original assigned the string to an unused local (`value`)
        # and then passed the unconverted `arg` to the string constructor.
        arg = str(arg)

    return func(arg)
def get_app():
    """Return the default ``Gio.Application`` as reported by ``get_default()``."""
    application = Gio.Application.get_default()
    return application
def get_buffer_text(text_buffer):
    """Return the full text of ``text_buffer``, from its start iter to its end iter.

    ``True`` is passed as the third ``get_text`` argument, as in the
    original call.
    """
    start = text_buffer.get_start_iter()
    end = text_buffer.get_end_iter()
    return text_buffer.get_text(start, end, True)
def human_bytes(B):
    """Format a byte count as a human readable string using binary units.

    ``B`` is converted with ``float()``. Values below 1024 are rendered as
    the float followed by ``B``; larger values are scaled to KiB, MiB, GiB
    or TiB with two decimals.

    Raises:
        ValueError: if the value matches none of the ranges (e.g. NaN).
    """
    size = float(B)
    kib = float(1024)

    if size < kib:
        return '{0} {1}'.format(size,'B')

    # (inclusive lower bound, exclusive upper bound, output template)
    bounded_units = (
        (kib, kib ** 2, '{0:.2f} KiB'),
        (kib ** 2, kib ** 3, '{0:.2f} MiB'),
        (kib ** 3, kib ** 4, '{0:.2f} GiB'),
    )

    for lower, upper, template in bounded_units:
        if lower <= size < upper:
            return template.format(size / lower)

    tib = kib ** 4

    if tib <= size:
        return '{0:.2f} TiB'.format(size / tib)

    raise ValueError("That's fucking huge!")
def icon_as_pixbuf(name, height=16, ext='svg'):
    """Load ``<name>.<ext>`` from the app's ``icons`` resource directory as a pixbuf.

    The file is loaded via ``new_from_file_at_scale`` with a width of ``-1``,
    the requested ``height`` and aspect ratio preservation enabled.
    """
    icon_dir = get_app().path.resources.join('icons')
    filename = icon_dir.join(f'{name}.{ext}')
    return GdkPixbuf.Pixbuf.new_from_file_at_scale(filename, -1, height, True)
def icon_set(widget, *args, **kwargs):
    """Load an icon with ``icon_as_pixbuf(*args, **kwargs)`` and show it on ``widget``.

    Returns the same ``widget`` so calls can be chained.
    """
    pixbuf = icon_as_pixbuf(*args, **kwargs)
    widget.set_from_pixbuf(pixbuf)
    return widget
def install_desktop_file(overwrite=False):
    """Write the ``barkshark-web.desktop`` launcher into the XDG data directory.

    The launcher command is the AppImage path when the ``APPIMAGE``
    environment variable was set (``appimage_exec``), otherwise the current
    Python interpreter running the ``barkshark_web`` module.

    Args:
        overwrite: replace an existing desktop file instead of leaving it alone.

    Returns:
        The path of the written desktop file, or ``None`` when the file
        already exists and ``overwrite`` is false.
    """
    app = get_app()
    deskfile = Path.xdg.data.join('applications/barkshark-web.desktop')
    deskfile.parent.mkdir()

    workdir = app.path.script.parent
    executable = appimage_exec or f'{sys.executable} -m barkshark_web'

    if deskfile.exists() and not overwrite:
        logging.debug(f'Desktop file already exists: {deskfile}')
        return

    config = ConfigObj()
    config.filename = deskfile
    config.update({
        'Desktop Entry': {
            'Name': 'Barkshark Web',
            'GenericName': 'Web Browser',
            'Comment': 'Browse the World Wide Web',
            'Categories': ';'.join(categories),
            'Icon': 'applications-internet',
            'MimeType': ';'.join(mimetypes),
            # Fix: use the computed launcher command. The original built
            # `executable` but hard-coded sys.executable here, so the
            # AppImage path was never written to the launcher.
            'Exec': f'{executable} %U',
            'Path': workdir,
            'StartupNotify': True,
            'StartupWMClass': 'BarksharkWeb',
            'Type': 'Application',
            'X-DBUS-ServiceName': 'xyz.barkshark.Web',
            'X-DBUS-StartupType': 'Unique'
        },
        'Desktop Action NewTab': {
            'Name': 'New Tab',
            'Exec': f'{executable} tab new',
            'Path': workdir
        }
    })

    # No working directory is written when running from an AppImage.
    if appimage_exec:
        del config['Desktop Entry']['Path']
        del config['Desktop Action NewTab']['Path']

    config.write()

    # NOTE(review): 755 is a decimal literal, not 0o755. Left unchanged because
    # it depends on how the project's Path.chmod interprets its argument.
    deskfile.chmod(755)
    return deskfile
def set_default():
    """Register ``barkshark-web.desktop`` as the default web browser via ``xdg-settings``."""
    # Fix: the original list had no commas, so implicit string concatenation
    # produced one argument ('xdg-settingssetdefault-web-browser...') and the
    # command could never be found.
    subprocess.run(['xdg-settings', 'set', 'default-web-browser', 'barkshark-web.desktop'])
def load_js_file(name, ext=False):
    """Return the contents of a bundled JavaScript file, using ``js_cache``.

    Files are read from the ``ext_js`` resource directory when ``ext`` is
    true and from ``js`` otherwise. The cache key is ``name`` followed by
    ``1`` or ``0`` so both variants of a name are cached separately.
    """
    cache_name = name + ('1' if ext else '0')
    js = js_cache.fetch(cache_name)

    # A falsy cache result is treated as a miss and the file is read again.
    if not js:
        js = get_app().path.resources.join(f'{"ext_js" if ext else "js"}/{name}.js').read()
        js_cache.store(cache_name, js)

    return js
def new_pixbuf(image):
    """Return ``image`` as a ``GdkPixbuf.Pixbuf``.

    Cairo surfaces are converted at their full size, ``Path`` objects are
    loaded from disk and existing pixbufs are returned unchanged.

    Raises:
        TypeError: if ``image`` is none of the supported types.
    """
    if isinstance(image, (cairo.Surface, cairo.ImageSurface)):
        surface_width = image.get_width()
        surface_height = image.get_height()
        return Gdk.pixbuf_get_from_surface(image, 0, 0, surface_width, surface_height)

    if isinstance(image, Path):
        return GdkPixbuf.Pixbuf.new_from_file(image)

    if isinstance(image, GdkPixbuf.Pixbuf):
        return image

    raise TypeError('Image is not a Cairo Surface, Pixbuf, or Path object')
def resolve_address(domain, type=None):
    """Resolve ``domain`` and return one randomly chosen address.

    Args:
        domain: host name to look up; surrounding whitespace is stripped.
        type: DNS record type to query. Defaults to ``AAAA``.

    Returns:
        The ``address`` attribute of a randomly selected answer record. When
        an ``AAAA`` lookup finds no such domain or no such record, an ``A``
        lookup is used instead.

    Raises:
        dns.resolver.NXDOMAIN, dns.resolver.NoAnswer: for a failed lookup of
        an explicitly requested non-``AAAA`` type.
    """
    # Imported locally: only NXDOMAIN and resolve are imported at module level.
    from dns.resolver import NoAnswer

    domain = domain.strip()

    if not type:
        # Fix: the original used `==` here (a no-op comparison), so the
        # lookup ran with type None -- the likely source of the TypeError
        # handled below.
        type = 'AAAA'

    try:
        return random.choice(resolve(domain, type)).address

    except TypeError:
        # Kept from the original as a best-effort fallback to the A lookup.
        pass

    except (NXDOMAIN, NoAnswer):
        # NoAnswer is handled too: with the AAAA default now in effect, an
        # IPv4-only domain must still fall back to its A record.
        if type != 'AAAA':
            raise

    return random.choice(resolve(domain, 'A')).address
def run_in_gui_thread(func, *args, **kwargs):
    """Schedule ``func(*args, **kwargs)`` to run once from the GLib main loop.

    The call is queued with ``GLib.idle_add``; nothing is returned and the
    result of ``func`` is discarded.
    """
    def run_once(*unused):
        func(*args, **kwargs)

        # Fix: an idle callback that returns a truthy value is called again
        # on every idle cycle. The original returned func's result, so any
        # function returning a truthy value would be re-run indefinitely.
        return False

    GLib.idle_add(run_once)
def scale_pixbuf(pixbuf, new_size=16, keep='height'):
    """Return a copy of ``pixbuf`` scaled to ``new_size`` with bilinear interpolation.

    Args:
        pixbuf: object providing ``get_width``, ``get_height`` and ``scale_simple``.
        new_size: target size in pixels of the dimension named by ``keep``.
        keep: ``'height'`` sets the height to ``new_size`` and scales the
            width proportionally; ``'width'`` does the opposite. Any other
            value produces a ``new_size`` x ``new_size`` result.

    Returns:
        Whatever ``pixbuf.scale_simple`` returns.
    """
    width = pixbuf.get_width()
    height = pixbuf.get_height()

    new_width = new_height = new_size

    if keep == 'width':
        new_height = new_size * height / width

    elif keep == 'height':
        new_width = new_size * width / height

    # Fix: the original computed (new_size / width) * width, a float that can
    # land just below the target (e.g. 15.999...) and lose a pixel when
    # converted to an integer. Use the size directly, round the proportional
    # side, and never ask for a zero-pixel dimension.
    new_width = max(1, round(new_width))
    new_height = max(1, round(new_height))

    return pixbuf.scale_simple(new_width, new_height, GdkPixbuf.InterpType.BILINEAR)
def set_image(widget, image, size=16, keep='height'):
    """Show ``image`` on ``widget`` and return the widget.

    ``image`` is first treated as something ``new_pixbuf`` understands and is
    scaled with ``scale_pixbuf``. If that path raises ``TypeError`` and
    ``image`` is a string, it is used as an icon name instead, with ``size``
    as the pixel size.

    Raises:
        TypeError: if the pixbuf path failed and ``image`` is not a string.
    """
    try:
        image = scale_pixbuf(new_pixbuf(image), size, keep)
        widget.set_from_pixbuf(image)

    except TypeError:
        # Not usable as a pixbuf; try it as an icon name below.
        pass

    else:
        return widget

    if not isinstance(image, str):
        raise TypeError('Image is not a Cairo Surface, Pixbuf, Path object, or an icon name (str)')

    widget.set_from_icon_name(image, Gtk.IconSize.SMALL_TOOLBAR)
    widget.set_pixel_size(size)
    return widget
def set_margin(widget, size=5, horizontal=True, vertical=True):
    """Apply the same margin ``size`` to the selected sides of ``widget``.

    ``vertical`` covers the top and bottom margins, ``horizontal`` the left
    and right ones. Top and bottom are set before left and right.
    """
    setter_names = []

    if vertical:
        setter_names += ['set_margin_top', 'set_margin_bottom']

    if horizontal:
        setter_names += ['set_margin_left', 'set_margin_right']

    for setter_name in setter_names:
        getattr(widget, setter_name)(size)
def set_proc_name(name='pyweb'):
    """Set the process name through ``prctl`` from ``libc.so.6``.

    Does nothing and returns ``None`` on Windows.

    Returns:
        The ``prctl`` return value (``0`` on success), or ``1`` when
        ``libc.so.6`` could not be loaded.
    """
    ## Currently only works on unix OSs with libc
    if system() == 'Windows':
        return

    try:
        libc = cdll.LoadLibrary('libc.so.6')

        # Fix: size the buffer from the encoded bytes. The original used
        # len(name) + 1, which counts characters, so a non-ASCII name either
        # overflowed the buffer or lost its NUL terminator.
        buff = create_string_buffer(name.encode("UTF-8"))

        # 15 is PR_SET_NAME in <linux/prctl.h>.
        ret = libc.prctl(15, byref(buff), 0, 0, 0)

    except OSError:
        logging.debug('Cannot find libc')
        ret = 1

    if ret != 0:
        logging.error('Failed to set process title')

    return ret
class Thread(threading.Thread):
    """Daemon thread that runs ``func(*args, **kwargs)`` and carries a stop flag.

    The stop flag is only a ``threading.Event``; this class never checks it
    itself, so the running function has to poll ``stopped()`` if it wants to
    honour ``stop()``.
    """

    def __init__(self, func, *args, **kwargs):
        """Store the callable and its arguments.

        A falsy ``func`` is replaced by ``self.run_func``.

        Raises:
            TypeError: if the resulting callable is not callable.
        """
        super().__init__(daemon=True)

        target = func if func else self.run_func

        self.func = target
        self.args = args
        self.kwargs = kwargs
        self.stop_event = threading.Event()

        if not callable(target):
            raise TypeError('func argument must be a callable')

    def run(self):
        """Thread body: call the stored function with the stored arguments."""
        positional, named = self.args, self.kwargs
        self.func(*positional, **named)

    def stop(self):
        """Set the stop flag."""
        self.stop_event.set()

    def stopped(self):
        """Return True once ``stop()`` has been called."""
        return self.stop_event.is_set()

    def run_func(self, *args, **kwargs):
        """Default no-op body used when no function is given."""
        return None
class TimeoutCallback(Thread):
    """Thread that calls ``func(*args, **kwargs)`` after ``timeout`` seconds.

    Calling ``stop()`` before the timeout elapses cancels the pending call.
    """

    def __init__(self, timeout, func, *args, **kwargs):
        """Store the delay in seconds along with the callable and its arguments."""
        super().__init__(func, *args, **kwargs)
        self.timeout = timeout

    def run(self):
        """Wait for the timeout, then run the callback unless stopped meanwhile."""
        # Fix: the original used time.sleep(), which ignored the inherited
        # stop(). Waiting on the stop event lasts the same time but returns
        # True as soon as stop() is called, and the callback is then skipped.
        if self.stop_event.wait(self.timeout):
            return

        super().run()