426 lines
9.1 KiB
Python
426 lines
9.1 KiB
Python
import hashlib
|
|
import threading
|
|
import traceback
|
|
import watchfiles
|
|
|
|
from configparser import ConfigParser
|
|
from izzylib.logging import LogLevel
|
|
from izzylib.misc import class_name
|
|
from izzylib.object_base import ObjectBase
|
|
from izzylib.path import Path
|
|
from izzylib.url import Url
|
|
from izzylib_http_async import Template
|
|
|
|
from . import var, __version__ as version, __software__ as swname
|
|
from .base import ComponentBase
|
|
from .enums import StylePriority, WatcherChangeType
|
|
from .functions import run_in_gui_thread
|
|
|
|
|
|
class Themes(ComponentBase, ObjectBase):
|
|
def __init__(self, window):
|
|
ComponentBase.__init__(self)
|
|
ObjectBase.__init__(self,
|
|
window = window,
|
|
main = None,
|
|
current = None,
|
|
current_system = None,
|
|
watcher = None,
|
|
user = DotDict(),
|
|
system = DotDict(),
|
|
readonly_props = ['window', 'user', 'system']
|
|
)
|
|
|
|
self.setup()
|
|
|
|
|
|
def __del__(self):
|
|
self.watcher_stop()
|
|
|
|
|
|
@property
|
|
def screen(self):
|
|
return Gdk.Screen.get_default()
|
|
|
|
|
|
@property
|
|
def systempath(self):
|
|
return self.app.path.script.join('systhemes')
|
|
|
|
|
|
@property
|
|
def userpath(self):
|
|
return self.app.path.themes
|
|
|
|
|
|
def get_theme_by_file(self, path, system=False):
|
|
themes = self.system.values() if system else self.user.values()
|
|
|
|
for theme in themes:
|
|
if theme.path in path:
|
|
return theme
|
|
|
|
raise KeyError(f'Path not in any theme: {path}')
|
|
|
|
|
|
def get_theme_by_property(self, key, value, system=False):
|
|
themes = self.system.values() if system else self.user.values()
|
|
|
|
for theme in themes:
|
|
if theme.get_property(key) == value:
|
|
return theme
|
|
|
|
raise KeyError(f'Cannot find theme with property: {key}="{value}"')
|
|
|
|
|
|
def list_unloaded(self):
|
|
for path in self.userpath.listdir(recursive=False):
|
|
if path.isdir() and path.name not in self.user:
|
|
yield path
|
|
|
|
|
|
def load_main(self):
|
|
if self.main:
|
|
Gtk.StyleContext.remove_provider_for_screen(self.screen, self.main)
|
|
|
|
self.main = Gtk.CssProvider()
|
|
self.main.load_from_file(Gio.File.new_for_path(self.app.path.resources.join('main.css')))
|
|
|
|
Gtk.StyleContext.add_provider_for_screen(self.screen, self.main, StylePriority.SETTINGS)
|
|
|
|
|
|
def set(self, hash, save=True):
|
|
theme = None
|
|
|
|
if hash == 'default':
|
|
self.unset()
|
|
self.unset_system()
|
|
|
|
#logging.verbose('Set theme to system')
|
|
|
|
else:
|
|
if hash in self.system:
|
|
theme = self.set_system(hash, True)
|
|
|
|
elif hash in self.user:
|
|
theme = self.set_user(hash)
|
|
|
|
else:
|
|
raise KeyError(f'Cannot find theme with hash: {hash}') from None
|
|
|
|
#logging.verbose(f'Set theme to {theme.name}')
|
|
|
|
if save:
|
|
with self.db.session as s:
|
|
s.put_config('theme', hash)
|
|
|
|
return theme
|
|
|
|
|
|
def set_user(self, hash):
|
|
theme = self.user[hash]
|
|
|
|
if self.current == hash:
|
|
return theme
|
|
|
|
self.unset_user()
|
|
|
|
if not theme.base:
|
|
self.unset_system()
|
|
|
|
elif theme.base != self.current_system:
|
|
systheme = self.get_theme_by_property('name', theme.base.title(), system=True)
|
|
self.set_system(systheme.hash)
|
|
|
|
Gtk.StyleContext.add_provider_for_screen(self.screen, theme.get_provider(), StylePriority.USER)
|
|
self.current = theme.hash
|
|
|
|
return theme
|
|
|
|
|
|
def set_system(self, hash, unset_user=False):
|
|
theme = self.system[hash]
|
|
|
|
if unset_user:
|
|
self.unset_user()
|
|
|
|
if self.current_system == hash:
|
|
return theme
|
|
|
|
elif self.current_system:
|
|
self.unset_system()
|
|
|
|
Gtk.StyleContext.add_provider_for_screen(self.screen, theme.get_provider(), StylePriority.APPLICATION)
|
|
self.current_system = theme.hash
|
|
|
|
return theme
|
|
|
|
|
|
def setup(self):
|
|
self.load_main()
|
|
|
|
for path in self.systempath.listdir(recursive=False):
|
|
if path.isdir():
|
|
theme = Theme(path)
|
|
self.system[theme.hash] = theme
|
|
logging.verbose(f'Loaded system theme: {theme.name}')
|
|
|
|
for path in self.list_unloaded():
|
|
try:
|
|
theme = Theme(path)
|
|
self.user[theme.hash] = theme
|
|
logging.verbose(f'Loaded user theme: {theme.name} by {theme.author}')
|
|
|
|
except Exception as e:
|
|
traceback.print_exc()
|
|
|
|
with self.db.session as s:
|
|
try:
|
|
self.set(s.get_config('theme', 'default'), save=False)
|
|
|
|
except KeyError:
|
|
logging.warning(f'Theme with hash does not exist: {hash}')
|
|
|
|
|
|
def unset(self):
|
|
self.unset_user()
|
|
self.unset_system()
|
|
|
|
|
|
def unset_user(self):
|
|
if not self.current:
|
|
return
|
|
|
|
Gtk.StyleContext.remove_provider_for_screen(self.screen, self.user[self.current].provider)
|
|
self.current = None
|
|
|
|
|
|
def unset_system(self):
|
|
if not self.current_system:
|
|
return
|
|
|
|
Gtk.StyleContext.remove_provider_for_screen(self.screen, self.system[self.current_system].provider)
|
|
self.current_system = None
|
|
|
|
|
|
def watcher_start(self):
|
|
if not self.watcher:
|
|
self.watcher = ThemeWatcher(self)
|
|
|
|
self.watcher.start()
|
|
|
|
|
|
def watcher_stop(self):
|
|
if not self.watcher:
|
|
return
|
|
|
|
self.watcher.stop()
|
|
|
|
|
|
class Theme(ObjectBase):
|
|
def __init__(self, path):
|
|
ObjectBase.__init__(self,
|
|
path = path,
|
|
provider = Gtk.CssProvider(),
|
|
name = None,
|
|
author = None,
|
|
url = None,
|
|
license = None,
|
|
entry = None,
|
|
base = None,
|
|
hash = None
|
|
)
|
|
|
|
self.load_manifest()
|
|
self.provider.connect('parsing-error', self.handle_parsing_error)
|
|
|
|
|
|
def _parse_property(self, key, value):
|
|
if value == None:
|
|
return
|
|
|
|
if key == 'path':
|
|
if not isinstance(value, Path):
|
|
return path(value)
|
|
|
|
elif key == 'base':
|
|
if not value or value.lower().strip() in ['', 'null', 'none']:
|
|
return None
|
|
|
|
elif key == 'url':
|
|
if not isinstance(value, Url):
|
|
return Url(value)
|
|
|
|
return value
|
|
|
|
|
|
def get_provider(self):
|
|
if not self.provider.to_string():
|
|
self.load()
|
|
|
|
return self.provider
|
|
|
|
|
|
def load(self):
|
|
self.provider.load_from_path(self.path.join(self.entry))
|
|
|
|
|
|
def load_manifest(self):
|
|
path = self.path.join('manifest.ini')
|
|
|
|
if not path.exists():
|
|
raise FileNotFoundError(f'Cannot find manifest for theme: {self.path.name}')
|
|
|
|
config = ConfigParser()
|
|
config.read(self.path.join('manifest.ini'))
|
|
|
|
try:
|
|
info = config['info']
|
|
settings = config['settings']
|
|
|
|
self.name = info['name']
|
|
self.author = info['author']
|
|
self.url = info.get('url')
|
|
self.license = info.get('license')
|
|
|
|
self.entry = settings['entry']
|
|
self.base = settings.get('base')
|
|
|
|
except KeyError as e:
|
|
raise KeyError(f'Failed to parse manifest for "{self.path.name}": {e}') from None
|
|
|
|
if not self.entry:
|
|
raise ValueError(f'No entry css specified for theme: {self.name}')
|
|
|
|
value = self.name + self.author + self.entry
|
|
self.hash = hashlib.sha256(value.encode('utf-8')).hexdigest()
|
|
|
|
|
|
def handle_parsing_error(self, provider, section, error):
|
|
logging.error(f'[{self.name}] {error.message} {section.get_start_line()}')
|
|
|
|
|
|
class ThemeWatcher(ComponentBase):
|
|
def __init__(self, themes):
|
|
self.themes = themes
|
|
self._stop = threading.Event()
|
|
self._stopped = threading.Event()
|
|
self._stopped.set()
|
|
|
|
|
|
@property
|
|
def path(self):
|
|
return self.app.path
|
|
|
|
|
|
@property
|
|
def systempath(self):
|
|
return self.themes.systempath
|
|
|
|
|
|
@property
|
|
def userpath(self):
|
|
return self.themes.userpath
|
|
|
|
|
|
def start(self):
|
|
if not self._stopped.is_set():
|
|
return
|
|
|
|
thread = threading.Thread(target=self.handle_file_watcher)
|
|
thread.start()
|
|
|
|
logging.verbose('Started theme watcher')
|
|
|
|
|
|
def stop(self):
|
|
if self._stopped.is_set():
|
|
return
|
|
|
|
self._stop.set()
|
|
self._stopped.wait()
|
|
|
|
logging.verbose('Stopped theme watcher')
|
|
|
|
|
|
def handle_file_watcher(self):
|
|
self._stopped.clear()
|
|
|
|
watcher = watchfiles.watch(
|
|
self.path.resources,
|
|
self.systempath,
|
|
self.userpath,
|
|
recursive = True,
|
|
force_polling = True,
|
|
poll_delay_ms = 250,
|
|
stop_event = self._stop
|
|
)
|
|
|
|
for changes in watcher:
|
|
for change in changes:
|
|
try:
|
|
action = change[0]
|
|
path = Path(change[1])
|
|
|
|
except IndexError:
|
|
continue
|
|
|
|
try:
|
|
if path.startswith(self.app.path.resources):
|
|
if path.name == 'main.css':
|
|
run_in_gui_thread(self.themes.load_main)
|
|
|
|
elif any(map(path.startswith, [self.themes.systempath, self.themes.userpath])):
|
|
system = path.startswith(self.themes.systempath)
|
|
|
|
try: theme = self.themes.get_theme_by_file(path, system=systempath)
|
|
except KeyError: theme = None
|
|
|
|
if theme and action == WatcherChangeType.MODIFY:
|
|
run_in_gui_thread(self.handle_theme_modified, theme, path)
|
|
|
|
elif theme and action == WatcherChangeType.DELETE:
|
|
if path.name == 'manifest.ini':
|
|
run_in_gui_thread(self.handle_theme_deleted, theme, path)
|
|
|
|
else:
|
|
run_in_gui_thread(self.handle_theme_modified, theme, path)
|
|
|
|
elif action == WatcherChangeType.CREATE:
|
|
run_in_gui_thread(self.handle_theme_created, path)
|
|
|
|
except:
|
|
traceback.print_exc()
|
|
|
|
self._stopped.set()
|
|
self._stop.clear()
|
|
|
|
|
|
def handle_theme_created(self, path):
|
|
if not (path.name == 'manifest.ini' and self.userpath.join(path.parent.name).isdir()):
|
|
return
|
|
|
|
theme = Theme(path.parent)
|
|
self.themes.user[theme.hash] = theme
|
|
|
|
logging.verbose(f'Created new theme: {theme.name}')
|
|
|
|
|
|
def handle_theme_deleted(self, theme, path):
|
|
if self.themes.current == theme.hash:
|
|
self.themes.unset()
|
|
|
|
del self.themes.user[theme.hash]
|
|
|
|
logging.verbose(f'Deleted theme: {theme.name}')
|
|
|
|
|
|
def handle_theme_modified(self, theme, path):
|
|
if path.name == 'manifest.ini':
|
|
theme.load_manifest()
|
|
logging.verbose(f'Reloaded manifest for theme: {theme.name}')
|
|
|
|
else:
|
|
theme.load()
|
|
logging.verbose(f'Reloaded theme: {theme.name}')
|