mstdn-block/block.py
2019-11-29 15:33:40 -05:00

275 lines
6.2 KiB
Python
Executable file

#!/usr/bin/env python3
'''
Mastodon Block Manager v0.1
by Izalia Mae @Izalia@barkshark.xyz
requirements:
pygresql==5.1
envbash==1.1.2
'''
import sys
import os
import csv
import datetime
from os import environ as env
from os.path import dirname, exists, abspath
from pg import DB
from envbash import load_envbash
masto_path = env.get('MASTOPATH', abspath(dirname(__file__)))
def dbconn():
try:
load_envbash(f'{masto_path}/.env.production')
except FileNotFoundError:
print(f'Could not find ".env.production" in {masto_path}')
sys.exit(1)
dbhost = env.get('DB_HOST', '/var/run/postgresql')
dbport = int(env.get('DB_PORT', 5432))
dbname = env.get('DB_NAME', 'mastodon_production')
dbuser = env.get('DB_USER', env.get('USER'))
dbpass = boolean(env.get('DB_PASS'))
if type(dbpass) == str:
return DB(dbname=dbname, host=dbhost, port=dbport, user=dbuser, passwd=dbpass)
else:
return DB(dbname=dbname, host=dbhost, port=dbport, user=dbuser)
def boolean(raw_val):
val = raw_val.lower() if raw_val not in [None, True, False, 0, 1] else raw_val
if val in [True, False]:
return val
elif val in ['t', 'true', 'yes', 'enable', 'enabled', '1', 1]:
return True
elif val in ['f', 'false', 'no', 'disable', 'disabled', '0', 0, '', None]:
return False
else:
return val
def ban_check(domain):
query = db.query(f'SELECT * FROM public.domain_blocks WHERE domain = \'{domain}\';').dictresult()
if query == []:
return
blocks = [domain['domain'] for domain in query]
if domain in blocks:
return True
def dump(data):
if not data or not exists(dirname(data[0])):
print('Path doesn\'t exist. Saving to current directory instead as "block.csv"')
filename = 'block.csv'
else:
filename = data[0]
domains = db.query('SELECT * FROM public.domain_blocks;').dictresult()
with open(filename, 'w') as csvfile:
blocks = csv.writer(csvfile, delimiter=':')
blocks.writerow(['#domain', 'severity', 'reject media', 'reject reports', 'private comment', 'public comment'])
for domain in domains:
blocks.writerow([
domain['domain'],
domain['severity'],
boolean(domain['reject_media']),
boolean(domain['reject_reports']),
domain['private_comment'],
domain['public_comment']
])
return 'Done! :3'
def load(data):
'''
0 silence
1 suspend
2 none
'''
if not data or not exists(data[0]):
print('Path doesn\'t exist. Saving to current directory instead as "block.csv"')
filename = 'block.csv'
else:
filename = data[0]
csvfile = csv.reader(open(filename), delimiter=':')
for row in csvfile:
rowquery = db.query(f'SELECT * FROM public.domain_blocks WHERE domain = \'{row[0]}\';').dictresult()
rowdata = rowquery[0] if rowquery else None
if row[0].startswith('#'):
pass
elif rowdata:
date = datetime.datetime.now()
update = [
int(rowdata['severity'])==int(row[1]),
boolean(rowdata['reject_media'])==boolean(row[2]),
boolean(rowdata['reject_reports'])==boolean(row[3]),
rowdata['private_comment']==row[4],
rowdata['public_comment']==row[5]
]
if False in update:
db.update('public.domain_blocks', {'id': rowdata['id']},
severity=row[1],
reject_media=row[2],
reject_reports=row[3],
private_comment=row[4],
public_comment=row[5],
updated_at=date
)
print(f'Updated block for {row[0]}')
else:
date = datetime.datetime.now()
db.insert('public.domain_blocks',
domain=row[0],
severity=row[1],
reject_media=row[2],
reject_reports=row[3],
private_comment=row[4],
public_comment=row[5],
created_at=date,
updated_at=date
)
print(f'Created new block for {row[0]}')
return 'Done! :3'
def ban(data):
if not data:
return 'Missing domain to block'
if data[1] in ['suspend', 'silence', 'none']:
bantype = {'silence': 0, 'suspend': 1, 'none': 2}
severity = bantype[data[1]]
else:
severity = 0
media=data[2] if data[2] != None else False
reports=data[3] if data[3] != None else False
private=data[4] if boolean(data[4]) else None
public=data[5] if boolean(data[5]) else None
date = datetime.datetime.now()
if not ban_check(data[0]):
db.insert('public.domain_blocks',
domain=data[0],
severity=severity,
reject_media=media,
reject_reports=reports,
private_comment=private,
public_comment=public,
created_at=date,
updated_at=date
)
return f'Created new block for {data[0]}'
rowquery = db.query(f'SELECT * FROM public.domain_blocks WHERE domain = \'{data[0]}\';').dictresult()
rowdata = rowquery[0] if rowquery else None
db.update('public.domain_blocks', {'id': rowdata['id']},
severity=severity,
reject_media=media,
reject_reports=reports,
private_comment=private,
public_comment=public,
updated_at=date
)
return f'Ban for {data[0]} has been updated'
def unban(data):
if not data:
return 'Missing domain to block'
if not ban_check(data[0]):
return f'{data[0]} wasn\'t in the banlist'
rowquery = db.query(f'SELECT * FROM public.domain_blocks WHERE domain = \'{data[0]}\';').dictresult()
rowdata = rowquery[0] if rowquery else None
if db.delete('public.domain_blocks', {'id': rowdata['id']}) == 1:
return f'Unbanned {data[0]}'
else:
return f'Failed to unban {data[0]}'
def get(data):
domains = db.query('SELECT * FROM public.domain_blocks;').dictresult()
msg = 'Domain Severity reject media reject reports private comment public comment'
for domain in domains:
msg += f"\n{domain['domain']}\t{domain['severity']}\t{boolean(domain['reject_media'])}\t{boolean(domain['reject_reports'])}\t{domain['private_comment']}\t{domain['public_comment']}"
return msg
def noodle(heck):
return '''Skidaddle skidoodle!
Your dick is now a noodle!'''
def main():
arg = sys.argv
cmdhelp = '''Mastodon Blocklist Manager
import [file]
export [file]
ban <domain> [severity] [reject media] [reject reports] [private commend] [public comment]
unban <domain>
update <domain> [severity] [reject media] [reject reports] [private commend] [public comment]
get
'''
tasks = {
'export': dump,
'import': load,
'ban': ban,
'unban': unban,
'get': get,
'noodle': noodle
}
if len(arg) < 2 or arg[1] not in tasks:
return cmdhelp
msg = tasks[arg[1]](None if len(arg) < 3 else arg[2:])
return msg
if __name__ == '__main__':
db = dbconn()
print(main())
db.close()