97 lines
2.2 KiB
Python
97 lines
2.2 KiB
Python
import sys
|
|
from DBUtils.PooledPg import PooledPg as DB
|
|
from datetime import datetime
|
|
from tinydb import TinyDB, Query
|
|
from tinydb_smartcache import SmartCacheTable
|
|
from tinyrecord import transaction as trans
|
|
from tldextract import extract
|
|
from json.decoder import JSONDecodeError
|
|
|
|
from .config import stor_path, logging, MASTOCONFIG as mdb
|
|
|
|
|
|
def jsondb():
|
|
try:
|
|
db = TinyDB(f'{stor_path}/db.json', indent='\t')
|
|
|
|
except JSONDecodeError as e:
|
|
logging.critical(f'Failed to load DB: {e}. Exiting...')
|
|
sys.exit()
|
|
|
|
db.table_class = SmartCacheTable
|
|
|
|
tables = {
|
|
'bans': db.table('bans'),
|
|
'follows': db.table('follows'),
|
|
'users': db.table('users'),
|
|
'tokens': db.table('tokens')
|
|
}
|
|
|
|
return tables
|
|
|
|
|
|
def pgdb():
|
|
try:
|
|
if type(dbpass) == str:
|
|
return DB(dbname=mdb['dbname'], host=mdb['dbhost'], port=mdb['dbport'], user=mdb['dbuser'], passwd=mdb['dbpass'])
|
|
|
|
else:
|
|
return DB(dbname=mdb['dbname'], host=mdb['dbhost'], port=mdb['dbport'], user=mdb['dbuser'])
|
|
|
|
except Exception as e:
|
|
logging.critical(f'Failed to connect to DB: {e}. Exiting...')
|
|
sys.exit()
|
|
|
|
|
|
def get_bans():
|
|
domains = mastodb.query('SELECT * FROM public.domain_blocks;').dictresult()
|
|
banlist = {}
|
|
|
|
for domain in domains:
|
|
instance = domain['domain']
|
|
|
|
banlist[instance] = {
|
|
'severity': domain['severity'],
|
|
'media': boolean(domain['reject_media']),
|
|
'reports': boolean(domain['reject_reports']),
|
|
'private': domain['private_comment'],
|
|
'public': domain['public_comment'],
|
|
'updated': domain['updated_at']
|
|
}
|
|
|
|
return banlist
|
|
|
|
|
|
def update_bans():
|
|
'''I'll implement this later'''
|
|
pass
|
|
|
|
|
|
def update_bancache():
|
|
bans = get_bans()
|
|
banlist = cache.get('bans')
|
|
|
|
for domain in bans:
|
|
if domain not in banlist or bans[domain]['updated'] > banlist[domain]['updated']:
|
|
banlist[domain] = bans[domain]
|
|
|
|
cache.get('bans') = banlist
|
|
logging.debug('Updated ban cache')
|
|
|
|
|
|
def ban_check(url):
|
|
instance = urlparse(url).netloc if url.startswith('https') else url
|
|
domain = extract(url)
|
|
parsed = f'{domain.domain}.{domain.suffix}'
|
|
|
|
for ban in cache.get('ban'):
|
|
if ban in [url, parsed]:
|
|
return True
|
|
|
|
logging.debug(f'{parsed} not in blocklist')
|
|
|
|
pawsdb = jsondb()
|
|
query = Query()
|
|
mastodb = pgdb()
|
|
cache = {'bans': get_bans()}
|