208 lines
4.1 KiB
Python
208 lines
4.1 KiB
Python
import re, traceback
|
|
|
|
from datetime import datetime, timedelta
|
|
from io import BytesIO
|
|
from izzylib_sql import Row
|
|
from mastodon import Mastodon
|
|
from PIL import Image
|
|
from urllib.parse import quote_plus
|
|
|
|
from ..functions import TimeoutCallback, get_app, run_in_gui_thread
|
|
|
|
|
|
row_classes = {}
|
|
emojipatt = re.compile(r':.*?:')
|
|
|
|
|
|
def register_class(table):
|
|
def wrapper(cls):
|
|
row_classes[table] = cls
|
|
return cls
|
|
|
|
return wrapper
|
|
|
|
|
|
class RowBase(Row):
|
|
@property
|
|
def app(self):
|
|
return get_app()
|
|
|
|
|
|
@register_class('accounts')
|
|
class Account(RowBase):
|
|
_api = None
|
|
_emojis = None
|
|
|
|
|
|
@property
|
|
def api(self):
|
|
if not self._api:
|
|
self._set_api()
|
|
|
|
return self._api
|
|
|
|
|
|
@property
|
|
def active(self):
|
|
with self._db.session as s:
|
|
return s.get_config('active_acct') == self.id
|
|
|
|
|
|
@property
|
|
def avatar(self):
|
|
return get_app().path.avatars.join(f'{self.id}.png')
|
|
|
|
|
|
@property
|
|
def bio(self):
|
|
try:
|
|
text = self.data['note'].replace('<p>', '<span class="bio-line">').replace('</p>', '</span>')
|
|
return self.replace_emojis(text)
|
|
|
|
except KeyError:
|
|
return
|
|
|
|
|
|
@property
|
|
def emojis(self):
|
|
if not self._emojis:
|
|
try:
|
|
self._emojis = self.fetch_emojis()
|
|
except:
|
|
traceback.print_exc()
|
|
self._emojis = {}
|
|
|
|
return self._emojis
|
|
|
|
|
|
@property
|
|
def fullhandle(self):
|
|
return f'{self.handle}@{self.domain}'
|
|
|
|
|
|
def _set_api(self):
|
|
self._api = Mastodon(access_token=self.token, api_base_url=f'https://{self.domain}')
|
|
logging.debug(f'logged into {self.fullhandle}')
|
|
|
|
|
|
def fetch_avatar(self):
|
|
byte = BytesIO()
|
|
http_client = get_app().http_client
|
|
|
|
resp = http_client.request(self.data.avatar_static, follow_redir=True)
|
|
|
|
if resp.status != 200:
|
|
logging.error(f'Failed to download {self.data.avatar_static}:', resp.status)
|
|
return False
|
|
|
|
image = Image.open(resp.body)
|
|
image.thumbnail((256,256))
|
|
image.save(byte, format='png')
|
|
|
|
with get_app().path.avatars.join(f'{self.id}.png').open('wb') as fd:
|
|
fd.write(byte.getvalue())
|
|
|
|
return True
|
|
|
|
|
|
def fetch_emojis(self, refresh=False):
|
|
cache_path = get_app().path.emojis.join(self.domain + '.json')
|
|
emojis = DotDict()
|
|
|
|
try:
|
|
if cache_path.mtime + timedelta(days=7) < datetime.now():
|
|
raise FileNotFoundError('heck')
|
|
|
|
emojis.load_json(cache_path)
|
|
|
|
except FileNotFoundError:
|
|
refresh = True
|
|
|
|
if refresh:
|
|
for emoji in sorted(self.api.custom_emojis(), key=lambda x:x['shortcode']):
|
|
code = emoji.pop('shortcode')
|
|
emojis[code] = emoji
|
|
|
|
if len(emojis):
|
|
emojis.save_json(cache_path, indent=4)
|
|
|
|
return emojis
|
|
|
|
|
|
def fetch_post(self, url):
|
|
data = self.api.search(url, result_type='statuses', resolve=True)['statuses']
|
|
|
|
try:
|
|
return DotDict(data[0])
|
|
|
|
except IndexError:
|
|
return None
|
|
|
|
|
|
def refresh(self):
|
|
user_data = self.api.me()
|
|
instance = self.api.instance()
|
|
|
|
if not user_data:
|
|
logging.warning('Failed to fetch new user data')
|
|
return
|
|
|
|
with self._db.session as s:
|
|
try: toot_limit = instance['max_toot_chars']
|
|
except: toot_limit = 500
|
|
|
|
row = s.update_row(self,
|
|
username = user_data.display_name,
|
|
data = DotDict(user_data).to_json(),
|
|
toot_limit = toot_limit
|
|
)
|
|
|
|
for key, value in row.items():
|
|
self[key] = value
|
|
|
|
if not s.get_config('active_acct'):
|
|
s.put_config('active_acct', self.id)
|
|
|
|
self.fetch_avatar()
|
|
|
|
|
|
def replace_emojis(self, string):
|
|
for match in set(emojipatt.findall(string)):
|
|
try:
|
|
shortcode = match[1:-1]
|
|
url = self.emojis[shortcode].static_url
|
|
string = string.replace(match, f"<img class='emoji' src='{url}' title='{shortcode}' />")
|
|
|
|
except KeyError:
|
|
pass
|
|
|
|
return string
|
|
|
|
|
|
def set_active(self):
|
|
with self._db.session as s:
|
|
s.put_config('active_acct', self.id)
|
|
|
|
|
|
@register_class('passwords')
|
|
class Password(RowBase):
|
|
def copy_password(self, timeout=60):
|
|
self.app.set_clipboard_text(self.password)
|
|
|
|
timer = TimeoutCallback(timeout, run_in_gui_thread, self.app.handle_clipboard_clear_password, self.password)
|
|
timer.start()
|
|
|
|
|
|
def copy_username(self):
|
|
self.app.set_clipboard_text(self.username)
|
|
|
|
|
|
@register_class('search')
|
|
class Search(RowBase):
|
|
def compile(self, text):
|
|
if text.startswith(f'{self.keyword} '):
|
|
text = text[len(self.keyword)+1:]
|
|
|
|
search_text = quote_plus(text)
|
|
return self.url.format(q=search_text)
|