340 lines
7.9 KiB
Python
340 lines
7.9 KiB
Python
import cairo
|
|
import multiprocessing
|
|
import random
|
|
import os
|
|
import socket
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
|
|
from configobj import ConfigObj
|
|
from ctypes import cdll, create_string_buffer, byref
|
|
from dns.resolver import NXDOMAIN, resolve
|
|
from izzylib.http_client import HttpClient
|
|
from json.decoder import JSONDecodeError
|
|
from platform import system
|
|
from urllib.parse import unquote
|
|
|
|
from . import __version__, var
|
|
|
|
|
|
# Cache for JavaScript sources read by load_js_file
js_cache = LruCache()

# Value of the APPIMAGE environment variable, or None when it is unset
appimage_exec = os.environ.get('APPIMAGE', None)

# Keyword options for the HTTP client
client_options = dict(appagent=f'PyWeb/{__version__}')

# Python type -> GLib.Variant constructor, looked up by convert_to_variant
glib_types = {
    bytes: GLib.Variant.new_byte,
    str: GLib.Variant.new_string,
    bool: GLib.Variant.new_boolean,
    int: GLib.Variant.new_int64,
    tuple: GLib.Variant.new_tuple,
    list: GLib.Variant.new_array,
}
|
|
|
|
# Joined with ';' into the "Categories" key by install_desktop_file
categories = ['Network', 'WebBrowser']

# Joined with ';' into the "MimeType" key by install_desktop_file.
# One entry per line with trailing commas, so a forgotten comma can no longer
# silently fuse two neighbouring strings into one bogus MIME type.
mimetypes = [
    'application/x-extension-htm',
    'application/x-extension-html',
    'application/x-extension-shtml',
    'application/x-extension-xht',
    'application/x-extension-xhtml',
    'application/x-extension-xml',
    'text/html',
    'text/xml',
    'application/xhtml+xml',
    'application/xml',
    'x-scheme-handler/http',
    'x-scheme-handler/https',
    'x-scheme-handler/ftp',
    'x-scheme-handler/file',
]
|
|
|
|
## Haven't implemented proxy support yet
|
|
#if var.proxy:
|
|
#client_options.update({
|
|
#'proxy_type': var.proxy.scheme,
|
|
#'proxy_host': var.proxy.host,
|
|
#'proxy_port': var.proxy.port
|
|
#})
|
|
|
|
|
|
def connect(widget, signal, callback, *args, original_args=False, **kwargs):
    """Connect ``callback`` to ``signal`` on ``widget`` with pre-bound arguments.

    The arguments emitted with the signal are dropped unless ``original_args``
    is true, in which case they are passed to ``callback`` ahead of ``args``.

    Returns whatever ``widget.connect`` returns.
    Raises TypeError when ``widget`` is falsy.
    """
    if not widget:
        logging.debug(f'Signal: {signal}, Callback: {callback}, Args: {args}, Kwargs: {kwargs}')
        raise TypeError('Missing widget')

    def forward_signal_args(*sigarg):
        return callback(*sigarg, *args, **kwargs)

    def discard_signal_args(*sigarg):
        return callback(*args, **kwargs)

    handler = forward_signal_args if original_args else discard_signal_args
    return widget.connect(signal, handler)
|
|
|
|
|
|
# Not used atm, but keeping just in case
|
|
def convert_to_variant(arg):
    """Convert a Python value into a GLib.Variant.

    * dict: serialized to JSON and wrapped in a string ('s') variant.
    * tuple / list: each item is converted on its own and the results are
      handed to the tuple constructor from ``glib_types``.
    * anything else: converted with the constructor registered for its exact
      type in ``glib_types``; unregistered types are stringified first.
    """
    # Imported locally because json is not part of the visible module imports.
    import json

    string_variant = glib_types[str]

    def to_variant(value):
        func = glib_types.get(type(value), string_variant)

        # Types without a dedicated constructor fall back to their str() form.
        if func == string_variant:
            value = str(value)

        return func(value)

    if isinstance(arg, dict):
        return GLib.Variant('s', json.dumps(arg))

    if isinstance(arg, (tuple, list)):
        # NOTE(review): the item variants are passed as one list argument, as
        # before -- confirm that matches the new_tuple signature in use.
        return glib_types[tuple]([to_variant(value) for value in arg])

    # The scalar path previously stringified into an unused local and then
    # passed the raw object on; it now shares the conversion used for items.
    return to_variant(arg)
|
|
|
|
|
|
def get_app():
    """Return the result of ``Gio.Application.get_default()``."""
    default_app = Gio.Application.get_default()
    return default_app
|
|
|
|
|
|
def get_buffer_text(text_buffer):
    """Return the full text of a text buffer.

    ``text_buffer`` may be a Gtk.TextBuffer, or any object whose
    ``get_buffer()`` returns one.
    """
    if isinstance(text_buffer, Gtk.TextBuffer):
        buf = text_buffer
    else:
        buf = text_buffer.get_buffer()

    start = buf.get_start_iter()
    end = buf.get_end_iter()

    # The third argument asks get_text to include hidden characters.
    return buf.get_text(start, end, True)
|
|
|
|
|
|
def human_bytes(B):
    """Format a byte count using binary (1024-based) units.

    ``B`` is anything ``float()`` accepts. Values below 1024 are rendered as
    the float itself followed by ``B`` (e.g. ``'512.0 B'``); larger values
    use two decimals with KiB, MiB, GiB or TiB. TiB is the largest unit.

    Raises ValueError when the value is NaN (and whatever ``float()`` raises
    for input it cannot convert).
    """
    size = float(B)

    # NaN fails every range comparison, so it used to fall through to a
    # catch-all branch with a misleading "too large" message. Reject it
    # explicitly; the exception type is unchanged.
    if size != size:
        raise ValueError(f'Cannot format {B!r} as a byte count')

    unit = 1024.0

    if size < unit:
        return '{0} {1}'.format(size, 'B')

    for power, suffix in enumerate(('KiB', 'MiB', 'GiB'), start=1):
        if size < unit ** (power + 1):
            return '{0:.2f} {1}'.format(size / unit ** power, suffix)

    return '{0:.2f} TiB'.format(size / unit ** 4)
|
|
|
|
|
|
def icon_as_pixbuf(name, height=16, ext='svg'):
    """Load ``<name>.<ext>`` from the app's ``icons`` resource directory as a pixbuf.

    The image is scaled to ``height`` pixels tall; the width argument is -1
    and aspect-ratio preservation is requested.
    """
    icon_dir = get_app().path.resources.join('icons')
    icon_path = icon_dir.join(f'{name}.{ext}')
    return GdkPixbuf.Pixbuf.new_from_file_at_scale(icon_path, -1, height, True)
|
|
|
|
|
|
def icon_set(widget, *args, **kwargs):
    """Set a bundled icon on ``widget`` and return the widget.

    All extra arguments are forwarded to ``icon_as_pixbuf``.
    """
    pixbuf = icon_as_pixbuf(*args, **kwargs)
    widget.set_from_pixbuf(pixbuf)
    return widget
|
|
|
|
|
|
def install_desktop_file(overwrite=False):
    """Write the ``barkshark-web.desktop`` launcher file.

    The file is placed at ``applications/barkshark-web.desktop`` below
    ``Path.xdg.data`` (presumably the user's XDG data directory -- confirm).

    overwrite: replace an existing desktop file instead of leaving it alone.

    Returns the path of the written file, or None when the file already
    exists and ``overwrite`` is false.
    """
    app = get_app()
    deskfile = Path.xdg.data.join('applications/barkshark-web.desktop')
    deskfile.parent.mkdir()
    workdir = app.path.script.parent

    # Launch through the AppImage when APPIMAGE is set, otherwise through the
    # current interpreter. This value was previously computed but never used,
    # so the Exec lines always pointed at the interpreter.
    executable = appimage_exec or f'{sys.executable} -m barkshark_web'

    if deskfile.exists() and not overwrite:
        logging.debug(f'Desktop file already exists: {deskfile}')
        return

    config = ConfigObj()
    config.filename = deskfile

    config.update({
        'Desktop Entry': {
            'Name': 'Barkshark Web',
            'GenericName': 'Web Browser',
            'Comment': 'Browse the World Wide Web',
            'Categories': ';'.join(categories),
            'Icon': 'applications-internet',
            'MimeType': ';'.join(mimetypes),
            'Exec': f'{executable} %U',
            'Path': workdir,
            'StartupNotify': True,
            'StartupWMClass': 'BarksharkWeb',
            'Type': 'Application',
            'X-DBUS-ServiceName': 'xyz.barkshark.Web',
            'X-DBUS-StartupType': 'Unique'
        },
        'Desktop Action NewTab': {
            'Name': 'New Tab',
            'Exec': f'{executable} tab new',
            'Path': workdir
        }
    })

    # No working directory is written for AppImage launches.
    if appimage_exec:
        del config['Desktop Entry']['Path']
        del config['Desktop Action NewTab']['Path']

    config.write()

    # NOTE(review): 755 is a decimal literal. If this chmod forwards to
    # os.chmod unchanged the mode should be 0o755 -- confirm against the
    # Path implementation before changing it.
    deskfile.chmod(755)

    return deskfile
|
|
|
|
|
|
def set_default():
    """Run ``xdg-settings set default-web-browser barkshark-web.desktop``.

    The exit status is not checked.
    """
    # Imported locally because subprocess is not part of the visible module imports.
    import subprocess

    # Each argument must be its own list item: without the commas the adjacent
    # literals were concatenated into one nonsensical program name.
    subprocess.run(['xdg-settings', 'set', 'default-web-browser', 'barkshark-web.desktop'])
|
|
|
|
|
|
def load_js_file(name, ext=False):
    """Return the contents of a bundled JavaScript file, using ``js_cache``.

    name: file name without the ``.js`` suffix.
    ext: read from the ``ext_js`` resource directory instead of ``js``.
    """
    # The two directories get distinct cache keys via a 1/0 suffix.
    cache_name = name + ('1' if ext else '0')
    hit = js_cache.fetch(cache_name)

    if hit:
        return hit

    folder = 'ext_js' if ext else 'js'
    script_path = get_app().path.resources.join(f'{folder}/{name}.js')
    js = script_path.read()
    js_cache.store(cache_name, js)
    return js
|
|
|
|
|
|
def new_pixbuf(image):
    """Return a GdkPixbuf.Pixbuf for ``image``.

    Accepts a cairo surface (converted at its full size), a ``Path`` (loaded
    from file) or an existing Pixbuf (returned unchanged).

    Raises TypeError for any other type.
    """
    if isinstance(image, (cairo.Surface, cairo.ImageSurface)):
        surface_width = image.get_width()
        surface_height = image.get_height()
        return Gdk.pixbuf_get_from_surface(image, 0, 0, surface_width, surface_height)

    if isinstance(image, Path):
        return GdkPixbuf.Pixbuf.new_from_file(image)

    if isinstance(image, GdkPixbuf.Pixbuf):
        return image

    raise TypeError('Image is not a Cairo Surface, Pixbuf, or Path object')
|
|
|
|
|
|
def resolve_address(domain, type=None):
    """Resolve ``domain`` and return one of its addresses, chosen at random.

    domain: host name to look up; surrounding whitespace is stripped.
    type: DNS record type to query. Defaults to 'AAAA'.

    When the AAAA query reports NXDOMAIN or has no answer, an 'A' query is
    tried instead. For any other record type those errors propagate.

    Returns None when ``domain`` is empty or None.
    """
    if not domain:
        return None

    # dnspython is already a dependency of this module; NoAnswer is imported
    # here because only NXDOMAIN and resolve are imported at module level.
    from dns.resolver import NoAnswer

    domain = domain.strip()

    # This used to be the comparison `type == 'AAAA'`, which left the type as
    # None. The resolver then raised TypeError, which was swallowed, so the
    # default path only ever performed the 'A' lookup.
    record_type = type or 'AAAA'

    try:
        return random.choice(resolve(domain, record_type)).address

    except (NXDOMAIN, NoAnswer):
        # NoAnswer is what an existing IPv4-only host yields for AAAA, so it
        # must reach the A fallback as well.
        if record_type != 'AAAA':
            raise

    return random.choice(resolve(domain, 'A')).address
|
|
|
|
|
|
def run_in_gui_thread(func, *args, **kwargs):
    """Schedule ``func(*args, **kwargs)`` to run once via ``GLib.idle_add``.

    Returns None; the result of ``func`` is discarded.
    """
    def run_once(*unused):
        func(*args, **kwargs)

        # An idle callback that returns a truthy value is invoked again.
        # Returning False keeps this a one-shot call regardless of what
        # ``func`` itself returns.
        return False

    GLib.idle_add(run_once)
|
|
|
|
|
|
def scale_pixbuf(pixbuf, new_size=16, keep='height'):
    """Return a scaled copy of ``pixbuf``.

    new_size: target size in pixels for the dimension named by ``keep``.
    keep: 'height' sets the height to ``new_size`` and derives the width from
        the aspect ratio; 'width' does the opposite. Any other value scales
        to a ``new_size`` x ``new_size`` square.

    Dimensions are rounded to the nearest whole pixel and never drop below 1.
    """
    width = pixbuf.get_width()
    height = pixbuf.get_height()

    # Start from a square; the previous `(new_size / x) * x` round trip only
    # reproduced new_size with possible float error just below the target.
    new_width = new_height = new_size

    if keep == 'width':
        new_height = new_size * height / width

    elif keep == 'height':
        new_width = new_size * width / height

    # scale_simple takes integer pixel sizes, so pass explicit ints.
    return pixbuf.scale_simple(
        max(1, round(new_width)),
        max(1, round(new_height)),
        GdkPixbuf.InterpType.BILINEAR
    )
|
|
|
|
|
|
def set_image(widget, image, size=16, keep='height'):
    """Show ``image`` on ``widget`` and return the widget.

    ``image`` is first treated as something ``new_pixbuf`` accepts and is
    scaled with ``scale_pixbuf``. If that raises TypeError and ``image`` is a
    string, it is used as an icon name instead.

    Raises TypeError when neither approach applies.
    """
    try:
        image = scale_pixbuf(new_pixbuf(image), size, keep)
        widget.set_from_pixbuf(image)

    except TypeError:
        # Fall through to the icon-name handling below.
        pass

    else:
        return widget

    if not isinstance(image, str):
        raise TypeError('Image is not a Cairo Surface, Pixbuf, Path object, or an icon name (str)')

    widget.set_from_icon_name(image, Gtk.IconSize.SMALL_TOOLBAR)
    widget.set_pixel_size(size)
    return widget
|
|
|
|
|
|
def set_margin(widget, size=5, horizontal=True, vertical=True):
    """Apply a ``size`` margin to the chosen sides of ``widget``.

    vertical: set the top and bottom margins.
    horizontal: set the left and right margins.
    """
    sides = []

    if vertical:
        sides += ['top', 'bottom']

    if horizontal:
        sides += ['left', 'right']

    for side in sides:
        getattr(widget, f'set_margin_{side}')(size)
|
|
|
|
|
|
def set_proc_name(name='pyweb'):
    """Set the process name through libc's ``prctl``.

    name: new process name; it is encoded as UTF-8.

    Returns the ``prctl`` result (0 on success), 1 when ``libc.so.6`` cannot
    be loaded, or None on Windows where nothing is attempted.
    """
    ## Currently only works on unix OSs with libc
    if system() == 'Windows':
        return

    # prctl option 15 is PR_SET_NAME
    PR_SET_NAME = 15

    try:
        libc = cdll.LoadLibrary('libc.so.6')

        # Build the buffer from the encoded bytes so it is sized in bytes and
        # always NUL-terminated. Sizing it by character count overflowed, or
        # left no terminator, for names containing multi-byte characters.
        buff = create_string_buffer(name.encode('UTF-8'))
        ret = libc.prctl(PR_SET_NAME, byref(buff), 0, 0, 0)

    except OSError:
        logging.debug('Cannot find libc')
        ret = 1

    if ret != 0:
        logging.error('Failed to set process title')

    return ret
|
|
|
|
|
|
class Thread(threading.Thread):
    """Daemon thread that runs a callable and carries a stop flag.

    The stop flag is only a signal: nothing in this class interrupts the
    callable, which has to check ``stopped()`` itself.
    """

    def __init__(self, func, *args, **kwargs):
        """Store the callable and the arguments it will be called with.

        A falsy ``func`` selects ``run_func``. Raises TypeError when the
        resulting target is not callable.
        """
        super().__init__(daemon=True)

        target = func if func else self.run_func

        self.func = target
        self.args = args
        self.kwargs = kwargs
        self.stop_event = threading.Event()

        if not callable(target):
            raise TypeError('func argument must be a callable')

    def run(self):
        """Call the stored callable with the stored arguments."""
        positional, keyword = self.args, self.kwargs
        self.func(*positional, **keyword)

    def stop(self):
        """Set the stop flag."""
        self.stop_event.set()

    def stopped(self):
        """Return True once ``stop()`` has been called."""
        flag = self.stop_event
        return flag.is_set()

    def run_func(self, *args, **kwargs):
        """Default target used when no callable is given; does nothing."""
        return None
|
|
|
|
|
|
class TimeoutCallback(Thread):
    """Thread that calls a function after a delay, unless stopped first."""

    def __init__(self, timeout, func, *args, **kwargs):
        """Store the delay and hand the callable and its arguments to Thread.

        timeout: delay in seconds before the callable runs.
        """
        super().__init__(func, *args, **kwargs)
        self.timeout = timeout

    def run(self):
        """Wait for the timeout, then call the function if not stopped."""
        # Waiting on the stop event instead of sleeping lets stop() cancel a
        # pending callback. wait() returns True only when the event was set
        # before the timeout elapsed.
        if self.stop_event.wait(self.timeout):
            return

        super().run()
|