171 lines
3.6 KiB
Python
171 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
|
'''
|
|
Mastodon Block Manager v0.2
|
|
by Izalia Mae (@Izalia@barkshark.xyz)
|
|
|
|
installation:
|
|
sudo apt install python3-dev libpq-dev
|
|
pip3 install pygresql envbash
|
|
Copy to mastodon directory
|
|
'''
|
|
import json, os, sys
|
|
|
|
from datetime import datetime
|
|
from os import environ as env
|
|
from pathlib import Path
|
|
|
|
from pg import DB
|
|
from envbash import load_envbash
|
|
|
|
|
|
severity_import = {
|
|
'silence': 0,
|
|
'suspend': 1,
|
|
'none': 2
|
|
}
|
|
|
|
severity_export = {v:k for k,v in severity_import.items()}
|
|
|
|
block_type = {
|
|
'suspend': 'suspended_at',
|
|
'silence': 'silenced_at'
|
|
}
|
|
|
|
|
|
def dbconn():
|
|
try:
|
|
load_envbash('.env.production')
|
|
|
|
except FileNotFoundError:
|
|
print('Could not find ".env.production" in current directory.')
|
|
sys.exit(1)
|
|
|
|
dbpass = env.get('DB_PASS')
|
|
kwargs = {
|
|
'host': env.get('DB_HOST', 'localhost'),
|
|
'port': int(env.get('DB_PORT', 5432)),
|
|
'dbname': env.get('DB_NAME', 'mastodon_production'),
|
|
'user': env.get('DB_USER', env.get('USER'))
|
|
}
|
|
|
|
if dbpass:
|
|
kwargs['passwd'] = dbpass
|
|
|
|
return DB(**kwargs)
|
|
|
|
|
|
def dump(data):
|
|
if not data:
|
|
print('Path doesn\'t exist. Saving to current directory instead as "block.json"')
|
|
filename = Path('blocks.json').resolve()
|
|
|
|
else:
|
|
filename = Path(data[0]).resolve()
|
|
filename.mkdir(parents=True, exist_okj=True)
|
|
|
|
domains = db.query('SELECT * FROM public.domain_blocks;').dictresult()
|
|
|
|
with open(filename, 'w') as fd:
|
|
new_data = []
|
|
for domain in domains:
|
|
del domain['id']
|
|
del domain['private_comment']
|
|
del domain['created_at']
|
|
del domain['updated_at']
|
|
domain['severity'] = severity_export.get( domain['severity'])
|
|
|
|
new_data.append(domain)
|
|
fd.write(json.dumps(new_data, indent=4))
|
|
|
|
return 'Done! :3'
|
|
|
|
|
|
def load(data):
|
|
if not data:
|
|
print('Path doesn\'t exist. Reading from "blocks.json" instead.')
|
|
filename = Path('blocks.json').resolve()
|
|
|
|
else:
|
|
filename = Path(data[0]).resolve()
|
|
|
|
data = json.load(filename.open())
|
|
|
|
for line in data:
|
|
domain = line['domain']
|
|
rowquery = db.query(f"SELECT * FROM public.domain_blocks WHERE domain = '{domain}';").dictresult()
|
|
row = rowquery[0] if rowquery else None
|
|
date = datetime.now()
|
|
|
|
data = {
|
|
'severity': severity_import.get(line['severity'], 1),
|
|
'reject_media': line['reject_media'],
|
|
'reject_reports': line['reject_reports'],
|
|
'public_comment': line['public_comment'],
|
|
'updated_at': date
|
|
}
|
|
|
|
if row:
|
|
db.update('public.domain_blocks', {'id': row['id']}, **data)
|
|
print(f'Updated block for {line["domain"]}')
|
|
|
|
else:
|
|
data['domain'] = line['domain']
|
|
data['created_at'] = date
|
|
db.insert('public.domain_blocks', **data)
|
|
print(f'Created new block for {line["domain"]}')
|
|
|
|
btype = block_type.get(line['severity'])
|
|
|
|
if btype:
|
|
accts = db.query(f"SELECT * FROM public.accounts WHERE domain LIKE '%{domain}'").dictresult()
|
|
|
|
for acct in accts:
|
|
account_id = acct['id']
|
|
db.update('public.accounts', {'id': account_id}, **{btype: date})
|
|
|
|
if btype == 'suspended_at':
|
|
follows = db.query(f"SELECT * FROM public.follows WHERE account_id = '{account_id}' or target_account_id = '{account_id}'")
|
|
|
|
for follow in follows:
|
|
db.delete('public.follows', follow)
|
|
|
|
return 'Done! :3'
|
|
|
|
|
|
db = dbconn()
|
|
|
|
|
|
def main():
|
|
arg = sys.argv
|
|
|
|
cmdhelp = '''Mastodon Blocklist Manager
|
|
|
|
import <file> Import a json blocklist
|
|
export <file> Export the blocklist to json
|
|
'''
|
|
|
|
tasks = {
|
|
'export': dump,
|
|
'import': load
|
|
}
|
|
|
|
if len(arg) < 2 or arg[1] not in tasks:
|
|
return cmdhelp
|
|
|
|
try:
|
|
db.begin()
|
|
msg = tasks[arg[1]](None if len(arg) < 3 else arg[2:])
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
db.close()
|
|
raise e from None
|
|
|
|
db.commit()
|
|
db.close()
|
|
return msg
|
|
|
|
|
|
if __name__ == '__main__':
|
|
print(main())
|