#!/usr/bin/env python3
'''
Mastodon Block Manager v0.1
by Izalia Mae @Izalia@barkshark.xyz

requirements:
    pygresql==5.1
    envbash==1.1.2
'''

import sys
import os
import csv
import datetime

from os import environ as env
from os.path import dirname, exists, abspath

# The database drivers are only needed once a connection is opened. Guarding
# the import lets dbconn() report a missing requirement with a readable
# message instead of a bare traceback at import time.
try:
    from pg import DB
    from envbash import load_envbash
except ImportError as err:
    DB = None
    load_envbash = None
    _IMPORT_ERROR = err
else:
    _IMPORT_ERROR = None


masto_path = env.get('MASTOPATH', abspath(dirname(__file__)))

# Table every command operates on.
_TABLE = 'public.domain_blocks'

# File used by import/export when no usable path is given.
_DEFAULT_FILE = 'block.csv'

# Severity names accepted on the command line and the integers stored for them.
_SEVERITIES = {'silence': 0, 'suspend': 1, 'none': 2}

# Spellings recognised by boolean(). Tuples (not sets) so that unhashable
# values can still be tested for membership.
_TRUE_VALUES = ('t', 'true', 'yes', 'enable', 'enabled', '1', 1)
_FALSE_VALUES = ('f', 'false', 'no', 'disable', 'disabled', '0', 0, '')


def dbconn():
    '''
    Open a connection to the Mastodon database.

    Settings are read from "<masto_path>/.env.production" via envbash and then
    from the environment (DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASS).
    Exits with status 1 when the requirements are not installed or the env
    file cannot be found.
    '''
    if DB is None or load_envbash is None:
        print(f'Missing requirement: {_IMPORT_ERROR}. Install pygresql==5.1 and envbash==1.1.2')
        sys.exit(1)

    try:
        load_envbash(f'{masto_path}/.env.production')

    except FileNotFoundError:
        print(f'Could not find ".env.production" in {masto_path}')
        sys.exit(1)

    dbhost = env.get('DB_HOST', '/var/run/postgresql')
    dbport = int(env.get('DB_PORT', 5432))
    dbname = env.get('DB_NAME', 'mastodon_production')
    dbuser = env.get('DB_USER', env.get('USER'))

    # Use the password verbatim. It used to go through boolean(), which
    # lower-cases strings and maps values such as "yes" or "1" to True, so
    # those passwords were corrupted or silently dropped.
    dbpass = env.get('DB_PASS')

    if dbpass:
        return DB(dbname=dbname, host=dbhost, port=dbport, user=dbuser, passwd=dbpass)

    return DB(dbname=dbname, host=dbhost, port=dbport, user=dbuser)


def boolean(raw_val):
    '''
    Convert a loosely typed flag to a bool.

    Strings are compared case-insensitively. Recognised true/false spellings
    (and 1/0, True/False, None) return True or False. Anything else is
    returned as-is (strings lower-cased) so callers can still test it for
    truthiness.
    '''
    # Only strings have .lower(); other values (e.g. the integer 2) used to
    # raise AttributeError here.
    val = raw_val.lower() if isinstance(raw_val, str) else raw_val

    if val is None:
        return False

    if isinstance(val, bool):
        return val

    if val in _TRUE_VALUES:
        return True

    if val in _FALSE_VALUES:
        return False

    return val


def _find_block(domain):
    '''Return the domain_blocks row for `domain` as a dict, or None.'''
    # Parameterised query: the domain comes from the command line or a CSV
    # file and must never be spliced into the SQL text.
    rows = db.query(f'SELECT * FROM {_TABLE} WHERE domain = $1;', domain).dictresult()
    return rows[0] if rows else None


def ban_check(domain):
    '''Return True when `domain` already has a block entry, else False.'''
    rows = db.query(f'SELECT * FROM {_TABLE} WHERE domain = $1;', domain).dictresult()
    return any(row['domain'] == domain for row in rows)


def dump(data):
    '''
    Export every domain block to a ":"-delimited CSV file.

    data: list whose first item is the target path, or None/empty. Falls back
    to "block.csv" in the current directory when no path is given or its
    directory does not exist.
    '''
    filename = data[0] if data else None

    # abspath() first, so a bare file name (whose dirname is '') resolves to
    # the current directory instead of being rejected.
    if not filename or not exists(dirname(abspath(filename))):
        print('Path doesn\'t exist. Saving to current directory instead as "block.csv"')
        filename = _DEFAULT_FILE

    domains = db.query(f'SELECT * FROM {_TABLE};').dictresult()

    # newline='' is what the csv module expects for files it writes.
    with open(filename, 'w', newline='') as csvfile:
        blocks = csv.writer(csvfile, delimiter=':')
        blocks.writerow(['#domain', 'severity', 'reject media', 'reject reports', 'private comment', 'public comment'])

        for domain in domains:
            blocks.writerow([
                domain['domain'],
                domain['severity'],
                boolean(domain['reject_media']),
                boolean(domain['reject_reports']),
                domain['private_comment'],
                domain['public_comment'],
            ])

    return 'Done! :3'


def load(data):
    '''
    Import domain blocks from a ":"-delimited CSV file (as written by dump).

    Severity column values:
        0 silence
        1 suspend
        2 none

    data: list whose first item is the source path, or None/empty. Falls back
    to "block.csv" in the current directory. Existing blocks are updated only
    when a value differs; unknown domains are inserted. Rows starting with
    "#", blank rows and malformed rows are skipped.
    '''
    filename = data[0] if data else None

    if not filename or not exists(filename):
        print('Path doesn\'t exist. Loading "block.csv" from the current directory instead')
        filename = _DEFAULT_FILE

    if not exists(filename):
        return f'Could not find "{filename}"'

    with open(filename, newline='') as csvfile:
        for row in csv.reader(csvfile, delimiter=':'):
            # Skip blanks and comment/header rows before touching the database.
            if not row or row[0].startswith('#'):
                continue

            if len(row) < 6:
                print(f'Skipping malformed row: {row}')
                continue

            domain = row[0]

            try:
                severity = int(row[1])

            except ValueError:
                print(f'Skipping row with invalid severity: {row}')
                continue

            media = boolean(row[2])
            reports = boolean(row[3])

            # dump() writes NULL comments as empty strings; map them back to
            # None so an unchanged block is not rewritten on every import.
            private = row[4] or None
            public = row[5] or None

            rowdata = _find_block(domain)
            # NOTE(review): naive local time, as before; confirm the
            # timestamp columns do not expect UTC.
            date = datetime.datetime.now()

            if rowdata:
                unchanged = (
                    int(rowdata['severity']) == severity
                    and boolean(rowdata['reject_media']) == media
                    and boolean(rowdata['reject_reports']) == reports
                    and (rowdata['private_comment'] or None) == private
                    and (rowdata['public_comment'] or None) == public
                )

                if not unchanged:
                    db.update(_TABLE, {'id': rowdata['id']},
                        severity=severity,
                        reject_media=media,
                        reject_reports=reports,
                        private_comment=private,
                        public_comment=public,
                        updated_at=date
                    )

                    print(f'Updated block for {domain}')

            else:
                db.insert(_TABLE,
                    domain=domain,
                    severity=severity,
                    reject_media=media,
                    reject_reports=reports,
                    private_comment=private,
                    public_comment=public,
                    created_at=date,
                    updated_at=date
                )

                print(f'Created new block for {domain}')

    return 'Done! :3'


def ban(data):
    '''
    Create a block for a domain, or update the existing one.

    data: [domain, severity, reject media, reject reports, private comment,
    public comment]; only the domain is required. An unknown or missing
    severity means "silence" (0); missing flags default to False and missing
    comments to None.
    '''
    if not data:
        return 'Missing domain to block'

    # Pad the optional arguments so "ban <domain>" alone no longer raises
    # IndexError.
    domain, severity_name, media, reports, private, public = (list(data) + [None] * 5)[:6]

    severity = _SEVERITIES.get(severity_name, 0)
    media = media if media is not None else False
    reports = reports if reports is not None else False
    private = private if boolean(private) else None
    public = public if boolean(public) else None
    date = datetime.datetime.now()

    rowdata = _find_block(domain)

    if rowdata is None:
        db.insert(_TABLE,
            domain=domain,
            severity=severity,
            reject_media=media,
            reject_reports=reports,
            private_comment=private,
            public_comment=public,
            created_at=date,
            updated_at=date
        )

        return f'Created new block for {domain}'

    db.update(_TABLE, {'id': rowdata['id']},
        severity=severity,
        reject_media=media,
        reject_reports=reports,
        private_comment=private,
        public_comment=public,
        updated_at=date
    )

    return f'Ban for {domain} has been updated'


def unban(data):
    '''
    Remove the block for a domain.

    data: list whose first item is the domain. Returns a status message.
    '''
    if not data:
        return 'Missing domain to unblock'

    domain = data[0]
    rowdata = _find_block(domain)

    if rowdata is None:
        return f'{domain} wasn\'t in the banlist'

    if db.delete(_TABLE, {'id': rowdata['id']}) == 1:
        return f'Unbanned {domain}'

    return f'Failed to unban {domain}'


def get(data):
    '''
    Return all domain blocks as text, one tab-separated row per domain.

    data is accepted for a uniform command signature and is not used.
    '''
    domains = db.query(f'SELECT * FROM {_TABLE};').dictresult()
    lines = ['Domain Severity reject media reject reports private comment public comment']

    for domain in domains:
        lines.append(
            f"{domain['domain']}\t{domain['severity']}\t{boolean(domain['reject_media'])}"
            f"\t{boolean(domain['reject_reports'])}\t{domain['private_comment']}\t{domain['public_comment']}"
        )

    return '\n'.join(lines)


def noodle(heck):
    '''Easter egg; the argument is ignored.'''
    return 'Skidaddle skidoodle! Your dick is now a noodle!'


def main():
    '''
    Dispatch the command named in sys.argv[1] and return its message.

    Returns the help text when the command is missing or unknown. Arguments
    after the command are passed to the handler as a list, or None when
    there are none.
    '''
    arg = sys.argv
    cmdhelp = '''Mastodon Blocklist Manager
    import [file]
    export [file]
    ban <domain> [severity] [reject media] [reject reports] [private comment] [public comment]
    unban <domain>
    update <domain> [severity] [reject media] [reject reports] [private comment] [public comment]
    get
'''

    tasks = {
        'export': dump,
        'import': load,
        'ban': ban,
        'unban': unban,
        # The help text always listed "update"; ban() already updates an
        # existing block, so route the command there.
        'update': ban,
        'get': get,
        'noodle': noodle
    }

    if len(arg) < 2 or arg[1] not in tasks:
        return cmdhelp

    return tasks[arg[1]](None if len(arg) < 3 else arg[2:])


if __name__ == '__main__':
    db = dbconn()

    # Close the connection even when a command raises.
    try:
        print(main())

    finally:
        db.close()