cleanup mastoblocks
This commit is contained in:
parent
a1b07c31a7
commit
6113747f51
159
mastoblocks.py
159
mastoblocks.py
|
@ -6,18 +6,28 @@ by Izalia Mae (@Izalia@barkshark.xyz)
|
|||
requirements:
|
||||
pip3 install pygresql envbash
|
||||
'''
|
||||
import sys
|
||||
import os
|
||||
import csv
|
||||
import datetime
|
||||
import datetime, json, os, sys
|
||||
|
||||
from os import environ as env
|
||||
from os.path import dirname, exists
|
||||
from pathlib import Path
|
||||
|
||||
from pg import DB
|
||||
from envbash import load_envbash
|
||||
|
||||
|
||||
severity_import = {
|
||||
'silence': 0,
|
||||
'suspend': 1,
|
||||
'none': 2
|
||||
}
|
||||
|
||||
severity_export = {
|
||||
0: 'silence',
|
||||
1: 'suspend',
|
||||
2: 'none'
|
||||
}
|
||||
|
||||
|
||||
def dbconn():
|
||||
try:
|
||||
load_envbash('.env.production')
|
||||
|
@ -26,125 +36,84 @@ def dbconn():
|
|||
print('Could not find ".env.production" in current directory.')
|
||||
sys.exit(1)
|
||||
|
||||
dbhost = env.get('DB_HOST', 'localhost')
|
||||
dbport = int(env.get('DB_PORT', 5432))
|
||||
dbname = env.get('DB_NAME', 'mastodon_production')
|
||||
dbuser = env.get('DB_USER', env.get('USER'))
|
||||
dbpass = boolean(env.get('DB_PASS'))
|
||||
dbpass = env.get('DB_PASS')
|
||||
kwargs = {
|
||||
'host': env.get('DB_HOST', 'localhost'),
|
||||
'port': int(env.get('DB_PORT', 5432)),
|
||||
'dbname': env.get('DB_NAME', 'mastodon_production'),
|
||||
'user': env.get('DB_USER', env.get('USER'))
|
||||
}
|
||||
|
||||
if type(dbpass) == str:
|
||||
return DB(dbname=dbname, host=dbhost, port=dbport, user=dbuser, passwd=dbpass)
|
||||
if dbpass:
|
||||
kwargs['passwd'] = dbpass
|
||||
|
||||
else:
|
||||
return DB(dbname=dbname, host=dbhost, port=dbport, user=dbuser)
|
||||
|
||||
|
||||
def boolean(raw_val):
|
||||
val = raw_val.lower() if raw_val not in [None, True, False, 0, 1] else raw_val
|
||||
|
||||
if val in [True, False]:
|
||||
return val
|
||||
|
||||
elif val in ['t', 'true', 'yes', 'enable', 'enabled', '1', 1]:
|
||||
return True
|
||||
|
||||
elif val in ['f', 'false', 'no', 'disable', 'disabled', '0', 0, '', None]:
|
||||
return False
|
||||
|
||||
else:
|
||||
return val
|
||||
return DB(**kwargs)
|
||||
|
||||
|
||||
def dump(data):
|
||||
db = dbconn()
|
||||
|
||||
if not data or not exists(dirname(data[0])):
|
||||
print('Path doesn\'t exist. Saving to current directory instead as "block.csv"')
|
||||
filename = 'block.csv'
|
||||
if not data:
|
||||
print('Path doesn\'t exist. Saving to current directory instead as "block.json"')
|
||||
filename = Path('blocks.json').resolve()
|
||||
|
||||
else:
|
||||
filename = data[0]
|
||||
filename = Path(data[0]).resolve()
|
||||
filename.mkdir(parents=True, exist_okj=True)
|
||||
|
||||
domains = db.query('SELECT * FROM public.domain_blocks;').dictresult()
|
||||
|
||||
with open(filename, 'w') as csvfile:
|
||||
blocks = csv.writer(csvfile, delimiter=':')
|
||||
|
||||
blocks.writerow(['#domain', 'severity', 'reject media', 'reject reports', 'private comment', 'public comment'])
|
||||
|
||||
with open(filename, 'w') as fd:
|
||||
new_data = []
|
||||
for domain in domains:
|
||||
blocks.writerow([
|
||||
domain['domain'],
|
||||
domain['severity'],
|
||||
boolean(domain['reject_media']),
|
||||
boolean(domain['reject_reports']),
|
||||
domain['private_comment'],
|
||||
domain['public_comment']
|
||||
])
|
||||
del domain['id']
|
||||
del domain['private_comment']
|
||||
del domain['created_at']
|
||||
del domain['updated_at']
|
||||
domain['severity'] = severity_export.get( domain['severity'])
|
||||
|
||||
new_data.append(domain)
|
||||
fd.write(json.dumps(new_data, indent=4))
|
||||
|
||||
db.close()
|
||||
return 'Done! :3'
|
||||
|
||||
|
||||
def load(data):
|
||||
'''
|
||||
0 silence
|
||||
1 suspend
|
||||
2 none
|
||||
'''
|
||||
db = dbconn()
|
||||
|
||||
if not data or not exists(data[0]):
|
||||
print('Path doesn\'t exist. Saving to current directory instead as "block.csv"')
|
||||
filename = 'block.csv'
|
||||
if not data:
|
||||
print('Path doesn\'t exist. Reading from "blocks.json" instead.')
|
||||
filename = Path('blocks.json').resolve()
|
||||
|
||||
else:
|
||||
filename = data[0]
|
||||
filename = Path(data[0]).resolve()
|
||||
|
||||
csvfile = csv.reader(open(filename), delimiter=':')
|
||||
data = json.load(filename.open())
|
||||
|
||||
for row in csvfile:
|
||||
rowquery = db.query(f'SELECT * FROM public.domain_blocks WHERE domain = \'{row[0]}\';').dictresult()
|
||||
rowdata = rowquery[0] if rowquery else None
|
||||
for line in data:
|
||||
domain = line['domain']
|
||||
rowquery = db.query(f"SELECT * FROM public.domain_blocks WHERE domain = '{domain}';").dictresult()
|
||||
row = rowquery[0] if rowquery else None
|
||||
date = datetime.datetime.now()
|
||||
|
||||
if row[0].startswith('#'):
|
||||
pass
|
||||
data = {
|
||||
'severity': severity_import.get(line['severity'], 1),
|
||||
'reject_media': line['reject_media'],
|
||||
'reject_reports': line['reject_reports'],
|
||||
'public_comment': line['public_comment'],
|
||||
'updated_at': date
|
||||
}
|
||||
|
||||
elif rowdata:
|
||||
date = datetime.datetime.now()
|
||||
|
||||
update = [
|
||||
int(rowdata['severity'])==int(row[1]),
|
||||
boolean(rowdata['reject_media'])==boolean(row[2]),
|
||||
boolean(rowdata['reject_reports'])==boolean(row[3]),
|
||||
rowdata['private_comment']==row[4],
|
||||
rowdata['public_comment']==row[5]
|
||||
]
|
||||
|
||||
if False in update:
|
||||
db.update('public.domain_blocks', {'id': rowdata['id']},
|
||||
severity=row[1],
|
||||
reject_media=row[2],
|
||||
reject_reports=row[3],
|
||||
private_comment=row[4],
|
||||
public_comment=row[5],
|
||||
updated_at=date
|
||||
)
|
||||
print(f'Updated block for {row[0]}')
|
||||
if row:
|
||||
db.update('public.domain_blocks', {'id': row['id']}, **data)
|
||||
print(f'Updated block for {line["domain"]}')
|
||||
|
||||
else:
|
||||
date = datetime.datetime.now()
|
||||
db.insert('public.domain_blocks',
|
||||
domain=row[0],
|
||||
severity=row[1],
|
||||
reject_media=row[2],
|
||||
reject_reports=row[3],
|
||||
private_comment=row[4],
|
||||
public_comment=row[5],
|
||||
created_at=date,
|
||||
updated_at=date
|
||||
)
|
||||
print(f'Created new block for {row[0]}')
|
||||
data['domain'] = line['domain']
|
||||
data['created_at'] = date
|
||||
db.insert('public.domain_blocks', **data)
|
||||
print(f'Created new block for {line["domain"]}')
|
||||
|
||||
db.close()
|
||||
return 'Done! :3'
|
||||
|
|
Loading…
Reference in a new issue