91 lines
2.1 KiB
Python
91 lines
2.1 KiB
Python
import sys
|
|
|
|
from DBUtils.PooledPg import PooledPg as DB
|
|
from datetime import datetime
|
|
from tinydb import TinyDB, Query, where
|
|
from tinydb_smartcache import SmartCacheTable
|
|
from tinyrecord import transaction as trans
|
|
from tldextract import extract
|
|
from urllib.parse import urlparse
|
|
from json.decoder import JSONDecodeError
|
|
|
|
from .config import stor_path, logging, MASTOCONFIG as mdb
|
|
from .functions import bool_check
|
|
|
|
|
|
def jsondb():
    """Open the JSON-backed TinyDB store and expose its tables.

    The database file is ``db.json`` inside ``stor_path`` and is written
    with tab indentation.

    Returns:
        A ``table`` class whose attributes ``bans``, ``follows`` and
        ``users`` are the TinyDB tables of the same names.

    Raises:
        SystemExit: with exit status 1 when opening the store raises
            ``JSONDecodeError``.
    """
    try:
        db = TinyDB(f'{stor_path}/db.json', indent='\t')

    except JSONDecodeError as e:
        logging.critical(f'Failed to load DB: {e}. Exiting...')
        # A bare sys.exit() reports status 0 (success); a failed load must
        # be visible to whatever launched the process.
        sys.exit(1)

    # Assigned before any table is requested; presumably this makes the
    # db.table() calls below build SmartCacheTable instances -- confirm
    # against the installed tinydb / tinydb-smartcache versions.
    db.table_class = SmartCacheTable

    class table:
        # Simple namespace so callers can write e.g. ``pawsdb.bans``.
        bans = db.table('bans')
        follows = db.table('follows')
        users = db.table('users')

    return table
|
|
|
|
|
|
def pgdb():
    """Open a pooled PostgreSQL connection using the ``mdb`` settings.

    ``dbname``, ``dbhost``, ``dbport`` and ``dbuser`` are always passed to
    the pool; ``passwd`` is only passed when ``mdb['dbpass']`` is truthy.

    Returns:
        The object returned by ``DB(...).connection()``.

    Raises:
        SystemExit: with exit status 1 when reading the settings or
            connecting raises any ``Exception``.
    """
    try:
        params = {
            'dbname': mdb['dbname'],
            'host': mdb['dbhost'],
            'port': mdb['dbport'],
            'user': mdb['dbuser'],
        }

        # Leave the password out entirely when none is configured instead
        # of sending an empty value.
        if mdb['dbpass']:
            params['passwd'] = mdb['dbpass']

        return DB(**params).connection()

    except Exception as e:
        logging.critical(f'Failed to connect to DB: {e}. Exiting...')
        # A bare sys.exit() reports status 0 (success); a failed connection
        # must be visible to whatever launched the process.
        sys.exit(1)
|
|
|
|
|
|
def get_bans(suspend=True, details=False):
    """Read the domain blocks from ``public.domain_blocks``.

    Args:
        suspend: When true, only rows whose ``severity`` equals 1 are kept
            (presumably the "suspend" level -- confirm against the schema).
        details: When true, return a dict keyed by domain with the block's
            details; otherwise return a plain list of domains.

    Returns:
        A list of domain names, or a dict mapping each domain name to a
        dict with the keys ``severity``, ``media``, ``reports``,
        ``private``, ``public`` and ``updated``.
    """
    rows = mastodb.query('SELECT * FROM public.domain_blocks;').dictresult()
    result = {} if details else []

    for row in rows:
        name = row['domain']
        level = row['severity']

        # Skip everything that is not severity 1 when filtering is on.
        if suspend and level != 1:
            continue

        if not details:
            result.append(name)
            continue

        result[name] = {
            'severity': level,
            'media': bool_check(row['reject_media']),
            'reports': bool_check(row['reject_reports']),
            'private': row['private_comment'],
            'public': row['public_comment'],
            'updated': row['updated_at'],
        }

    return result
|
|
|
|
|
|
def ban_check(url):
    """Report whether ``url`` belongs to a domain returned by ``get_bans()``.

    Two forms of the input are compared against the ban list: the network
    location (or ``url`` itself when it does not start with ``http``), and
    the ``domain.suffix`` pair produced by ``extract``, so a subdomain is
    also checked against its parent domain.

    Args:
        url: A full URL or a bare host name.

    Returns:
        ``True`` if either form is in the ban list, ``False`` otherwise.
    """
    instance = urlparse(url).netloc if url.startswith('http') else url
    domain = extract(url)
    parsed = f'{domain.domain}.{domain.suffix}'

    # Built once instead of on every loop iteration.
    candidates = (instance, parsed)

    if any(ban in candidates for ban in get_bans()):
        return True

    logging.debug(f'{parsed} not in blocklist')
    # Explicit False so the predicate always returns a bool instead of
    # falling through to None.
    return False
|
|
|
|
|
|
# Module-level handles, created once at import time.
# Both jsondb() and pgdb() exit the process on failure, so importing this
# module can terminate the program.

# Namespace of TinyDB tables (bans, follows, users).
pawsdb = jsondb()

# Shared TinyDB Query object.
query = Query()

# PostgreSQL connection used by get_bans().
mastodb = pgdb()
|
|
|