scripts/bin/mastoblocks.py

171 lines
3.6 KiB
Python

#!/usr/bin/env python3
'''
Mastodon Block Manager v0.2
by Izalia Mae (@Izalia@barkshark.xyz)
installation:
sudo apt install python3-dev libpq-dev
pip3 install pygresql envbash
Copy to mastodon directory
'''
import json, os, sys
from datetime import datetime
from os import environ as env
from pathlib import Path
from pg import DB
from envbash import load_envbash
severity_import = {
'silence': 0,
'suspend': 1,
'none': 2
}
severity_export = {v:k for k,v in severity_import.items()}
block_type = {
'suspend': 'suspended_at',
'silence': 'silenced_at'
}
def dbconn():
try:
load_envbash('.env.production')
except FileNotFoundError:
print('Could not find ".env.production" in current directory.')
sys.exit(1)
dbpass = env.get('DB_PASS')
kwargs = {
'host': env.get('DB_HOST', 'localhost'),
'port': int(env.get('DB_PORT', 5432)),
'dbname': env.get('DB_NAME', 'mastodon_production'),
'user': env.get('DB_USER', env.get('USER'))
}
if dbpass:
kwargs['passwd'] = dbpass
return DB(**kwargs)
def dump(data):
if not data:
print('Path doesn\'t exist. Saving to current directory instead as "block.json"')
filename = Path('blocks.json').resolve()
else:
filename = Path(data[0]).resolve()
filename.mkdir(parents=True, exist_okj=True)
domains = db.query('SELECT * FROM public.domain_blocks;').dictresult()
with open(filename, 'w') as fd:
new_data = []
for domain in domains:
del domain['id']
del domain['private_comment']
del domain['created_at']
del domain['updated_at']
domain['severity'] = severity_export.get( domain['severity'])
new_data.append(domain)
fd.write(json.dumps(new_data, indent=4))
return 'Done! :3'
def load(data):
if not data:
print('Path doesn\'t exist. Reading from "blocks.json" instead.')
filename = Path('blocks.json').resolve()
else:
filename = Path(data[0]).resolve()
data = json.load(filename.open())
for line in data:
domain = line['domain']
rowquery = db.query(f"SELECT * FROM public.domain_blocks WHERE domain = '{domain}';").dictresult()
row = rowquery[0] if rowquery else None
date = datetime.now()
data = {
'severity': severity_import.get(line['severity'], 1),
'reject_media': line['reject_media'],
'reject_reports': line['reject_reports'],
'public_comment': line['public_comment'],
'updated_at': date
}
if row:
db.update('public.domain_blocks', {'id': row['id']}, **data)
print(f'Updated block for {line["domain"]}')
else:
data['domain'] = line['domain']
data['created_at'] = date
db.insert('public.domain_blocks', **data)
print(f'Created new block for {line["domain"]}')
btype = block_type.get(line['severity'])
if btype:
accts = db.query(f"SELECT * FROM public.accounts WHERE domain LIKE '%{domain}'").dictresult()
for acct in accts:
account_id = acct['id']
db.update('public.accounts', {'id': account_id}, **{btype: date})
if btype == 'suspended_at':
follows = db.query(f"SELECT * FROM public.follows WHERE account_id = '{account_id}' or target_account_id = '{account_id}'")
for follow in follows:
db.delete('public.follows', follow)
return 'Done! :3'
db = dbconn()
def main():
arg = sys.argv
cmdhelp = '''Mastodon Blocklist Manager
import <file> Import a json blocklist
export <file> Export the blocklist to json
'''
tasks = {
'export': dump,
'import': load
}
if len(arg) < 2 or arg[1] not in tasks:
return cmdhelp
try:
db.begin()
msg = tasks[arg[1]](None if len(arg) < 3 else arg[2:])
except Exception as e:
db.rollback()
db.close()
raise e from None
db.commit()
db.close()
return msg
if __name__ == '__main__':
print(main())