This repository has been archived on 2023-02-02. You can view files and clone it, but cannot push or open issues or pull requests.
barkshark-web/bsweb/handlers/signal.py

391 lines
10 KiB
Python

import json, mimetypes, ftplib
from os.path import isfile, basename, splitext, join, dirname, abspath
from urllib.parse import urlparse, unquote
from ipaddress import ip_address as ipaddr
from socket import gethostbyname as dig
from collections import namedtuple
import tldextract
from IzzyLib import logging
from IzzyLib.template import *
from jinja2.exceptions import TemplateNotFound
from gi.repository import WebKit2, Gio, GLib
from ..config import var, dirs
from ..database import db
from ..functions import Error, Notif
## blocked domains and urls (will be optional in the future)
blockedDomain = [
'doubleclick.net',
'rubiconproject.com',
'outbrain.com',
'googletagservices.com',
'amazon-adsystem.com',
'optimizely.com',
'adswizz.com'
]
blockedURL = {
'youtube.com': [
'/api/stats/ads',
'/pagead/adview',
'/pagead/lvz'
'/get_midroll/info'
],
'soundcloud.com': [
'/audio-ads'
],
'facebook.com': [
'/tr']
}
def CloseTab(tab, webview):
tab.mainui.CloseTab(tab.tabid)
Gtk.Widget.destroy(webview)
def ContextMenu(tab, webview, menu, event, hit):
'''
context_is_editable
context_is_image
context_is_link
context_is_media
context_is_scrollbar
context_is_selection
'''
paste_cmd = lambda *args, plain=False: _handle_paste(*[tab, webview, hit, plain])
new_tab_cmd = lambda *args: _handle_new_tab(*[tab, webview, hit, False])
if hit.context_is_selection() or hit.context_is_editable():
#for item in menu.get_items():
#label = item.get_action().get_label().replace('_', '') if item.get_action() else None
#if label and label.lower() in ['paste']:
##print(dir(item.get_gaction()))
#menu.remove(item)
menu.set_user_data(GLib.Variant('s', 'heck'))
#paste_action = Gio.SimpleAction(name='menu-paste')
#paste_action.connect('activate', paste_cmd)
#paste_option = WebKit2.ContextMenuItem().new_from_gaction(paste_action, "Paste", None)
#menu.insert(paste_option, 3)
paste_plain_action = Gio.SimpleAction(name='menu-paste')
paste_plain_action.connect('activate', lambda *args: paste_cmd(*args, plain=True))
paste_plain_option = WebKit2.ContextMenuItem().new_from_gaction(paste_plain_action, "Paste Plain Text", None)
menu.insert(paste_plain_option, 3)
if hit.context_is_link():
for item in menu.get_items():
label = item.get_action().get_label().replace('_', '') if item.get_action() else None
if label and label.lower() in ['open link in new window']:
menu.remove(item)
new_tab_action = Gio.SimpleAction(name='menu-new-tab')
new_tab_action.connect('activate', new_tab_cmd)
new_tab_option = WebKit2.ContextMenuItem().new_from_gaction(new_tab_action, "Open Link In New Tab", None)
menu.insert(new_tab_option, 1)
return False
## There's probably a way to get webkit to handle the download, but I can't figure it out rn
def DownloadFile(window, view, download):
url = download.get_request().get_uri()
parsed = urlparse(url)
filename = basename(unquote(parsed.path))
name, ext = splitext(filename)
resp = Client.request(url)
if not resp.data:
logging.error(f'Failed to download {url}')
return
if not ext:
ext = mimetypes.guess_extension(resp.headers.get('content-type'))
newname = filename if filename else f'{parsed.netloc}-{parsed.path.replace("/", "-")}'
dest = f'{dirs.downloads}/{newname}{ext}'
else:
dest = f'{dirs.downloads}/{filename}'
if isfile(dest):
logging.debug(f'Downloaded file already exits: {dest}')
#
notif = Notif.new(f'Downloaded file already exits: {dest}')
notif.show()
return
try:
with open(dest, 'wb') as fd:
notif = Notif.new(f'Saving "{url}" as {dest}')
fd.write(resp.data)
notif = Notif.new(f'Saved "{url}" as {dest}')
except Exception as e:
notif = Notif.new(f'Failed to save "{url}" as "{dest}"')
notif.show()
def Fullscreen(tab, state, webview):
parsed = urlparse(webview.get_uri())
with db.session(False) as s:
row = s.get.permission(parsed.netloc)
if row and not row.fullscreen:
logging.debug(f'Denied fullscreen for domain: {parsed.netloc}')
return True
# the navbar and status bars don't hide for some reason
if state == 'enter':
tab.ui.navbar.hide()
tab.ui.status.hide()
tab.window.fullscreen()
tab.window.tabs.set_show_tabs(False)
else:
tab.ui.navbar.show()
tab.ui.status.show()
tab.window.unfullscreen()
tab.window.tabs.set_show_tabs(True)
def InsecureContent(tab, webview, event):
logging.debug('insecure-content:', event.value_nick)
return True
def LoadChanged(tab, webview, event):
url = webview.get_uri()
title = webview.get_title()
if title:
tab.SetTitle(title)
if tab.objects.urlbar.get_text() != url:
tab.objects.urlbar.set_text(url)
if event.value_nick == 'started':
tab.objects.stop.set_sensitive(True)
elif event.value_nick == 'committed':
for button, action in {'back': webview.can_go_back(), 'forward': webview.can_go_forward()}.items():
tab.objects.get(button).set_sensitive(action)
if any(map(url.startswith, ['http', 'https'])):
tab.ui.status.settings.set_sensitive(True)
else:
tab.ui.status.settings.set_sensitive(False)
elif event.value_nick in ['finished', 'redirected']:
tab.objects.stop.set_sensitive(False)
pass
else:
print('load-changed:', event.value_nick)
def LoadFailed(tab, webview, event, uri, error):
logging.error('Load failed:', uri, error.value_nick)
def MouseHover(tab, webview, hit, modifiers):
link_url = hit.get_property('link-uri')
if link_url:
tab.ui.status.url.set_text(link_url)
elif tab.ui.status.url.get_text() != '':
tab.ui.status.url.set_text('')
def NewWindow(tab, webview, action):
navtype = action.get_navigation_type()
url = action.get_request().get_uri()
if navtype == WebKit2.NavigationType.LINK_CLICKED:
tab.window.NewWebTab(url)
else:
print(navtype)
logging.debug(f'''New window data:
navtype: {action.get_navigation_type()}')
user action?: {action.is_user_gesture()}
is redirect: {action.is_redirect()}''')
def Notification(tab, webview, notif):
parsed = urlparse(webview.get_uri())
with db.session(False) as s:
row = s.get.permission(parsed.netloc)
if row and not row.notification:
logging.debug(f'Denied notification for domain: {parsed.netloc}')
return True
logging.debug(f'Notification: Title: {notif.get_title()}, Body: {notif.get_body()}')
def PermissionRequest(tab, webview, permrequest):
domain = urlparse(webview.get_uri()).netloc
with db.session(False) as s:
row = s.get.permission(domain)
if not row:
permrequest.deny()
return
if type(permrequest) == WebKit2.NotificationPermissionRequest and row.notification:
logging.verbose(f'Allowed notification permission request from {domain}')
permrequest.allow()
elif type(permrequest) == WebKit2.GeolocationPermissionRequest and row.location:
logging.verbose(f'Allowed location permission request from {domain}')
permrequest.allow()
elif type(permrequest) == WebKit2.UserMediaPermissionRequest:
if WebKit2.UserMediaPermissionRequest.props.is_for_audio_device and row.microphone:
logging.verbose(f'Allowed microphone permission request from {domain}')
permrequest.allow()
elif WebKit2.UserMediaPermissionRequest.props.is_for_video_device and row.camera:
logging.verbose(f'Allowed webcam permission request from {domain}')
permrequest.allow()
else:
logging.verbose(f'Rejected permission request from {domain}: {type(permrequest)}')
permrequest.deny()
print(type(permrequest))
def ResourceFilter(tab, webview, resource, request):
url = resource.get_uri()
wvurl = webview.get_uri()
parsed = urlparse(url)
wvparsed = urlparse(wvurl)
domain = tldextract.extract(parsed.netloc).registered_domain
path = parsed.path if not parsed.path.endswith('/') else parsed.path[:-1]
with db.session(False) as s:
row = s.get.permission(wvparsed.netloc)
if webview.get_uri().startswith('https://') and url.startswith('http://'):
logging.debug('Blocked insecure resource')
request.set_property('uri', 'about:blank')
return
if not url:
logging.debug('Missing url in resource')
return
if row and not row.adblock:
return
## Needed to allow twitch.tv to function properly
if parsed.netloc == 'twitch.tv' and domain == 'amazon-adsystem.com':
return
if path in blockedURL.get(domain, []) or domain in blockedDomain:
request.set_property('uri', 'about:blank')
logging.debug(f'blocked resource: {parsed.netloc}{path}')
def ScriptDialog(tab, view, dialog):
'''
WebKit2.ScriptDialog.ALERT
WebKit2.ScriptDialog.CONFIRM
WebKit2.ScriptDialog.PROMPT
WebKit2.ScriptDialog.BEFORE_UNLOAD_CONFIRM
'''
print(dialog)
#return True
## calling list_text_fields sometimes cases a crash
def SubmitForm(tab, webview, form):
form_data = form.list_text_fields()
data = DotDict()
if form_data[0]:
keys = form_data[1]
values = form_data[2]
for index, key in enumerate(keys):
data[key] = values[index]
print(data)
form.submit()
def TabCrash(tab, webview, reason):
reasons = {
'crashed': 'The web process crashed. Reload the page.'
}
msg = reasons.get(reason.value_nick, 'An error occured. Please reload')
if not reasons.get(reason.value_nick):
logging.debug('Webkit process crash', reason.value_nick)
logging.debug('Tab crashed:', tab.tabid)
tab.window.overlay.NewNotif(f'The web process crashed. Reload the page')
## This loads an error page and then goes back to the page it was on. Can cause a crash loop if the webview fails to start
#data = {
#'error': msg,
#'actions': {
#'refresh': webview.get_uri(),
#'time': 60
#}
#}
#webview.load_html(var.template.render('error.haml', data), webview.get_uri())
def TlsError(tab, webview, url, cert, error):
domain = urlparse(url).hostname
ip = ipaddr(dig(domain))
err = error.first_value_nick
with db.session(False) as s:
if ip.is_private and s.get.config('allow_local_unsigned'):
tab.context.allow_tls_certificate_for_host(cert, domain)
webview.load_uri(url)
return True
if err == 'unknown-ca':
msg = 'Unknown certificate authority'
else:
logging.debug('handlers.signal.TlsError: Unhandled error:', err)
if url == tab.ui.navbar.objects.get('urlbar').get_text():
Error(webview, msg, title='TLS Error', redirect_url=url)
return True
## helper functions
def _handle_new_tab(tab, webview, hit, switch=True):
tab.window.NewWebTab(hit.get_link_uri(), switch=switch)
def _handle_paste(tab, webview, hit, action, plain=False):
webview.execute_editing_command('Paste' if not plain else 'PasteAsPlainText')