add mastodon notification script

This commit is contained in:
Izalia Mae 2019-12-04 04:08:41 -05:00
parent 0a2757fc34
commit 66d275eee4

274
multi-notif.py Executable file
View file

@ -0,0 +1,274 @@
#!/usr/bin/env python3
###
# Multi-Notifs v0.1
# by Zoey Mae @izalia@barkshark.xyz
#
# Installation:
# $ pip install aiohttp --user
###
import asyncio
import aiohttp
import os
import json
import re
import sys
import logging
import time
import http.client as http
from os.path import isfile, abspath, dirname
from urllib.parse import urlencode
from getpass import getpass
from os import environ as env
LOGDATE = env.get('LOGDATE').lower() if type(env.get('LOGDATE')) == str else ''
if LOGDATE in ['true', 'yes', 'y']:
time = '[%(asctime)s] '
else:
time = ''
logging.basicConfig(
level='INFO',
format=f'{time}%(levelname)s: %(message)s',
handlers=[logging.StreamHandler()],
datefmt = '%Y-%m-%d %H:%M:%S'
)
NAME = f'Simple Notifs'
NOTIF = '/data/data/com.termux/files/usr/bin/termux-notification'
CONF = f'{dirname(abspath(__file__))}/config.json'
droid = isfile(NOTIF)
HEADERS = {
'User-Agent': 'MerpNotifs 0.1',
'Accept-Encoding': 'identity',
'Accept': 'application/json'
}
def setup_config(config):
if isfile(config):
confjson = json.load(open(CONF))
else:
confjson = {}
appdata = {
'client_name': NAME,
'scopes': 'read:notifications',
'redirect_uris': 'urn:ietf:wg:oauth:2.0:oob',
'website': 'https://barkshark.xyz'
}
username = input('Username [user@example.com]: ')
#username = 'izalia@barkshark.xyz'
if username in confjson:
print('Already logged in')
return
email = input('Email: ')
#email = 'izalia@barkshark.xyz'
if '@' not in username or '@' not in email:
print('Invalid username or email. It has to be handle@domain.')
sys.exit()
password = getpass('Password: ')
user, domain = username.split('@')
app = create_app(domain, appdata)
if type(app) != dict or app.get('error'):
print(app)
sys.exit()
clientid = app['client_id']
secret = app['client_secret']
fetch_token = create_token(domain, clientid, secret, email, password, appdata)
if type(fetch_token) != dict or fetch_token.get('error'):
print(fetch_token)
sys.exit()
token = fetch_token['access_token']
confjson.update({
username: {
'domain': domain,
'token': token
}
})
with open(CONF, 'w') as jsondata:
jsondata.write(json.dumps(confjson, indent='\t'))
print(f'Logged into {username}. Rerun to start the app :3')
sys.exit()
def logout(username):
config = json.load(open(CONF))
if username in config:
del config[username]
with open(CONF, 'w') as jsonfile:
jsonfile.write(json.dumps(config, indent='\t'))
print(f'Logged out {username}')
else:
print(f'{username} was never logged in')
def create_app(domain, data):
conn = http.HTTPSConnection(domain)
conn.request('POST', '/api/v1/apps', urlencode(data), HEADERS)
try:
response = conn.getresponse().read()
return json.loads(response.decode('UTF-8'))
except Exception as e:
return f'error: {e}'
def create_token(domain, client_id, client_secret, username, password, appdata):
data = {
'grant_type': 'password',
'client_id': client_id,
'client_secret': client_secret,
'username': username,
'password': password,
'redirect_uri': appdata['redirect_uris'],
'scope': appdata['scopes']
}
conn = http.HTTPSConnection(domain)
conn.request('POST', '/oauth/token', urlencode(data), HEADERS)
try:
response = conn.getresponse().read()
return json.loads(response.decode('UTF-8'))
except Exception as e:
return f'error: {e}'
def strip(text):
clean = re.compile('<.*?>')
return re.sub(clean, '', text)
def display_notif(username, action, msg):
notmsg = {
'follow': 'followed you',
'favourite': 'faved your toot',
'reblog': 'boosted your toot',
'mention': 'merped at you'
}
if action not in notmsg:
return msg
account = msg['account']
message = f"{account['display_name']} ({account['acct']}) {notmsg[action]}\n"
if action != 'follow':
message += strip(msg['status'].get('content'))
#message = message.replace('"', '\"')
message = message.replace("'", "")
logging.info(f'{action} from {account["display_name"]}')
if droid:
CMDOPTS = f'--title="{NAME} - {username}" --content=\'{message}\''
URL = f'termux-open {HOSTURL}/web/notifications'
os.system(f'{NOTIF} --action "{URL}" {CMDOPTS}')
return
logging.info(message)
async def run(host, token, username):
session = aiohttp.ClientSession()
async with session.get(f'https://{host}/api/v1/streaming/user?access_token={token}', headers=HEADERS) as stream:
logging.info(f'Ready and waiting on events for {username}')
async for line in stream.content:
chunks = line.decode('utf-8').split('\n')
for chunk in chunks:
if chunk.startswith('data: '):
string = chunk
data = string.replace('data: ', '', 1)
try:
msg = json.loads(data)
except Exception as e:
msg = data
if type(msg) != dict:
pass
elif msg.get('type'):
display_notif(username, msg.get('type'), msg)
if __name__ == '__main__':
arg = sys.argv
if 'logout' in arg:
if len(arg) > 2:
for user in arg[2:]:
logout(user)
else:
print('You need to specify at least one user')
sys.exit()
if not isfile(CONF) or 'login' in arg:
setup_config(CONF)
config = json.load(open(CONF))
if len([user for user in json.load(open(CONF))]) < 1:
setup_config(CONF)
loop = asyncio.get_event_loop()
if len(arg) > 1:
users = []
for user in arg[1:]:
if user in config:
users.append(user)
else:
print(f'User not in config: {user}')
else:
users = [user for user in config]
for username in users:
user = config[username]
logging.info(f'Starting loop for {username}')
asyncio.ensure_future(run(user['domain'], user['token'], username))
try:
loop.run_forever()
except KeyboardInterrupt:
print('Bye!')
sys.exit()