add mastoblocks script
This commit is contained in:
parent
7eede149c6
commit
4c2a580067
176
mastoblocks.py
Normal file
176
mastoblocks.py
Normal file
|
@ -0,0 +1,176 @@
|
|||
#!/usr/bin/env python3
|
||||
'''
|
||||
Mastodon Block Manager v0.1
|
||||
by Izalia Mae (@Izalia@barkshark.xyz)
|
||||
|
||||
requirements:
|
||||
pip3 install pygresql>=5.1 envbash>=1.1.2
|
||||
'''
|
||||
import sys
|
||||
import os
|
||||
import csv
|
||||
import datetime
|
||||
|
||||
from os import environ as env
|
||||
from os.path import dirname, exists
|
||||
|
||||
from pg import DB
|
||||
from envbash import load_envbash
|
||||
|
||||
|
||||
def dbconn():
|
||||
try:
|
||||
load_envbash('.env.production')
|
||||
|
||||
except FileNotFoundError:
|
||||
print('Could not find ".env.production" in current directory.')
|
||||
sys.exit(1)
|
||||
|
||||
dbhost = env.get('DB_HOST', 'localhost')
|
||||
dbport = int(env.get('DB_PORT', 5432))
|
||||
dbname = env.get('DB_NAME', 'mastodon_production')
|
||||
dbuser = env.get('DB_USER', env.get('USER'))
|
||||
dbpass = boolean(env.get('DB_PASS'))
|
||||
|
||||
if type(dbpass) == str:
|
||||
return DB(dbname=dbname, host=dbhost, port=dbport, user=dbuser, passwd=dbpass)
|
||||
|
||||
else:
|
||||
return DB(dbname=dbname, host=dbhost, port=dbport, user=dbuser)
|
||||
|
||||
|
||||
def boolean(raw_val):
|
||||
val = raw_val.lower() if raw_val not in [None, True, False, 0, 1] else raw_val
|
||||
|
||||
if val in [True, False]:
|
||||
return val
|
||||
|
||||
elif val in ['t', 'true', 'yes', 'enable', 'enabled', '1', 1]:
|
||||
return True
|
||||
|
||||
elif val in ['f', 'false', 'no', 'disable', 'disabled', '0', 0, '', None]:
|
||||
return False
|
||||
|
||||
else:
|
||||
return val
|
||||
|
||||
|
||||
def dump(data):
|
||||
db = dbconn()
|
||||
|
||||
if not data or not exists(dirname(data[0])):
|
||||
print('Path doesn\'t exist. Saving to current directory instead as "block.csv"')
|
||||
filename = 'block.csv'
|
||||
|
||||
else:
|
||||
filename = data[0]
|
||||
|
||||
domains = db.query('SELECT * FROM public.domain_blocks;').dictresult()
|
||||
|
||||
with open(filename, 'w') as csvfile:
|
||||
blocks = csv.writer(csvfile, delimiter=':')
|
||||
|
||||
blocks.writerow(['#domain', 'severity', 'reject media', 'reject reports', 'private comment', 'public comment'])
|
||||
|
||||
for domain in domains:
|
||||
blocks.writerow([
|
||||
domain['domain'],
|
||||
domain['severity'],
|
||||
boolean(domain['reject_media']),
|
||||
boolean(domain['reject_reports']),
|
||||
domain['private_comment'],
|
||||
domain['public_comment']
|
||||
])
|
||||
|
||||
db.close()
|
||||
return 'Done! :3'
|
||||
|
||||
|
||||
def load(data):
|
||||
'''
|
||||
0 silence
|
||||
1 suspend
|
||||
2 none
|
||||
'''
|
||||
db = dbconn()
|
||||
|
||||
if not data or not exists(data[0]):
|
||||
print('Path doesn\'t exist. Saving to current directory instead as "block.csv"')
|
||||
filename = 'block.csv'
|
||||
|
||||
else:
|
||||
filename = data[0]
|
||||
|
||||
csvfile = csv.reader(open(filename), delimiter=':')
|
||||
|
||||
for row in csvfile:
|
||||
rowquery = db.query(f'SELECT * FROM public.domain_blocks WHERE domain = \'{row[0]}\';').dictresult()
|
||||
rowdata = rowquery[0] if rowquery else None
|
||||
|
||||
if row[0].startswith('#'):
|
||||
pass
|
||||
|
||||
elif rowdata:
|
||||
date = datetime.datetime.now()
|
||||
|
||||
update = [
|
||||
int(rowdata['severity'])==int(row[1]),
|
||||
boolean(rowdata['reject_media'])==boolean(row[2]),
|
||||
boolean(rowdata['reject_reports'])==boolean(row[3]),
|
||||
rowdata['private_comment']==row[4],
|
||||
rowdata['public_comment']==row[5]
|
||||
]
|
||||
|
||||
if False in update:
|
||||
db.update('public.domain_blocks', {'id': rowdata['id']},
|
||||
severity=row[1],
|
||||
reject_media=row[2],
|
||||
reject_reports=row[3],
|
||||
private_comment=row[4],
|
||||
public_comment=row[5],
|
||||
updated_at=date
|
||||
)
|
||||
print(f'Updated block for {row[0]}')
|
||||
|
||||
else:
|
||||
date = datetime.datetime.now()
|
||||
db.insert('public.domain_blocks',
|
||||
domain=row[0],
|
||||
severity=row[1],
|
||||
reject_media=row[2],
|
||||
reject_reports=row[3],
|
||||
private_comment=row[4],
|
||||
public_comment=row[5],
|
||||
created_at=date,
|
||||
updated_at=date
|
||||
)
|
||||
print(f'Created new block for {row[0]}')
|
||||
|
||||
db.close()
|
||||
return 'Done! :3'
|
||||
|
||||
|
||||
def main():
|
||||
arg = sys.argv
|
||||
|
||||
cmdhelp = '''Mastodon Blocklist Manager
|
||||
|
||||
import <file> Import a blocklist
|
||||
export <file> Export the blocklist to csv
|
||||
'''
|
||||
|
||||
tasks = {
|
||||
'export': dump,
|
||||
'import': load
|
||||
}
|
||||
|
||||
if len(arg) < 2 or arg[1] not in tasks:
|
||||
return cmdhelp
|
||||
|
||||
msg = tasks[arg[1]](None if len(arg) < 3 else arg[2:])
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(main())
|
Loading…
Reference in a new issue