This repository has been archived on 2023-02-02. You can view files and clone it, but cannot push or open issues or pull requests.
barkshark-web/barkshark_web/component/web_tab_webview_handler.py

748 lines
22 KiB
Python

import functools
from izzylib.exceptions import HttpClientError
from jinja2.exceptions import TemplateNotFound
from json.decoder import JSONDecodeError
from urllib.parse import urlparse, quote, unquote
from ssl import SSLCertVerificationError
from .. import cache, var
from ..base import ComponentBase
from ..enums import EditAction, Javascript, FileChooserResponse, WebviewContextActions
from ..functions import Thread, connect, run_in_gui_thread
from ..widgets import FileChooser
cert_error_msg = {
'unknown-ca': 'Unknown certificate authority for url: {url}',
'bad-identity': 'Certificate domain does not match: {url}'
}
class WebviewHandler(ComponentBase):
def __init__(self, tab):
ComponentBase.__init__(self)
self.tab = tab
self.inspector_open = False
## Webview-specific signals
connect(self.webview, 'close', self.tab.close)
connect(self.webview, 'context-menu', self.handle_context_menu, original_args=True)
connect(self.webview, 'create', self.handle_new_window, original_args=True)
#connect(self.webview, 'decide-policy', self.handle_decide_policy, original_args=True)
connect(self.webview, 'enter-fullscreen', self.handle_fullscreen, 'enter')
connect(self.webview, 'leave-fullscreen', self.handle_fullscreen, 'exit')
#connect(self.webview, 'insecure-content-detected', self.handle_insecure_content, original_args=True)
connect(self.webview, 'load-changed', self.handle_load_changed, original_args=True)
connect(self.webview, 'load-failed-with-tls-errors', self.handle_tls_error, original_args=True)
connect(self.webview, 'mouse-target-changed', self.handle_mouse_hover, original_args=True)
connect(self.webview, 'resource-load-started', self.handle_resource_load, original_args=True)
connect(self.webview, 'run-file-chooser', self.handle_file_chooser, original_args=True)
connect(self.webview, 'script-dialog', self.handle_script_dialog, original_args=True)
connect(self.webview, 'show-notification', self.handle_notification, original_args=True)
#connect(self.webview, 'submit-form', self.handle_submit_form, original_args=True)
connect(self.webview, 'permission-request', self.handle_permission_request, original_args=True)
connect(self.webview, 'web-process-terminated', self.handle_tab_crashed, original_args=True)
## Property signals
connect(self.webview, 'notify::estimated-load-progress', self.handle_load_progress, original_args=True)
connect(self.webview, 'notify::favicon', self.handle_set_favicon)
connect(self.webview, 'notify::title', self.tab.set_title)
connect(self.webview, 'notify::uri', self.tab.set_url)
connect(self.webview, 'notify::zoom-level', self.handle_zoom_level)
## Generic signals
connect(self.webview, 'scroll-event', self.handle_scroll_event, original_args=True)
## Web inspector
connect(self.inspector, 'attach', self.handle_inspector, 'attach', original_args=True)
connect(self.inspector, 'detach', self.handle_inspector, 'detach', original_args=True)
connect(self.inspector, 'closed', self.handle_inspector, 'close', original_args=True)
## History list
connect(self.webview.get_back_forward_list(), 'changed', self.handle_history_change, original_args=True)
@property
def inspector(self):
return self.webview.get_inspector()
@property
def settings(self):
return self.tab.settings
@property
def webview(self):
return self.tab.webview
def handle_context_menu(self, webview, context_menu, event, hit):
account = self.app.get_default_account()
menu = ContextMenuClass(context_menu, self.tab)
url = DotDict(
page = webview.get_uri(),
link = hit.get_link_uri(),
image = hit.get_image_uri(),
media = hit.get_media_uri()
)
data = DotDict(
link_text = hit.get_link_label(),
selection = hit.context_is_selection(),
editable = hit.context_is_editable(),
link = hit.context_is_link(),
media = hit.context_is_media(),
image = hit.context_is_image(),
scrollbar = hit.context_is_scrollbar()
)
for key, value in url.items():
if not value:
continue
url[key] = Url(value)
with self.app.db.session as s:
permissions = s.get_permission(url.page.hostname())
if data.link:
menu.new_action('link_open', 'Open Link', self.tab.load_url, url.link)
menu.new_action('link_new', 'Open Link In New Tab', self.window.tab_new, url.link, switch=True)
menu.new_action('link_bg', 'Open Link In Background Tab', self.window.tab_new, url.link)
menu.new_stock('link_copy', 'Copy Link Url', WebviewContextActions.LINK_COPY)
menu.new_stock('link_dl', 'Download Link', WebviewContextActions.LINK_DOWNLOAD)
menu.new_sep()
if data.media:
menu.new_stock('media_play', 'Play', WebviewContextActions.MEDIA_PLAY)
menu.new_stock('media_pause', 'Pause', WebviewContextActions.MEDIA_PAUSE)
menu.new_stock('media_loop', 'Toggle Loop', WebviewContextActions.MEDIA_LOOP)
menu.new_stock('media_ctrl', 'Toggle Controls', WebviewContextActions.MEDIA_TOGGLE)
menu.new_action('media_open', 'Open Media', self.tab.load_url, url.media)
menu.new_action('media_new', 'Open Media In New Tab', self.window.tab_new, url.media, switch=True)
menu.new_action('media_bg', 'Open Media In Background Tab', self.window.tab_new, url.media)
menu.new_action('media_dl', 'Download Media', self.app.context.download_uri, url.media)
menu.new_sep()
if data.image:
menu.new_action('image_open', 'Open Image', self.tab.load_url, url.image)
menu.new_action('image_new', 'Open Image In New Tab', self.window.tab_new, url.image, switch=True)
menu.new_action('image_bg', 'Open Image In Background Tab', self.window.tab_new, url.image)
menu.new_stock('image_dl', 'Download Image', WebviewContextActions.IMAGE_DOWNLOAD)
menu.new_stock('image_url', 'Copy Image Url', WebviewContextActions.IMAGE_COPY)
menu.new_stock('image_copy', 'Copy Image', WebviewContextActions.IMAGE_COPY_FULL)
menu.new_sep()
if data.editable:
menu.new_webview_action('edit_undo', 'Undo', 'Undo')
menu.new_webview_action('edit_redo', 'Redo', 'Redo')
menu.new_stock('edit_paste', 'Paste', WebviewContextActions.TEXT_PASTE)
menu.new_stock('edit_plain', 'Paste Plain Text', WebviewContextActions.TEXT_PASTE_PLAIN)
if data.selection:
menu.new_stock('edit_copy', 'Copy', WebviewContextActions.TEXT_COPY)
if data.selection and data.editable:
menu.new_stock('edit_cut', 'Cut', WebviewContextActions.TEXT_CUT)
menu.new_stock('edit_delete', 'Delete', WebviewContextActions.TEXT_DELETE)
menu.new_stock('edit_select', 'Select All', WebviewContextActions.TEXT_SELECT)
if data.selection or data.editable:
menu.new_sep()
if data.selection:
menu.new_action(
'search',
'Search or Go',
self.tab.run_js,
Javascript.SELECTION,
self.callback_new_tab
)
search_menu = menu.new_sub('search_menu', 'Search via..')
with self.app.db.session as s:
default = s.get_config('default_search')
for row in sorted(s.fetch('search'), key=lambda x:x['name']):
search_menu.new_action(
row.keyword,
row.name,
self.tab.run_js,
Javascript.SELECTION,
self.callback_search,
row.keyword
)
menu.new_sep()
if url.page.proto != var.local:
menu.new_action('print', 'Print Page', self.tab.page_action, 'print')
if url.page.proto in ['http', 'https']:
menu.new_action('source', 'View Page Source', self.tab.page_action, 'source')
menu.new_stock('inspect', 'Inspect Element', WebviewContextActions.INSPECT)
return False
# Can't get the new url, so this is pointless
def handle_decide_policy(self, webview, decision, decision_type):
url = self.tab.url
if decision_type == WebKit2.PolicyDecisionType.NAVIGATION_ACTION:
if url.proto == 'file':
decision.ignore()
webview.load_uri(url.replace_property('proto', 'local'))
return True
if url.proto == 'ftp':
decision.ignore()
webview.load_uri(url.replace_property('proto', 'filetp'))
return True
## Can't just pass oauth:// urls to mastodon's oauth, so here's a workaround
if url.startswith('https://pyweb-oauth'):
url = url.replace('https://pyweb-oauth', 'oauth://', 1)
webview.load_uri(url)
decision.ignore()
return True
try:
if self.tab.url.top in ['facebook.com']:
decision.ignore()
return True
except:
pass
if not url.startswith(var.local):
with self.app.db.session as s:
permissions = s.get_permission(url.hostname())
decision.use_with_policies(WebKit2.WebsitePolicies(autoplay = bool(permissions.autoplay)))
def handle_file_chooser(self, webview, chooser):
with self.db.session as s:
fc = FileChooser(self.window,
save = False,
multiple = chooser.get_select_multiple(),
path = s.get_config('download_dir')
)
fc.set_callback(FileChooserResponse.OK, self.handle_file_chooser_finish, chooser)
fc.set_callback(FileChooserResponse.CANCEL, self.handle_file_chooser_cancel, chooser)
filters = chooser.get_mime_types_filter()
if filters:
fc.add_filter(filters)
fc.run()
return True
def handle_file_chooser_cancel(self, chooser):
chooser.cancel()
logging.verbose('Canceled file open')
def handle_file_chooser_finish(self, chooser, paths):
if not isinstance(paths, (tuple, list, set)):
paths = [paths]
chooser.select_files(paths)
def handle_fullscreen(self, webview, state):
host = self.tab.url.hostname()
navbar = self.tab['navbar']
statusbar = self.window['statusbar']
tabs = self.window['tabs']
with self.app.db.session as s:
row = s.get_permission(host)
if not row.fullscreen:
self.window.notification(f'Denied fullscreen for domain: {host}', 'error')
return True
# the navbar and status bars don't hide for some reason
if state == 'enter':
self.window.fullscreen()
navbar.hide()
statusbar.hide()
tabs.set_show_tabs(False)
else:
self.window.unfullscreen()
navbar.show()
statusbar.show()
tabs.set_show_tabs(True)
def handle_history_change(self, history, added, removed):
self.tab.set_history_button_state()
def handle_insecure_content(self, webview, event):
logging.debug('insecure-content:', event.value_nick)
return True
def handle_inspector(self, inspector, action):
with self.app.db.session as s:
config = s.get_config('detach_inspector')
if action == 'attach':
if config and inspector.is_attached():
inspector.detach()
return True
elif action == 'detach':
if not config and not inspector.is_attached():
inspector.attach()
return True
else:
self.inspector_open = False
return
self.inspector_open = True
def handle_load_changed(self, webview, event):
url = self.tab.url
if event.value_nick == 'started':
self.tab._data.favicon = False
elif event.value_nick == 'committed':
self.tab.set_button_state()
self.tab.search_action('close')
if not url:
return
#self.tab.set_favicon_from_cache(url)
## Only enable the page cache for http(s) urls
self.settings.set('enable-page-cache', url.proto in ['http', 'https'])
with self.app.db.session as s:
permissions = s.get_permission(url.hostname())
## This only prevents non-muted media from playing :/
self.settings.set('media-playback-requires-user-gesture', not permissions.autoplay)
elif event.value_nick == 'redirected':
pass
elif event.value_nick == 'finished':
if self.window.active_tab == self.tab:
self.window.set_button_state(self.tab.id)
self.handle_set_favicon()
self.tab.set_button_state()
if not url:
return
if url.proto == 'https':
FediverseCheck(None, self.tab, url).start()
elif url.proto in ['http', 'sftp', 'filetp']:
with self.app.db.session as s:
s.put_history_from_tab(self.tab)
def handle_load_failed(self, webview, event, uri, error):
logging.error('Load failed:', uri, error.value_nick)
def handle_load_progress(self, webview, _):
progress = webview.get_estimated_load_progress()
finished = True if progress == 1.0 else False
self.tab['navbar-url'].set_progress_fraction(0 if progress == 1.0 else progress)
if finished:
self.tab.set_button_state()
def handle_mouse_hover(self, webview, hit, _):
link_url = hit.get_property('link-uri')
self.window['statusbar-url'].set_text(link_url if link_url else '')
def handle_resource_load(self, webview, resource, request):
resource.connect('failed', self.handle_resource_failed)
resource.connect('finished', self.handle_resource_finished)
def handle_resource_failed(self, resource, error):
try:
url = Url(resource.get_uri())
except Exception as e:
print(f'{class_name(e)}: {e}')
return
if error.domain == 'WebKitPolicyError' and 'interrupted' in error.message.lower():
if not url.mimetype:
self.window.notification(f'Cannot open url: {url}', 'warning')
elif not self.webview.can_show_mime_type(url.mimetype):
self.app.context.download_uri(url)
return
if error.domain not in ['HandlerError', 'soup-http-error-quark', 'WebKitPolicyError']:
if error.domain == 'WebKitNetworkError' and error.message.lower() == 'load request cancelled':
return
logging.debug(f'''Resource failed to load: {resource.get_uri()}
- domain: {error.domain}
- message: {error.message}''')
def handle_resource_finished(self, resource):
return
def handle_new_window(self, webview, action):
navtype = action.get_navigation_type()
user_action = action.is_user_gesture()
url = action.get_request().get_uri()
if not user_action:
logging.debug('handle_new_window: Not a user action: "{url}"')
return
if navtype in [WebKit2.NavigationType.FORM_SUBMITTED, WebKit2.NavigationType.LINK_CLICKED, WebKit2.NavigationType.OTHER]:
self.window.tab_new(url, switch=True)
else:
logging.debug('handle_new_window: unhandled nav type:', navtype)
def handle_notification(self, webview, notif):
try:
host = self.tab.url.hostname()
## Not a url, so deny permission
except AttributeError:
return True
with self.app.db.session as s:
row = s.get_permission(host)
if not row.notification:
logging.debug(f'Denied notification for domain: {host}')
return True
def handle_permission_request(self, webview, permrequest):
try:
host = self.tab.url.hostname()
## Not a url, so deny perlmission
except AttributeError:
permrequest.deny()
return
with self.db.session as s:
row = s.get_permission(host)
if type(permrequest) == WebKit2.NotificationPermissionRequest and row.notification:
logging.verbose(f'Allowed notification permission request from {host}')
permrequest.allow()
elif type(permrequest) == WebKit2.GeolocationPermissionRequest and row.location:
logging.verbose(f'Allowed location permission request from {host}')
permrequest.allow()
elif type(permrequest) == WebKit2.UserMediaPermissionRequest:
if permrequest.props.is_for_audio_device and row.microphone:
logging.verbose(f'Allowed microphone permission request from {host}')
permrequest.allow()
elif permrequest.props.is_for_video_device and row.camera:
logging.verbose(f'Allowed webcam permission request from {host}')
permrequest.allow()
else:
logging.verbose(f'Rejected permission request from {host}: {type(permrequest)}')
permrequest.deny()
def handle_script_dialog(self, webview, dialog):
try:
host = self.tab.url.hostname()
except:
return True
with self.db.session as s:
row = s.get_permission(host)
if not row.dialog:
logging.verbose('JS dialogs not allowed for domain:', host)
return True
def handle_scroll_event(self, webview, event):
if not event.state == Gdk.ModifierType.CONTROL_MASK:
return
zoom = webview.get_zoom_level()
if event.direction == Gdk.ScrollDirection.DOWN:
self.tab.page_action('zoom', zoom=zoom - 0.1)
elif event.direction == Gdk.ScrollDirection.UP:
self.tab.page_action('zoom', zoom=zoom + 0.1)
elif event.direction in {Gdk.ScrollDirection.LEFT, Gdk.ScrollDirection.RIGHT}:
self.tab.page_action('zoom', zoom=1.0)
return True
def handle_scroll_event2(self, webview, event):
if not event.state == Gdk.ModifierType.CONTROL_MASK:
return
zoom = webview.get_zoom_level()
if event.direction == Gdk.ScrollDirection.DOWN:
if zoom <= 0.2:
return True
webview.set_zoom_level(zoom - 0.1)
elif event.direction == Gdk.ScrollDirection.UP:
if zoom >= 4.0:
return True
webview.set_zoom_level(zoom + 0.1)
elif event.direction in {Gdk.ScrollDirection.LEFT, Gdk.ScrollDirection.RIGHT}:
webview.set_zoom_level(1.0)
return True
## calling list_text_fields causes a crash half the time
def handle_submit_form(self, webview, form):
form_data = form.list_text_fields()
data = DotDict()
if form_data[0]:
keys = form_data[1]
values = form_data[2]
for index, key in enumerate(keys):
data[key] = values[index]
logging.debug(data)
form.submit()
def handle_tab_crashed(self, webview, reason):
reasons = {
'crashed': 'Web process crashed. Reload the page.',
'exceeded_memory_limit': 'Web process exceeded the memory limit. Reload the page.'
}
self.tab['navbar-stop'].set_sensitive(False)
self.tab['navbar-refresh'].set_sensitive(True)
self.window.notification(reasons[reason.value_nick], 'ERROR')
def handle_tls_error(self, webview, url, cert, error):
url = Url(url)
err = error.first_value_nick
with self.db.session as s:
if s.get_config('allow_local_unsigned') and url.address.is_type('local', 'loopback', 'private'):
self.tab.context.allow_tls_certificate_for_host(cert, url.hostname())
webview.load_uri(url)
return True
msg = cert_error_msg.get(err, f'TLS error: {url}')
if not msg:
logging.debug('handlers.signal.TlsError: Unhandled error:', err)
context = {'error_message': msg.format(url=url), 'title': 'TLS Error'}
webview.load_html(self.app.template.render('error.haml', context))
return True
def handle_set_url(self, webview, url):
self.tab.set_url()
def handle_set_favicon(self, *_):
if not self.tab._data.favicon:
self.tab.set_favicon_from_cache(self.tab.url)
def handle_set_title(self, *_):
self.tab.set_title()
def handle_zoom_level(self, *_):
zoom = int(round(self.webview.get_zoom_level(), 1) * 100)
self.tab['navbar-zoom'].set_label(f'{zoom}%')
def callback_new_tab(self, url, switch=True):
if not url:
self.window.notification('No selection found', 'error')
return
self.window.tab_new(url, switch=switch)
def callback_search(self, text, key=None):
if not text:
self.window.notification('No selection found', 'error')
return
with self.app.db.session as s:
keyword = key or s.get_config('default_search')
search = s.get_search(keyword)
self.window.tab_new(search.compile(text), switch=True)
## I'd like to subclass WebKit2.ContextMenu if possible
class ContextMenuClass(DotDict):
def __init__(self, menu, tab):
super().__init__()
self.menu = menu or WebKit2.ContextMenu.new()
self.tab = tab
self.window = tab.window
self.webview = tab.webview
self.menu.remove_all()
def new_action(self, name, label, callback, *args, **kwargs):
action = Gio.SimpleAction(name=name)
connect(action, 'activate', callback, *args, **kwargs)
item = WebKit2.ContextMenuItem().new_from_gaction(action, label, None)
self[name] = item
self.menu.append(item)
def new_stock(self, name, label, action):
if not isinstance(action, WebviewContextActions):
raise TypeError(f'Webview context action must be a WebviewContextActions, not {class_name(action)}')
#action = WebviewContextActions[action.upper()]
item = WebKit2.ContextMenuItem.new_from_stock_action_with_label(action.value, label)
self[name] = item
self.menu.append(item)
def new_webview_action(self, name, label, action):
action = Gio.SimpleAction(name=name)
connect(action, 'activate', self.webview.execute_editing_command, action)
item = WebKit2.ContextMenuItem().new_from_gaction(action, label, None)
self[name] = item
self.menu.append(item)
## I'll probably have to mess with the class itself to use this properly
def new_sub(self, name, label):
menu = WebKit2.ContextMenu.new()
menu_class = ContextMenuClass(menu, self.tab)
self[name] = menu_class
self.menu.append(WebKit2.ContextMenuItem.new_with_submenu(label, menu))
return menu_class
def new_sep(self):
self.menu.append(WebKit2.ContextMenuItem.new_separator())
class FediverseCheck(Thread):
def run_func(self, tab, url):
acct = tab.app.get_default_account()
post = {}
with tab.db.session as s:
permissions = s.get_permission(url.hostname())
history_row = s.get_history(url)
is_post = bool(history_row.post) if history_row else None
nodeinfo = None
if not acct:
s.put_history_from_tab(tab)
return
if s.get_config('ap_check') and permissions.instance == None:
try:
nodeinfo = tab.app.http_client.fetch_nodeinfo(permissions.domain)
except SSLCertVerificationError:
logging.verbose(f'FediverseCheck: SSL Error for domain: {url.hostname()}')
return
except ConnectionRefusedError:
logging.verbose(f'FediverseCheck: Failed to connect to domain: {url.hostname()}')
return
except HttpClientError as e:
if e.status in range(500, 600):
logging.verbose(f'FediverseCheck: Server error: {e.status}: {e.message}')
return
if e.status == 404:
is_post = False
except JSONDecodeError:
is_post = False
except Exception as e:
logging.error(f'FediverseCheck: {type(e).__name__}: {e}')
return
is_instance = True if nodeinfo and nodeinfo.get('software') else False
logging.debug(f'Result of instance check for {permissions.domain}: {is_instance}')
permissions = s.put_permission(permissions.domain, 'instance', is_instance)
if s.get_config('post_check') and permissions and permissions.instance and (is_post in [None, True]):
if url.path.startswith('/web'):
is_post = False
elif not (post := cache.posts.fetch(url)):
post = acct.fetch_post(url)
cache.posts.store(url, post)
is_post = True if post else False
logging.debug(f'Result of post check for {url}: {is_post}')
s.put_history(url, tab.title, is_post)
tab._data.post = post