275 lines
6.2 KiB
Python
Executable file
275 lines
6.2 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
'''
|
|
Mastodon Block Manager v0.1
|
|
by Izalia Mae @Izalia@barkshark.xyz
|
|
|
|
requirements:
|
|
pygresql==5.1
|
|
envbash==1.1.2
|
|
'''
|
|
import sys
|
|
import os
|
|
import csv
|
|
import datetime
|
|
|
|
from os import environ as env
|
|
from os.path import dirname, exists, abspath
|
|
|
|
from pg import DB
|
|
from envbash import load_envbash
|
|
|
|
|
|
masto_path = env.get('MASTOPATH', abspath(dirname(__file__)))
|
|
|
|
|
|
def dbconn():
|
|
try:
|
|
load_envbash(f'{masto_path}/.env.production')
|
|
|
|
except FileNotFoundError:
|
|
print(f'Could not find ".env.production" in {masto_path}')
|
|
sys.exit(1)
|
|
|
|
dbhost = env.get('DB_HOST', '/var/run/postgresql')
|
|
dbport = int(env.get('DB_PORT', 5432))
|
|
dbname = env.get('DB_NAME', 'mastodon_production')
|
|
dbuser = env.get('DB_USER', env.get('USER'))
|
|
dbpass = boolean(env.get('DB_PASS'))
|
|
|
|
if type(dbpass) == str:
|
|
return DB(dbname=dbname, host=dbhost, port=dbport, user=dbuser, passwd=dbpass)
|
|
|
|
else:
|
|
return DB(dbname=dbname, host=dbhost, port=dbport, user=dbuser)
|
|
|
|
|
|
def boolean(raw_val):
|
|
val = raw_val.lower() if raw_val not in [None, True, False, 0, 1] else raw_val
|
|
|
|
if val in [True, False]:
|
|
return val
|
|
|
|
elif val in ['t', 'true', 'yes', 'enable', 'enabled', '1', 1]:
|
|
return True
|
|
|
|
elif val in ['f', 'false', 'no', 'disable', 'disabled', '0', 0, '', None]:
|
|
return False
|
|
|
|
else:
|
|
return val
|
|
|
|
|
|
def ban_check(domain):
|
|
query = db.query(f'SELECT * FROM public.domain_blocks WHERE domain = \'{domain}\';').dictresult()
|
|
|
|
if query == []:
|
|
return
|
|
|
|
blocks = [domain['domain'] for domain in query]
|
|
|
|
if domain in blocks:
|
|
return True
|
|
|
|
|
|
def dump(data):
|
|
if not data or not exists(dirname(data[0])):
|
|
print('Path doesn\'t exist. Saving to current directory instead as "block.csv"')
|
|
filename = 'block.csv'
|
|
|
|
else:
|
|
filename = data[0]
|
|
|
|
domains = db.query('SELECT * FROM public.domain_blocks;').dictresult()
|
|
|
|
with open(filename, 'w') as csvfile:
|
|
blocks = csv.writer(csvfile, delimiter=':')
|
|
|
|
blocks.writerow(['#domain', 'severity', 'reject media', 'reject reports', 'private comment', 'public comment'])
|
|
|
|
for domain in domains:
|
|
blocks.writerow([
|
|
domain['domain'],
|
|
domain['severity'],
|
|
boolean(domain['reject_media']),
|
|
boolean(domain['reject_reports']),
|
|
domain['private_comment'],
|
|
domain['public_comment']
|
|
])
|
|
|
|
return 'Done! :3'
|
|
|
|
|
|
def load(data):
|
|
'''
|
|
0 silence
|
|
1 suspend
|
|
2 none
|
|
'''
|
|
if not data or not exists(data[0]):
|
|
print('Path doesn\'t exist. Saving to current directory instead as "block.csv"')
|
|
filename = 'block.csv'
|
|
|
|
else:
|
|
filename = data[0]
|
|
|
|
csvfile = csv.reader(open(filename), delimiter=':')
|
|
|
|
for row in csvfile:
|
|
rowquery = db.query(f'SELECT * FROM public.domain_blocks WHERE domain = \'{row[0]}\';').dictresult()
|
|
rowdata = rowquery[0] if rowquery else None
|
|
|
|
if row[0].startswith('#'):
|
|
pass
|
|
|
|
elif rowdata:
|
|
date = datetime.datetime.now()
|
|
|
|
update = [
|
|
int(rowdata['severity'])==int(row[1]),
|
|
boolean(rowdata['reject_media'])==boolean(row[2]),
|
|
boolean(rowdata['reject_reports'])==boolean(row[3]),
|
|
rowdata['private_comment']==row[4],
|
|
rowdata['public_comment']==row[5]
|
|
]
|
|
|
|
if False in update:
|
|
db.update('public.domain_blocks', {'id': rowdata['id']},
|
|
severity=row[1],
|
|
reject_media=row[2],
|
|
reject_reports=row[3],
|
|
private_comment=row[4],
|
|
public_comment=row[5],
|
|
updated_at=date
|
|
)
|
|
print(f'Updated block for {row[0]}')
|
|
|
|
else:
|
|
date = datetime.datetime.now()
|
|
db.insert('public.domain_blocks',
|
|
domain=row[0],
|
|
severity=row[1],
|
|
reject_media=row[2],
|
|
reject_reports=row[3],
|
|
private_comment=row[4],
|
|
public_comment=row[5],
|
|
created_at=date,
|
|
updated_at=date
|
|
)
|
|
print(f'Created new block for {row[0]}')
|
|
|
|
return 'Done! :3'
|
|
|
|
|
|
def ban(data):
|
|
if not data:
|
|
return 'Missing domain to block'
|
|
|
|
if data[1] in ['suspend', 'silence', 'none']:
|
|
bantype = {'silence': 0, 'suspend': 1, 'none': 2}
|
|
severity = bantype[data[1]]
|
|
|
|
else:
|
|
severity = 0
|
|
|
|
media=data[2] if data[2] != None else False
|
|
reports=data[3] if data[3] != None else False
|
|
private=data[4] if boolean(data[4]) else None
|
|
public=data[5] if boolean(data[5]) else None
|
|
date = datetime.datetime.now()
|
|
|
|
if not ban_check(data[0]):
|
|
db.insert('public.domain_blocks',
|
|
domain=data[0],
|
|
severity=severity,
|
|
reject_media=media,
|
|
reject_reports=reports,
|
|
private_comment=private,
|
|
public_comment=public,
|
|
created_at=date,
|
|
updated_at=date
|
|
)
|
|
return f'Created new block for {data[0]}'
|
|
|
|
rowquery = db.query(f'SELECT * FROM public.domain_blocks WHERE domain = \'{data[0]}\';').dictresult()
|
|
rowdata = rowquery[0] if rowquery else None
|
|
|
|
db.update('public.domain_blocks', {'id': rowdata['id']},
|
|
severity=severity,
|
|
reject_media=media,
|
|
reject_reports=reports,
|
|
private_comment=private,
|
|
public_comment=public,
|
|
updated_at=date
|
|
)
|
|
|
|
return f'Ban for {data[0]} has been updated'
|
|
|
|
|
|
def unban(data):
|
|
if not data:
|
|
return 'Missing domain to block'
|
|
|
|
if not ban_check(data[0]):
|
|
return f'{data[0]} wasn\'t in the banlist'
|
|
|
|
rowquery = db.query(f'SELECT * FROM public.domain_blocks WHERE domain = \'{data[0]}\';').dictresult()
|
|
rowdata = rowquery[0] if rowquery else None
|
|
|
|
if db.delete('public.domain_blocks', {'id': rowdata['id']}) == 1:
|
|
return f'Unbanned {data[0]}'
|
|
|
|
else:
|
|
return f'Failed to unban {data[0]}'
|
|
|
|
|
|
def get(data):
|
|
domains = db.query('SELECT * FROM public.domain_blocks;').dictresult()
|
|
|
|
msg = 'Domain Severity reject media reject reports private comment public comment'
|
|
|
|
for domain in domains:
|
|
msg += f"\n{domain['domain']}\t{domain['severity']}\t{boolean(domain['reject_media'])}\t{boolean(domain['reject_reports'])}\t{domain['private_comment']}\t{domain['public_comment']}"
|
|
|
|
return msg
|
|
|
|
|
|
def noodle(heck):
|
|
return '''Skidaddle skidoodle!
|
|
Your dick is now a noodle!'''
|
|
|
|
|
|
def main():
|
|
arg = sys.argv
|
|
|
|
cmdhelp = '''Mastodon Blocklist Manager
|
|
|
|
import [file]
|
|
export [file]
|
|
ban <domain> [severity] [reject media] [reject reports] [private commend] [public comment]
|
|
unban <domain>
|
|
update <domain> [severity] [reject media] [reject reports] [private commend] [public comment]
|
|
get
|
|
'''
|
|
|
|
tasks = {
|
|
'export': dump,
|
|
'import': load,
|
|
'ban': ban,
|
|
'unban': unban,
|
|
'get': get,
|
|
'noodle': noodle
|
|
}
|
|
|
|
if len(arg) < 2 or arg[1] not in tasks:
|
|
return cmdhelp
|
|
|
|
msg = tasks[arg[1]](None if len(arg) < 3 else arg[2:])
|
|
|
|
return msg
|
|
|
|
|
|
if __name__ == '__main__':
|
|
db = dbconn()
|
|
print(main())
|
|
db.close()
|