paws/paws/database.py
2020-01-13 08:10:48 -05:00

100 lines
2.2 KiB
Python

import sys
from DBUtils.PooledPg import PooledPg as DB
from datetime import datetime
from tinydb import TinyDB, Query
from tinydb_smartcache import SmartCacheTable
from tinyrecord import transaction as trans
from tldextract import extract
from urllib.parse import urlparse
from json.decoder import JSONDecodeError
from .config import stor_path, logging, MASTOCONFIG as mdb
from .functions import bool_check
def jsondb():
    '''Open the TinyDB JSON store and return its named tables.

    The database file is ``{stor_path}/db.json`` and is written with tab
    indentation.

    Returns:
        dict: Maps the table names ``'bans'``, ``'follows'``, ``'users'``
        and ``'tokens'`` to the table objects returned by ``db.table()``.

    Exits the process with status 1 if opening the database raises
    ``JSONDecodeError``.
    '''
    try:
        db = TinyDB(f'{stor_path}/db.json', indent='\t')
    except JSONDecodeError as e:
        logging.critical(f'Failed to load DB: {e}. Exiting...')
        # A bare sys.exit() reports success (status 0); a failed load must
        # be visible to whatever launched the process.
        sys.exit(1)

    # NOTE(review): depending on the TinyDB version, assigning table_class on
    # an already-constructed instance may not affect tables created below --
    # confirm against the installed TinyDB / tinydb-smartcache versions.
    db.table_class = SmartCacheTable

    table_names = ('bans', 'follows', 'users', 'tokens')
    return {name: db.table(name) for name in table_names}
def pgdb():
    '''Return a PostgreSQL connection taken from a new ``PooledPg`` pool.

    Connection settings come from ``MASTOCONFIG`` (keys ``dbname``,
    ``dbhost``, ``dbport``, ``dbuser``). ``dbpass`` is forwarded as
    ``passwd`` only when it is truthy.

    Exits the process with status 1 if reading the settings or opening the
    connection raises any exception.
    '''
    try:
        password = mdb['dbpass']
        params = {
            'dbname': mdb['dbname'],
            'host': mdb['dbhost'],
            'port': mdb['dbport'],
            'user': mdb['dbuser'],
        }
        # Only send a password when one is configured.
        if password:
            params['passwd'] = password
        return DB(**params).connection()
    except Exception as e:
        # Broad catch is deliberate: any failure here is fatal at startup.
        logging.critical(f'Failed to connect to DB: {e}. Exiting...')
        # A bare sys.exit() reports success (status 0); signal the failure.
        sys.exit(1)
def get_bans():
    '''Read all rows of ``public.domain_blocks`` and index them by domain.

    Returns:
        dict: Maps each row's ``domain`` value to a dict with the keys
        ``severity``, ``media``, ``reports``, ``private``, ``public`` and
        ``updated``. ``media`` and ``reports`` are the row's
        ``reject_media`` / ``reject_reports`` values passed through
        ``bool_check``.
    '''
    rows = mastodb.query('SELECT * FROM public.domain_blocks;').dictresult()
    return {
        row['domain']: {
            'severity': row['severity'],
            'media': bool_check(row['reject_media']),
            'reports': bool_check(row['reject_reports']),
            'private': row['private_comment'],
            'public': row['public_comment'],
            'updated': row['updated_at'],
        }
        for row in rows
    }
def update_bans():
    '''Not implemented yet; currently does nothing and returns None.'''
def update_bancache():
    '''Merge the current domain blocks into the ``'bans'`` cache entry.

    A domain from ``get_bans()`` is copied into the cached mapping when it
    is not cached yet or when its ``updated`` value is greater than the
    cached one. Domains that are cached but no longer returned by
    ``get_bans()`` are left in place.
    '''
    # NOTE(review): `cache` is not defined or imported in the visible part of
    # this module -- confirm it is provided elsewhere before this is called.
    current = get_bans()

    # An empty cache yields None from .get(); without the fallback the
    # membership test below would raise TypeError on the first run.
    cached = cache.get('bans') or {}

    for domain, entry in current.items():
        if domain not in cached or entry['updated'] > cached[domain]['updated']:
            cached[domain] = entry

    cache['bans'] = cached
    logging.debug('Updated ban cache')
def ban_check(url):
    '''Report whether a URL or bare domain belongs to a blocked instance.

    Args:
        url: A full URL or a bare domain name.

    Returns:
        bool: True if either the host part of ``url`` or its
        ``domain.suffix`` form (as split by ``tldextract.extract``) is a
        key of ``get_bans()``; False otherwise.
    '''
    # Use the parsed network location when there is one, otherwise treat the
    # input as a bare host. Testing startswith('http') misread bare domains
    # such as 'httpbin.org' as URLs and produced an empty host.
    instance = urlparse(url).netloc or url

    domain = extract(url)
    parsed = f'{domain.domain}.{domain.suffix}'

    # get_bans() returns a dict keyed by domain, so membership can be tested
    # directly instead of scanning every entry.
    bans = get_bans()
    if instance in bans or parsed in bans:
        return True

    logging.debug(f'{parsed} not in blocklist')
    return False
# Module-level handles, created once at import time. Both jsondb() and pgdb()
# call sys.exit() on failure, so importing this module can terminate the
# process.

# Dict of TinyDB tables keyed by 'bans', 'follows', 'users' and 'tokens'.
pawsdb = jsondb()
# Shared TinyDB Query instance.
query = Query()
# PostgreSQL connection used by get_bans().
mastodb = pgdb()