162 lines
3.7 KiB
Python
162 lines
3.7 KiB
Python
import sys
|
|
|
|
from datetime import datetime
|
|
from urllib.parse import urlparse
|
|
from json.decoder import JSONDecodeError
|
|
|
|
from DBUtils.PooledPg import PooledPg as DB
|
|
from tinydb import TinyDB, Query, where
|
|
from tinydb_smartcache import SmartCacheTable
|
|
from tinyrecord import transaction as trans
|
|
from tldextract import extract
|
|
from Crypto.PublicKey import RSA
|
|
|
|
from .config import stor_path, logging, MASTOCONFIG as mdb
|
|
from .functions import bool_check
|
|
from .cache import LRUCache
|
|
|
|
|
|
def jsondb():
|
|
try:
|
|
db = TinyDB(f'{stor_path}/db.json', indent='\t')
|
|
|
|
except JSONDecodeError as e:
|
|
logging.critical(f'Failed to load DB: {e}. Exiting...')
|
|
sys.exit()
|
|
|
|
db.table_class = SmartCacheTable
|
|
|
|
class table:
|
|
keys = db.table('keys')
|
|
follows = db.table('follows')
|
|
users = db.table('users')
|
|
|
|
return table
|
|
|
|
|
|
def pgdb():
|
|
try:
|
|
if mdb['dbpass']:
|
|
return DB(dbname=mdb['dbname'], host=mdb['dbhost'], port=mdb['dbport'], user=mdb['dbuser'], passwd=mdb['dbpass']).connection()
|
|
|
|
else:
|
|
return DB(dbname=mdb['dbname'], host=mdb['dbhost'], port=mdb['dbport'], user=mdb['dbuser']).connection()
|
|
|
|
except Exception as e:
|
|
logging.critical(f'Failed to connect to DB: {e}. Exiting...')
|
|
sys.exit()
|
|
|
|
|
|
def keys(actor):
|
|
if pawsdb.keys.get(query.actor == actor) == None:
|
|
logging.info(f'No RSA key. Generating one for {actor}...')
|
|
PRIV = RSA.generate(4096)
|
|
PUB = PRIV.publickey()
|
|
|
|
keydata = {
|
|
'actor': actor,
|
|
'pubkey': PUB.exportKey('PEM').decode('utf-8'),
|
|
'privkey': PRIV.exportKey('PEM').decode('utf-8')
|
|
}
|
|
|
|
with trans(pawsdb.keys) as tr:
|
|
tr.insert(keydata)
|
|
|
|
|
|
return pawsdb.keys.get(query.actor == actor)
|
|
|
|
|
|
def get_handle(userid):
|
|
user_data = mastodb.query(f'SELECT username,domain FROM public.accounts WHERE id = \'{userid}\'').dictresult()
|
|
|
|
if len(user_data) < 1:
|
|
return
|
|
|
|
return (user_data[0]['username'].lower(), user_data[0]['domain'].lower())
|
|
|
|
|
|
def get_bans(suspend=True, details=False):
|
|
domains = mastodb.query('SELECT * FROM public.domain_blocks;').dictresult()
|
|
banlist = {} if details else []
|
|
|
|
for domain in domains:
|
|
instance = domain['domain']
|
|
severity = domain['severity']
|
|
|
|
if suspend and severity != 1:
|
|
continue
|
|
|
|
if details:
|
|
banlist[instance] = {
|
|
'severity': domain['severity'],
|
|
'media': bool_check(domain['reject_media']),
|
|
'reports': bool_check(domain['reject_reports']),
|
|
'private': domain['private_comment'],
|
|
'public': domain['public_comment'],
|
|
'updated': domain['updated_at']
|
|
}
|
|
|
|
else:
|
|
banlist.append(instance)
|
|
|
|
return banlist
|
|
|
|
|
|
def banned_user_check(access_user):
|
|
users = mastodb.query('SELECT username, domain FROM accounts WHERE suspended_at is not NULL').dictresult()
|
|
|
|
if not users:
|
|
return
|
|
|
|
allbans = [(user['username'].lower(), user['domain'].lower()) for user in users]
|
|
|
|
if access_user in allbans:
|
|
return True
|
|
|
|
|
|
def ban_check(url):
|
|
instance = urlparse(url).netloc if url.startswith('http') else url
|
|
domain = extract(url)
|
|
parsed = f'{domain.domain}.{domain.suffix}'
|
|
banlist = get_bans()
|
|
|
|
for ban in banlist:
|
|
if ban in [instance, parsed]:
|
|
return True
|
|
|
|
|
|
|
|
logging.debug(f'{parsed} not in blocklist')
|
|
|
|
|
|
def user_ban_check(user, access_user):
|
|
user_data = mastodb.query(f'SELECT id FROM public.accounts WHERE LOWER(username) = \'{user}\' and domain is NULL').dictresult()
|
|
|
|
if len(user_data) < 1:
|
|
return
|
|
|
|
userid = user_data[0]['id']
|
|
userban_query = mastodb.query(f'SELECT * FROM public.blocks WHERE account_id = \'{userid}\'').dictresult()
|
|
userbans = []
|
|
|
|
for ban in userban_query:
|
|
acct_id = ban['target_account_id']
|
|
user = get_handle(acct_id)
|
|
|
|
if user:
|
|
userbans.append(user)
|
|
|
|
else:
|
|
logging.warning(f'Invalid userid: {acct_id}')
|
|
|
|
if access_user in userbans:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
pawsdb = jsondb()
|
|
query = Query()
|
|
mastodb = pgdb()
|
|
|