write cache module and split up database.py
This commit is contained in:
parent
3605b256a2
commit
4d5a923701
|
@ -1,10 +1,13 @@
|
|||
|
||||
import aiohttp, json, re, sys, os, asyncio
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import re
|
||||
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from .database import *
|
||||
from .database import token_cache, cookie_cache, user_cache
|
||||
from .database import cache, get, put, update, delete
|
||||
from functions import json_error
|
||||
from config import config, logging
|
||||
|
||||
|
@ -143,8 +146,8 @@ class post_cmd:
|
|||
|
||||
|
||||
def cache(data):
|
||||
#return {'tokens': str(token_cache), 'cookies': str(cookie_cache), 'users': str(user_cache)}
|
||||
return {'cookies': cookie_cache.items, 'tokens': token_cache.items, 'users': user_cache.items}
|
||||
#return {'tokens': str(cache.token.items), 'cookies': str(cache.cookie.items)}
|
||||
return {'cookies': cache.cookie.items, 'tokens': cache.token.items, 'users': cache.user.items}
|
||||
|
||||
|
||||
#-----------------------
|
||||
|
|
|
@ -1,348 +0,0 @@
|
|||
import pg
|
||||
import uuid
|
||||
import sys
|
||||
import json
|
||||
|
||||
from datetime import datetime
|
||||
from DBUtils.PooledPg import PooledPg
|
||||
|
||||
from config import config, script_path, logging
|
||||
from functions import _hash, genkey
|
||||
from simplecache import TTLCache
|
||||
|
||||
|
||||
token_cache = TTLCache(maxsize=4096, ttl='1h')
|
||||
cookie_cache = TTLCache(maxsize=4096, ttl='1h')
|
||||
user_cache = TTLCache(maxsize=4096, ttl='1h')
|
||||
|
||||
DB_CONFIG = config['db']
|
||||
|
||||
|
||||
def basic_db_conn(db_name):
|
||||
if DB_CONFIG['host'] == None:
|
||||
return pg.DB(dbname=db_name, user=DB_CONFIG['user'], passwd=DB_CONFIG['password'])
|
||||
|
||||
else:
|
||||
return pg.DB(host=DB_CONFIG['host'], port=DB_CONFIG['port'], dbname=db_name, user=DB_CONFIG['user'], passwd=DB_CONFIG['password'])
|
||||
|
||||
|
||||
def db_conn():
|
||||
if DB_CONFIG['host'] == None:
|
||||
pool = PooledPg(DB_CONFIG['connections'], dbname=DB_CONFIG['database'], user=DB_CONFIG['user'], passwd=DB_CONFIG['password'])
|
||||
|
||||
else:
|
||||
pool = PooledPg(DB_CONFIG['connections'], host=DB_CONFIG['host'], port=DB_CONFIG['port'], dbname=DB_CONFIG['database'], user=DB_CONFIG['user'], passwd=DB_CONFIG['password'])
|
||||
|
||||
return pool.connection()
|
||||
|
||||
|
||||
def db_check():
|
||||
database = DB_CONFIG['database']
|
||||
pre_db = basic_db_conn('postgres')
|
||||
|
||||
if '--dropdb' in sys.argv:
|
||||
pre_db.query('SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = \'{}\';'.format(database))
|
||||
pre_db.query('DROP DATABASE {};'.format(database))
|
||||
|
||||
if database not in pre_db.get_databases():
|
||||
logging.info('Database doesn\'t exist. Creating it now...')
|
||||
|
||||
pre_db.query('CREATE DATABASE {} WITH TEMPLATE = template0;'.format(database))
|
||||
|
||||
db = basic_db_conn(database)
|
||||
database = open(script_path+'/dist/database.sql').read().replace('\t', '').replace('\n', '')
|
||||
db.query(database)
|
||||
|
||||
logging.info('Done :3')
|
||||
|
||||
pre_db.close()
|
||||
|
||||
|
||||
if '--skipdbcheck' not in sys.argv:
|
||||
db_check()
|
||||
|
||||
db = db_conn()
|
||||
|
||||
|
||||
def newtrans(funct):
|
||||
db.begin()
|
||||
result = funct
|
||||
db.end()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def check_token(token):
|
||||
fetch = token_cache.fetch(token)
|
||||
|
||||
if fetch != False:
|
||||
return fetch
|
||||
|
||||
try:
|
||||
raw_row = db.query('SELECT * FROM tokens WHERE token = \'{}\''.format(token)).dictresult()
|
||||
row = raw_row[0]
|
||||
|
||||
except IndexError:
|
||||
return False
|
||||
|
||||
token_cache.store(row['token'], row)
|
||||
|
||||
return row['userid']
|
||||
|
||||
|
||||
def check_cookie(cookie):
|
||||
fetch = cookie_cache.fetch(cookie)
|
||||
|
||||
if fetch != False:
|
||||
logging.debug('Returning cached cookie')
|
||||
return fetch
|
||||
|
||||
check = db.query('SELECT * FROM login_cookies WHERE cookie = \'{}\''.format(cookie)).dictresult()
|
||||
|
||||
if check == []:
|
||||
return False
|
||||
|
||||
else:
|
||||
logging.debug('Caching new cookie')
|
||||
cookie_cache.store(check[0]['cookie'],check[0])
|
||||
return check[0]
|
||||
|
||||
|
||||
def get_user(username, fields='*', ret_data=True):
|
||||
fetch = user_cache.fetch(username)
|
||||
|
||||
def filter_data(data, fields):
|
||||
if fields != '*':
|
||||
new_data = {}
|
||||
|
||||
for field in fields.split(','):
|
||||
if field != 'settings': ## Remove this line after fixing the DB
|
||||
new_data[field] = data[field]
|
||||
|
||||
return new_data
|
||||
|
||||
else:
|
||||
return data
|
||||
|
||||
if fetch != False:
|
||||
logging.debug('Returning cached user data')
|
||||
|
||||
return filter_data(fetch, fields)
|
||||
|
||||
if isinstance(username, int):
|
||||
check = db.query('SELECT * FROM users WHERE id = \'{}\''.format(username)).dictresult()
|
||||
|
||||
else:
|
||||
check = db.query('SELECT * FROM users WHERE handle = \'{}\''.format(username)).dictresult()
|
||||
|
||||
if check == []:
|
||||
return False
|
||||
|
||||
else:
|
||||
logging.debug('Saving user data to cache')
|
||||
user_cache.store(username, check[0])
|
||||
|
||||
if ret_data == True:
|
||||
return filter_data(user_cache.fetch(username), fields)
|
||||
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def first_setup():
|
||||
dbcheck = db.query('SELECT * FROM settings WHERE setting = \'setup\'').dictresult()
|
||||
|
||||
if dbcheck == []:
|
||||
keys = genkey()
|
||||
|
||||
settings = {
|
||||
'setup': True,
|
||||
'pubkey': keys['pubkey'],
|
||||
'privkey': keys['privkey'],
|
||||
'char_limit': 4096,
|
||||
'table_limit': 8,
|
||||
'name': 'Barkshark Social',
|
||||
'description': 'UwU',
|
||||
'theme': 'blue',
|
||||
'domain': config['domain']
|
||||
}
|
||||
|
||||
for key in settings:
|
||||
db.insert('settings', setting=key, val=settings[key])
|
||||
|
||||
logging.info('Database setup')
|
||||
|
||||
|
||||
def create_token(username, password):
|
||||
pass_hash = _hash(password)
|
||||
|
||||
|
||||
def create_cookie(userid, password, address, agent):
|
||||
timestamp = int(datetime.timestamp(datetime.now()))
|
||||
cookie = _hash(password+config['salt']+str(timestamp))
|
||||
|
||||
db.insert('login_cookies', userid=userid, cookie=cookie, timestamp=timestamp, address=address, agent=agent)
|
||||
|
||||
return cookie
|
||||
|
||||
|
||||
def delete_cookie(cookie):
|
||||
saved_cookie = check_cookie(cookie)
|
||||
|
||||
if saved_cookie != False and saved_cookie['cookie'] == cookie:
|
||||
db.delete('login_cookies', id=saved_cookie['id'])
|
||||
cookie_cache.pop(cookie)
|
||||
|
||||
return True
|
||||
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def create_user(handle, email, password, display, bio, table, sig):
|
||||
keys = genkey()
|
||||
|
||||
timestamp = int(datetime.timestamp(datetime.now()))
|
||||
token_string = str(timestamp) + email
|
||||
pass_hash = _hash(password+config['salt'])
|
||||
token = _hash(token_string)
|
||||
|
||||
try:
|
||||
user = db.insert('users', handle=handle.lower(), name=display, bio=bio, info_table=json.dumps(table), email=email, password=pass_hash, permissions=4, timestamp=timestamp, forum_sig=sig, pubkey=keys['pubkey'], privkey=keys['privkey'])
|
||||
|
||||
except pg.IntegrityError:
|
||||
db.end()
|
||||
return False
|
||||
|
||||
if user['id'] == 1:
|
||||
db.update('users', {'id': 1}, perms=0)
|
||||
|
||||
db.insert('tokens', userid=user['id'], appid=0, token=token)
|
||||
|
||||
return {'id': user['id'], 'token': token, 'password': pass_hash, 'username': user['handle'], 'name': user['name']}
|
||||
|
||||
|
||||
def settings():
|
||||
setresults = db.query('SELECT * FROM settings').dictresult()
|
||||
|
||||
set_dict = {}
|
||||
|
||||
for line in setresults:
|
||||
if line['setting'] not in ['pubkey', 'privkey']:
|
||||
set_dict.update({line['setting']: line['val']})
|
||||
|
||||
return set_dict
|
||||
|
||||
|
||||
def change_pass(password, token):
|
||||
pass_hash = _hash(password+config['salt'])
|
||||
|
||||
db.begin()
|
||||
|
||||
user_id = check_token(token)
|
||||
|
||||
if user_id == False:
|
||||
return False
|
||||
|
||||
db.update('users', {'id': user_id}, password=pass_hash)
|
||||
db.end()
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def local_post(token, text, privacy, media_id, reply_id):
|
||||
timestamp = datetime.timestamp(datetime.now())
|
||||
post_id = int(_hash(str(timestamp) + text, alg='md5'), 16)
|
||||
|
||||
db.begin()
|
||||
|
||||
user_id = check_token(token)
|
||||
|
||||
db.insert('statuses', hash=post_id, userid=user_id, timestamp=int(timestamp), content=text, visibility=privacy)
|
||||
db.end()
|
||||
|
||||
return post_id
|
||||
|
||||
|
||||
def local_post2(posts):
|
||||
db.begin()
|
||||
|
||||
for post in posts:
|
||||
timestamp = datetime.timestamp(datetime.now())
|
||||
post_hash = int(_hash(str(timestamp) + post['text'], alg='sha256'), 16)
|
||||
user_id = check_token(post['token'])
|
||||
|
||||
db.insert('statuses', hash=post_hash, userid=user_id, timestamp=timestamp, content=post['text'], warning=post['warning'], visibility=post['privacy'])
|
||||
|
||||
db.end()
|
||||
|
||||
return 'Done'
|
||||
|
||||
|
||||
def get_posts(userid, postid, newtrans=False):
|
||||
if postid != None:
|
||||
page = 'and id < {}'.format(postid)
|
||||
|
||||
else:
|
||||
page = ''
|
||||
|
||||
posts = db.query('SELECT id, userid, content, visibility, mentions, timestamp, warning FROM statuses WHERE userid = {} {} ORDER BY id DESC limit {};'.format(userid, page, config['vars']['posts'])).dictresult()
|
||||
|
||||
return posts
|
||||
|
||||
|
||||
def get_profile(username, postid=None):
|
||||
try:
|
||||
row = db.query('SELECT * FROM users WHERE handle = \'{}\''.format(username)).dictresult()[0]
|
||||
|
||||
post_count = db.query('SELECT COUNT(*) FROM statuses WHERE userid={};'.format(row['id'])).dictresult()
|
||||
|
||||
row['posts'] = get_posts(row['id'], postid)
|
||||
row.update(post_count[0])
|
||||
|
||||
info = row['info_table']
|
||||
|
||||
if info != None:
|
||||
row['info_table'] = json.loads(info)
|
||||
|
||||
except IndexError:
|
||||
return False
|
||||
|
||||
return row
|
||||
|
||||
|
||||
def get_post(postid):
|
||||
post = db.query('SELECT * FROM statuses WHERE id = \'{}\''.format(postid)).dictresult()
|
||||
|
||||
if post == []:
|
||||
return False
|
||||
|
||||
user = db.query('SELECT * FROM users WHERE id= \'{}\''.format(post[0]['userid'])).dictresult()
|
||||
|
||||
return {'post': post[0], 'user': user[0]}
|
||||
|
||||
|
||||
def update_bio(token, bio):
|
||||
user_id = check_token(token)
|
||||
|
||||
if user_id == False:
|
||||
return False
|
||||
|
||||
return db.update('users', {'id': user_id}, bio=bio)
|
||||
|
||||
|
||||
def update_tables(token, bio):
|
||||
db.begin()
|
||||
user_id = check_token(token)
|
||||
|
||||
if user_id == False:
|
||||
return False
|
||||
|
||||
msg = db.update('users', {'id': user_id}, info_table=json.dumps(bio))
|
||||
db.end()
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
newtrans(first_setup())
|
||||
SETTINGS = settings()
|
103
backend/database/__init__.py
Normal file
103
backend/database/__init__.py
Normal file
|
@ -0,0 +1,103 @@
|
|||
import pg
|
||||
import uuid
|
||||
import sys
|
||||
import json
|
||||
|
||||
from DBUtils.PooledPg import PooledPg
|
||||
|
||||
from config import config, script_path, logging
|
||||
from simplecache import TTLCache
|
||||
|
||||
|
||||
class cache:
|
||||
token = TTLCache(maxsize=4096, ttl='1h')
|
||||
cookie = TTLCache(maxsize=4096, ttl='1h')
|
||||
user = TTLCache(maxsize=4096, ttl='1h')
|
||||
|
||||
DB_CONFIG = config['db']
|
||||
|
||||
|
||||
def dbconn(database, pooled=True):
|
||||
extra = ", host=DB_CONFIG['host'], port=DB_CONFIG['port']" if DB_CONFIG['host'] != None else ''
|
||||
dbtype = "PooledPg(DB_CONFIG['connections'], " if pooled == True else "pg.DB("
|
||||
|
||||
dbsetup = eval(f"{dbtype}dbname='{database}', user=DB_CONFIG['user'], passwd=DB_CONFIG['password']{extra})")
|
||||
|
||||
return dbsetup.connection() if pooled == True else dbsetup
|
||||
|
||||
|
||||
def db_check():
|
||||
database = DB_CONFIG['database']
|
||||
pre_db = dbconn('postgres', pooled=False)
|
||||
|
||||
if '--dropdb' in sys.argv:
|
||||
pre_db.query(f'SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = \'{database}\';')
|
||||
pre_db.query(f'DROP DATABASE {database};')
|
||||
|
||||
if database not in pre_db.get_databases():
|
||||
logging.info('Database doesn\'t exist. Creating it now...')
|
||||
|
||||
pre_db.query(f'CREATE DATABASE {database} WITH TEMPLATE = template0;')
|
||||
|
||||
db_setup = dbconn(database, pooled=False)
|
||||
database = open(script_path+'/dist/database.sql').read().replace('\t', '').replace('\n', '')
|
||||
db_setup.query(database)
|
||||
db_setup.close()
|
||||
|
||||
logging.info('Done :3')
|
||||
|
||||
pre_db.close()
|
||||
|
||||
|
||||
if '--skipdbcheck' not in sys.argv:
|
||||
db_check()
|
||||
|
||||
db = dbconn(DB_CONFIG['database'])
|
||||
|
||||
|
||||
def newtrans(funct):
|
||||
db.begin()
|
||||
result = funct
|
||||
db.end()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def first_setup():
|
||||
dbcheck = db.query('SELECT * FROM settings WHERE setting = \'setup\'').dictresult()
|
||||
|
||||
if dbcheck == []:
|
||||
keys = genkey()
|
||||
|
||||
settings = {
|
||||
'setup': True,
|
||||
'pubkey': keys['pubkey'],
|
||||
'privkey': keys['privkey'],
|
||||
'char_limit': 4096,
|
||||
'table_limit': 8,
|
||||
'name': 'Barkshark Social',
|
||||
'description': 'UwU',
|
||||
'theme': 'blue',
|
||||
'domain': config['domain']
|
||||
}
|
||||
|
||||
for key in settings:
|
||||
db.insert('settings', setting=key, val=settings[key])
|
||||
|
||||
logging.info('Database setup')
|
||||
|
||||
|
||||
def settings():
|
||||
setresults = db.query('SELECT * FROM settings').dictresult()
|
||||
|
||||
set_dict = {}
|
||||
|
||||
for line in setresults:
|
||||
if line['setting'] not in ['pubkey', 'privkey']:
|
||||
set_dict.update({line['setting']: line['val']})
|
||||
|
||||
return set_dict
|
||||
|
||||
|
||||
newtrans(first_setup())
|
||||
SETTINGS = settings()
|
10
backend/database/delete.py
Normal file
10
backend/database/delete.py
Normal file
|
@ -0,0 +1,10 @@
|
|||
from . import db, cache
|
||||
|
||||
from config import logging
|
||||
|
||||
|
||||
def login_cookie(cookie):
|
||||
db.delete('login_cookies', id=cookie)
|
||||
cache.cookie.invalidate(cookie)
|
||||
|
||||
return True
|
158
backend/database/get.py
Normal file
158
backend/database/get.py
Normal file
|
@ -0,0 +1,158 @@
|
|||
import json
|
||||
|
||||
from . import db, cache
|
||||
|
||||
from config import logging, config
|
||||
|
||||
|
||||
def api_token(token):
|
||||
fetch = cache.token.fetch(token)
|
||||
|
||||
if fetch != False:
|
||||
return fetch
|
||||
|
||||
raw_token = db.query(f'SELECT * FROM tokens WHERE token = \'{token}\'').dictresult()
|
||||
row = raw_row[0]
|
||||
|
||||
cache.token.store(row['token'], row)
|
||||
|
||||
return row['userid']
|
||||
|
||||
|
||||
def login_cookie(cookie):
|
||||
fetch = cache.cookie.fetch(cookie)
|
||||
|
||||
if fetch != None:
|
||||
logging.debug('Returning cached cookie')
|
||||
return fetch
|
||||
|
||||
raw_cookie = db.query(f'SELECT * FROM login_cookies WHERE cookie = \'{cookie}\'').dictresult()
|
||||
|
||||
if raw_cookie == []:
|
||||
return None
|
||||
|
||||
else:
|
||||
cookie = raw_cookie[0]
|
||||
|
||||
logging.debug('Caching new cookie')
|
||||
cache.cookie.store(cookie['cookie'], cookie)
|
||||
return cookie
|
||||
|
||||
|
||||
def post(postid):
|
||||
post = db.query(f'SELECT * FROM statuses WHERE id = \'{postid}\'').dictresult()
|
||||
|
||||
if post == []:
|
||||
return None
|
||||
|
||||
userid = post[0]['userid']
|
||||
|
||||
user = db.query(f'SELECT * FROM users WHERE id= \'{userid}\'').dictresult()
|
||||
|
||||
return {'post': post[0], 'user': user[0]}
|
||||
|
||||
|
||||
def posts(username, postid, newtrans=False):
|
||||
if postid != None:
|
||||
page = f'and id < {postid}'
|
||||
|
||||
else:
|
||||
page = ''
|
||||
|
||||
user_data = user(username)
|
||||
|
||||
if user_data == None:
|
||||
return None
|
||||
|
||||
postlimit = config['vars']['posts']
|
||||
|
||||
posts = db.query(f'''SELECT id, userid, content, visibility, mentions, timestamp, warning
|
||||
FROM statuses
|
||||
WHERE userid = {user_data['id']} {page}
|
||||
ORDER BY id DESC LIMIT {postlimit};''').dictresult()
|
||||
|
||||
return posts
|
||||
|
||||
|
||||
def userid_to_handle(userid):
|
||||
fetch = cache.user.fetch(userid)
|
||||
|
||||
if fetch != None:
|
||||
logging.debug('Returning cached user handle')
|
||||
|
||||
return fetch
|
||||
|
||||
user = db.query(f'SELECT handle FROM users WHERE id = {userid}').dictresult()
|
||||
|
||||
if user == []:
|
||||
return None
|
||||
|
||||
else:
|
||||
handle = user[0]['handle']
|
||||
|
||||
logging.debug('Saving user handle to the cache')
|
||||
cache.user.store(userid, handle)
|
||||
|
||||
return handle
|
||||
|
||||
|
||||
def user(username, fields='*'):
|
||||
def filter_data(data, fields):
|
||||
if data == None:
|
||||
logging.warning('Missing data for filtering.')
|
||||
return None
|
||||
|
||||
if fields != '*':
|
||||
new_data = {}
|
||||
|
||||
for field in fields.split(','):
|
||||
if field != 'settings': ## Remove this line after fixing the DB
|
||||
new_data[field] = data[field]
|
||||
|
||||
return new_data
|
||||
|
||||
else:
|
||||
return data
|
||||
|
||||
if isinstance(username, int):
|
||||
username = userid_to_handle(username)
|
||||
|
||||
fetch = cache.user.fetch(username)
|
||||
|
||||
if fetch != None:
|
||||
logging.debug('Returning cached user data')
|
||||
|
||||
return filter_data(fetch, fields)
|
||||
|
||||
raw_user_data = db.query(f'SELECT * FROM users WHERE handle = \'{username}\'').dictresult()
|
||||
|
||||
if raw_user_data == []:
|
||||
return None
|
||||
|
||||
else:
|
||||
user_data = raw_user_data[0]
|
||||
table = user_data['info_table']
|
||||
|
||||
if table != None:
|
||||
user_data['info_table'] = json.loads(table)
|
||||
|
||||
logging.debug('Saving user data to cache')
|
||||
cache.user.store(user_data['handle'], user_data)
|
||||
|
||||
return filter_data(cache.user.fetch(user_data['handle']), fields)
|
||||
|
||||
|
||||
def profile(handle, postid=None):
|
||||
user_data = user(handle)
|
||||
|
||||
if user_data == None:
|
||||
return None
|
||||
|
||||
userid = user_data['id']
|
||||
|
||||
post_count = db.query(f'SELECT COUNT(*) FROM statuses WHERE userid={userid};').dictresult()
|
||||
|
||||
user_data['posts'] = posts(handle, postid)
|
||||
user_data.update(post_count[0])
|
||||
|
||||
return user_data
|
84
backend/database/put.py
Normal file
84
backend/database/put.py
Normal file
|
@ -0,0 +1,84 @@
|
|||
from . import db
|
||||
|
||||
from config import logging
|
||||
from functions import mkhash, genkey, timestamp
|
||||
|
||||
|
||||
def user(handle, email, password, display, bio, table, sig):
|
||||
keys = genkey()
|
||||
|
||||
ts = timestamp()
|
||||
token_string = str(ts) + email
|
||||
pass_hash = _hash(password+config['salt'])
|
||||
token = _hash(token_string)
|
||||
|
||||
try:
|
||||
user = db.insert('users',
|
||||
handle=handle.lower(), name=display, bio=bio,
|
||||
info_table=json.dumps(table), email=email, password=pass_hash,
|
||||
permissions=4, timestamp=timestamp, forum_sig=sig,
|
||||
pubkey=keys['pubkey'], privkey=keys['privkey']
|
||||
)
|
||||
|
||||
except pg.IntegrityError:
|
||||
db.end()
|
||||
return False
|
||||
|
||||
if user['id'] == 1:
|
||||
db.update('users', {'id': 1}, perms=0)
|
||||
|
||||
db.insert('tokens', userid=user['id'], appid=0, token=token)
|
||||
|
||||
return {'id': user['id'], 'token': token, 'password': pass_hash, 'username': user['handle'], 'name': user['name']}
|
||||
|
||||
|
||||
def login_cookie(userid, password, address, agent):
|
||||
ts = timestamp()
|
||||
cookie = mkhash(password+config['salt']+str(ts))
|
||||
|
||||
db.insert('login_cookies',
|
||||
userid=userid, cookie=cookie, timestamp=ts,
|
||||
address=address, agent=agent
|
||||
)
|
||||
|
||||
return cookie
|
||||
|
||||
|
||||
def api_token(username, password):
|
||||
pass_hash = mkhash(password)
|
||||
|
||||
|
||||
def local_post(token, text, privacy, media_id, reply_id):
|
||||
ts = timestamp()
|
||||
post_id = int(mkhash(str(ts) + text, alg='md5'), 16)
|
||||
|
||||
db.begin()
|
||||
|
||||
user_id = check_token(token)
|
||||
|
||||
db.insert('statuses',
|
||||
hash=post_id, userid=user_id, timestamp=int(ts),
|
||||
content=text, visibility=privacy
|
||||
)
|
||||
|
||||
db.end()
|
||||
|
||||
return post_id
|
||||
|
||||
|
||||
def local_post2(posts):
|
||||
db.begin()
|
||||
|
||||
for post in posts:
|
||||
ts = timestamp()
|
||||
post_hash = int(mkhash(str(ts) + post['text'], alg='sha256'), 16)
|
||||
user_id = check_token(post['token'])
|
||||
|
||||
db.insert('statuses',
|
||||
hash=post_hash, userid=user_id, timestamp=st,
|
||||
content=post['text'], warning=post['warning'], visibility=post['privacy']
|
||||
)
|
||||
|
||||
db.end()
|
||||
|
||||
return 'Done'
|
40
backend/database/update.py
Normal file
40
backend/database/update.py
Normal file
|
@ -0,0 +1,40 @@
|
|||
from . import db
|
||||
|
||||
from config import logging
|
||||
from functions import mkhash
|
||||
|
||||
'''
|
||||
Literally all of these use a login token as an identifier instead of user handle. fuck
|
||||
'''
|
||||
|
||||
def password(handle, password):
|
||||
pass_hash = mkhash(password+config['salt'])
|
||||
|
||||
user_id = check_token(token)
|
||||
|
||||
if user_id == False:
|
||||
return False
|
||||
|
||||
db.update('users', {'id': user_id}, password=pass_hash)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def bio(handle, bio):
|
||||
user_id = check_token(token)
|
||||
|
||||
if user_id == False:
|
||||
return False
|
||||
|
||||
return db.update('users', {'id': user_id}, bio=bio)
|
||||
|
||||
|
||||
def tables(handle, bio):
|
||||
user_id = check_token(token)
|
||||
|
||||
if user_id == False:
|
||||
return False
|
||||
|
||||
msg = db.update('users', {'id': user_id}, info_table=json.dumps(bio))
|
||||
|
||||
return msg
|
|
@ -16,7 +16,7 @@ from aiohttp_session.redis_storage import RedisStorage
|
|||
|
||||
from config import config, logging
|
||||
from functions import json_error
|
||||
from .database import newtrans, check_token, check_cookie, get_user
|
||||
from .database import SETTINGS, newtrans, get
|
||||
|
||||
|
||||
def fetch_actor(actor):
|
||||
|
@ -196,7 +196,7 @@ async def http_auth(app, handler):
|
|||
json_error(401, 'MissingTokenHeader')
|
||||
|
||||
#if pass_hash() != token:
|
||||
db_token = newtrans(check_token(token))
|
||||
db_token = newtrans(get.api_token(token))
|
||||
|
||||
if db_token == False:
|
||||
json_error(401, 'InvalidToken')
|
||||
|
@ -204,18 +204,18 @@ async def http_auth(app, handler):
|
|||
else:
|
||||
pass
|
||||
|
||||
cookie = request.cookies.get('login_token')
|
||||
cookie_val = newtrans(check_cookie(cookie))
|
||||
login_token = request.cookies.get('login_token')
|
||||
login_token_val = newtrans(get.login_cookie(login_token))
|
||||
|
||||
if any(map(request.path.startswith, cookie_include_paths)):
|
||||
if cookie == None:
|
||||
if login_token == None:
|
||||
return aiohttp.web.HTTPFound('/login')
|
||||
|
||||
elif cookie_val == False:
|
||||
elif login_token_val == None:
|
||||
return aiohttp.web.HTTPFound('/login?msg=InvalidToken')
|
||||
|
||||
elif any(map(request.path.startswith, ['/login', '/register'])) and cookie_val not in [False, None]:
|
||||
if cookie == cookie_val['cookie']:
|
||||
elif any(map(request.path.startswith, ['/login', '/register'])) and login_token_val != None:
|
||||
if login_token == login_token_val['cookie']:
|
||||
return aiohttp.web.HTTPFound('/welcome')
|
||||
|
||||
return (await handler(request))
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import aiohttp
|
||||
|
||||
from config import config
|
||||
from .database import SETTINGS, newtrans, get_user
|
||||
from .database import SETTINGS, newtrans, get
|
||||
|
||||
async def nodeinfo_json(request):
|
||||
data = {
|
||||
|
@ -60,7 +60,7 @@ async def webfinger_get(request):
|
|||
resource = query['resource'].split('@')
|
||||
user = resource[0]
|
||||
|
||||
if resource[1] == SETTINGS['domain'] and newtrans(get_user(user, ret_data=False)) != False:
|
||||
if resource[1] == SETTINGS['domain'] and newtrans(get.user(user)) != None:
|
||||
data = {
|
||||
'subject': 'acct:izaliamae@barkshark.tk',
|
||||
'aliases': [
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{% set token = newtrans(check_cookie(data.login_token)) %}
|
||||
{% if check != False %}
|
||||
{% set user_login = newtrans(get_user(token.userid, fields='handle,name,permissions')) %}
|
||||
{% if data.login_token != None %}
|
||||
{% set cookie = newtrans(get_cookie(data.login_token)) %}
|
||||
{% set user_data = newtrans(get_user(cookie.userid, fields='handle,name,permissions')) %}
|
||||
{% endif %}
|
||||
|
||||
{% set colors %}
|
||||
|
@ -13,6 +13,7 @@
|
|||
</details>
|
||||
</div>
|
||||
{% endset %}
|
||||
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
|
@ -29,22 +30,32 @@
|
|||
function delete_cookie(name) {
|
||||
document.cookie = name + '=; expires=Thu, 01 Jan 1970 00:00:01 GMT;';
|
||||
}
|
||||
|
||||
function reloadCss() {
|
||||
var links = document.getElementsByTagName("link");
|
||||
for (var cl in links) {
|
||||
var link = links[cl];
|
||||
if (link.rel === "stylesheet")
|
||||
link.href += "";
|
||||
}
|
||||
}
|
||||
</script>
|
||||
</head>
|
||||
<body>
|
||||
<div id="user_panel" class="menu">
|
||||
{% if token != False %}
|
||||
{% if user_data != None %}
|
||||
<details>
|
||||
<summary id="menu_title">{{user_login.name}}</summary>
|
||||
<div class="item"><a href="https://{{domain}}/welcome">Home</a></div>
|
||||
<div class="item"><a href="https://{{domain}}/@{{user_login.handle}}">Profile</a></div>
|
||||
<summary id="menu_title">{{user_data.name}}</summary>
|
||||
<div class="item"><a href="https://{{domain}}/">Home</a></div>
|
||||
<div class="item"><a href="https://{{domain}}/welcome">User Panel</a></div>
|
||||
<div class="item"><a href="https://{{domain}}/@{{user_data.handle}}">Profile</a></div>
|
||||
<div class="submenu">
|
||||
<details>
|
||||
<summary class="item"><a class="text">Settings</a></summary>
|
||||
<div class="item"><a href="https://{{domain}}/settings#profile">Profile</a></div>
|
||||
<div class="item"><a href="https://{{domain}}/settings#options">Options</a></div>
|
||||
<div class="item"><a href="https://{{domain}}/settings#security">Security</a></div>
|
||||
{% if user_login.permissions < 2 %}<div class="item"><a href="https://{{domain}}/admin">Admin</a></div>{% endif %}
|
||||
{% if user_data.permissions < 2 %}<div class="item"><a href="https://{{domain}}/admin">Admin</a></div>{% endif %}
|
||||
</details>
|
||||
</div>
|
||||
{{colors}}
|
||||
|
@ -53,6 +64,7 @@
|
|||
{% else %}
|
||||
<details>
|
||||
<summary id="menu_title">Guest</summary>
|
||||
<div class="item"><a href="https://{{domain}}/">Home</a></div>
|
||||
<div class="item"><a href="https://{{domain}}/login">Login</a></div>
|
||||
<div class="item"><a href="https://{{domain}}/register">Register</a></div>
|
||||
{{colors}}
|
||||
|
@ -60,18 +72,22 @@
|
|||
{% endif %}
|
||||
</div>
|
||||
|
||||
<div id="header">
|
||||
<h1 id="title"><a href="https://{{domain}}">{{name}}</a></h1>
|
||||
</div>
|
||||
|
||||
<div id="content">
|
||||
<center><h1 id="title"><a href="https://{{domain}}">{{name}}</a></h1></center>
|
||||
{% block content %}{% endblock %}
|
||||
<div id="footer">
|
||||
<table>
|
||||
<tr>
|
||||
<td class="col1"></td>
|
||||
<td class="col2">
|
||||
<a href="https://git.barkshark.tk/izaliamae/social">Barkshark Social</a>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
{% block content %}{% endblock %}
|
||||
<div id="footer">
|
||||
<table>
|
||||
<tr>
|
||||
<td class="col1"></td>
|
||||
<td class="col2">
|
||||
<a href="https://git.barkshark.tk/izaliamae/social">Barkshark Social</a>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
</div>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
|
|
|
@ -17,7 +17,7 @@ a {
|
|||
}
|
||||
|
||||
input, textarea {
|
||||
margin-top: 15px;
|
||||
margin: 15px 0;
|
||||
font-size: 14pt;
|
||||
padding: 10px;
|
||||
}
|
||||
|
|
|
@ -9,7 +9,9 @@
|
|||
<form>
|
||||
<div class="grid-container">
|
||||
<div class="grid-item"><center>
|
||||
Display Name<br>
|
||||
<input type="text" name="display" placeholder="Display name" value="{{user.name}}"><br>
|
||||
Username<br>
|
||||
<input type="text" name="handle" placeholder="Username" value="{{user.handle}}" disabled>
|
||||
</center></div>
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ import aiohttp, aiohttp_jinja2, json
|
|||
|
||||
from config import config, logging
|
||||
from functions import http_error, json_error
|
||||
from backend.database import SETTINGS, check_cookie, get_user
|
||||
from backend.database import SETTINGS, newtrans, get
|
||||
|
||||
# user home page (/welcome)
|
||||
async def welcome_get(request):
|
||||
|
@ -15,10 +15,13 @@ async def settings_get(request):
|
|||
login_token = request.cookies.get('login_token')
|
||||
data = {'login_token': login_token}
|
||||
|
||||
token = check_cookie(login_token)
|
||||
token = newtrans(get.login_cookie(login_token))
|
||||
handle = newtrans(get.userid_to_handle(token['userid']))
|
||||
|
||||
if token != False:
|
||||
user = get_user(token['userid'], fields='handle,name,email,bio,info_table,permissions,settings')
|
||||
if token == None:
|
||||
aiohttp.web.HTTPFound('/login?msg=InvalidToken')
|
||||
|
||||
user = newtrans(get.user(handle, fields='handle,name,email,bio,info_table,permissions,settings'))
|
||||
|
||||
return aiohttp_jinja2.render_template('pages/user/settings.html', request, {'data': data, 'user': user, 'settings': SETTINGS})
|
||||
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
import aiohttp, aiohttp_jinja2, json
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from config import config, logging
|
||||
from functions import color_css, http_error, json_error, _hash
|
||||
from backend.database import SETTINGS, newtrans, cookie_cache, get_profile, get_posts, get_post, create_cookie, get_user, delete_cookie, create_user
|
||||
from functions import color_css, http_error, json_error, mkhash, timestamp
|
||||
from backend.database import SETTINGS, newtrans, get, put, delete
|
||||
|
||||
|
||||
# home page (/)
|
||||
|
@ -18,6 +16,7 @@ async def register_get(request):
|
|||
query = request.rel_url.query
|
||||
data = {'login_token': request.cookies.get('login_token')}
|
||||
|
||||
# This can probably be optimized
|
||||
if 'msg' in query:
|
||||
message = query['msg']
|
||||
|
||||
|
@ -53,14 +52,14 @@ async def register_post(request):
|
|||
if None in [username, password]:
|
||||
aiohttp.web.HTTPFound('/register?msg=MissingData')
|
||||
|
||||
user_check = newtrans(get_user(username))
|
||||
user_check = newtrans(get.user(username))
|
||||
|
||||
if user_check != False:
|
||||
if user_check != None:
|
||||
aiohttp.web.HTTPFound('/register?msg=UserExists')
|
||||
|
||||
user = newtrans(create_user(username, email, password, name, bio, info_table, sig))
|
||||
user = newtrans(put.user(username, email, password, name, bio, info_table, sig))
|
||||
|
||||
token = newtrans(create_cookie(user['id'], user['password'], address, agent))
|
||||
token = newtrans(put.login_token(user['id'], user['password'], address, agent))
|
||||
|
||||
response = aiohttp.web.HTTPFound('/login')
|
||||
response.set_cookie('login_token', token, max_age=60*60*24*14)
|
||||
|
@ -104,16 +103,16 @@ async def login_post(request):
|
|||
if '' in [username, password, address] or address == None:
|
||||
return http_error(400, request)
|
||||
|
||||
userid = newtrans(get_user(username.lower()))
|
||||
pass_hash = _hash(password+config['salt'])
|
||||
userid = newtrans(get.user(username.lower()))
|
||||
pass_hash = mkhash(password+config['salt'])
|
||||
|
||||
if userid != False:
|
||||
if userid != None:
|
||||
if userid['password'] == pass_hash:
|
||||
token = newtrans(create_cookie(userid['id'], password, address, agent))
|
||||
login_token = newtrans(put.login_token(userid['id'], password, address, agent))
|
||||
response = aiohttp.web.HTTPFound('/welcome')
|
||||
|
||||
# Send login token. Lasts for 2 weeks (60 sec * 60 min * 24 hour * 14 day)
|
||||
response.set_cookie('login_token', token, max_age=60*60*24*14)
|
||||
response.set_cookie('login_token', login_token, max_age=60*60*24*14)
|
||||
|
||||
return response
|
||||
|
||||
|
@ -127,7 +126,7 @@ async def logout_get(request):
|
|||
response = aiohttp.web.HTTPFound('/login?msg=LoggedOut')
|
||||
|
||||
if token != None:
|
||||
newtrans(delete_cookie(token))
|
||||
newtrans(delete.cookie(token))
|
||||
|
||||
response.set_cookie('login_token', token, max_age=0)
|
||||
|
||||
|
@ -138,15 +137,13 @@ async def logout_get(request):
|
|||
async def user_get(request):
|
||||
user = request.match_info['user']
|
||||
postid = request.rel_url.query.get('id')
|
||||
user_data = newtrans(get.profile(user, postid=postid))
|
||||
|
||||
postid = postid if postid != None else None
|
||||
data = newtrans(get_profile(user, postid=postid))
|
||||
|
||||
if data == False:
|
||||
return http_error(404, request)
|
||||
if user_data == None:
|
||||
return http_error(404, request, msg='That user doesn\'t exist.')
|
||||
|
||||
try:
|
||||
posts = data['posts']
|
||||
posts = user_data['posts']
|
||||
if len(posts) < config['vars']['posts']:
|
||||
last_id = None
|
||||
|
||||
|
@ -156,16 +153,21 @@ async def user_get(request):
|
|||
except IndexError:
|
||||
last_id = None
|
||||
|
||||
data['last_id'] = last_id
|
||||
user_data['last_id'] = last_id
|
||||
bio = []
|
||||
|
||||
for line in data['bio'].split('\n'):
|
||||
bio.append(line)
|
||||
if isinstance(user_data['bio'], list):
|
||||
for line in user_data['bio']:
|
||||
bio.append(f'{line}\n')
|
||||
|
||||
data['bio'] = bio
|
||||
data['login_token'] = request.cookies.get('login_token')
|
||||
else:
|
||||
for line in user_data['bio'].split('\n'):
|
||||
bio.append(line)
|
||||
|
||||
return aiohttp_jinja2.render_template('pages/public/profile.html', request, {'data': data})
|
||||
user_data['bio'] = bio
|
||||
user_data['login_token'] = request.cookies.get('login_token')
|
||||
|
||||
return aiohttp_jinja2.render_template('pages/public/profile.html', request, {'data': user_data})
|
||||
|
||||
|
||||
# single post
|
||||
|
@ -173,11 +175,11 @@ async def user_post_get(request):
|
|||
user = request.match_info['user']
|
||||
postid = request.match_info['postid']
|
||||
|
||||
data = get_post(postid)
|
||||
post = get.post(postid)
|
||||
|
||||
if data == False:
|
||||
return http_error(404, request)
|
||||
if post == None:
|
||||
return http_error(404, request, msg='That post doesn\'t exist.')
|
||||
|
||||
data['data'] = {'login_token': request.cookies.get('login_token')}
|
||||
post['data'] = {'login_token': request.cookies.get('login_token')}
|
||||
|
||||
return aiohttp_jinja2.render_template('pages/public/post.html', request, data)
|
||||
return aiohttp_jinja2.render_template('pages/public/post.html', request, post)
|
||||
|
|
150
functions.py
150
functions.py
|
@ -3,6 +3,7 @@ import aiohttp_jinja2
|
|||
import json
|
||||
import yaml
|
||||
import os
|
||||
import re
|
||||
|
||||
from colour import Color
|
||||
from datetime import datetime
|
||||
|
@ -60,7 +61,7 @@ def json_error(code, error):
|
|||
raise eval('aiohttp.web.HTTP'+error_codes[code]+'(body=error_body, content_type=cont_type)')
|
||||
|
||||
|
||||
def http_error(code, request):
|
||||
def http_error(code, request, msg=None):
|
||||
cont_type = 'text/html'
|
||||
data = {'login_token': request.cookies.get('login_token')}
|
||||
|
||||
|
@ -70,12 +71,12 @@ def http_error(code, request):
|
|||
body = aiohttp_jinja2.render_template('errors/500.html', request, {}, status=500)
|
||||
|
||||
else:
|
||||
body = aiohttp_jinja2.render_template('errors/'+str(code)+'.html', request, {'data': data}, status=code)
|
||||
body = aiohttp_jinja2.render_template('errors/'+str(code)+'.html', request, {'data': data, msg: msg}, status=code)
|
||||
|
||||
return body
|
||||
|
||||
|
||||
def _hash(string, alg='sha512'):
|
||||
def mkhash(string, alg='sha512'):
|
||||
if alg == 'sha512':
|
||||
return SHA512.new(string.encode('UTF-8')).hexdigest()
|
||||
|
||||
|
@ -96,95 +97,8 @@ def genkey():
|
|||
return {'pubkey': pubkey, 'privkey': privkey}
|
||||
|
||||
|
||||
def color_check(color):
|
||||
return Color(color if color.startswith('#') else '#'+str(color))
|
||||
|
||||
|
||||
def lighten(color, multiplier):
|
||||
hsl = color_check(color)
|
||||
|
||||
if multiplier >= 1:
|
||||
multiplier = 1
|
||||
|
||||
elif multiplier <= 0:
|
||||
multiplier = 0
|
||||
|
||||
try:
|
||||
hsl.luminance += ((1 - hsl.luminance) * multiplier)
|
||||
return hsl.hex_l
|
||||
|
||||
except ValueError:
|
||||
hsl.luminance = 0.999
|
||||
return hsl.hex_l
|
||||
|
||||
def darken(color, multiplier):
|
||||
hsl = color_check(color)
|
||||
|
||||
if multiplier >= 1:
|
||||
multiplier = 1
|
||||
|
||||
elif multiplier <= 0:
|
||||
multiplier = 0
|
||||
|
||||
try:
|
||||
hsl.luminance -= (multiplier * hsl.luminance)
|
||||
return hsl.hex_l
|
||||
|
||||
except ValueError:
|
||||
hsl.luminance = 0.001
|
||||
return hsl.hex_l
|
||||
|
||||
|
||||
def saturate(color, multiplier):
|
||||
hsl = color_check(color)
|
||||
|
||||
if multiplier >= 1:
|
||||
multiplier = 1
|
||||
|
||||
elif multiplier <= 0:
|
||||
multiplier = 0
|
||||
|
||||
try:
|
||||
hsl.saturation += ((1 - hsl.saturation) * multiplier)
|
||||
return hsl.hex_l
|
||||
|
||||
except ValueError:
|
||||
hsl.saturation = 0.999
|
||||
return hsl.hex_l
|
||||
|
||||
|
||||
def desaturate(color, multiplier):
|
||||
hsl = color_check(color)
|
||||
|
||||
if multiplier >= 1:
|
||||
multiplier = 1
|
||||
|
||||
elif multiplier <= 0:
|
||||
multiplier = 0
|
||||
|
||||
try:
|
||||
hsl.saturation -= (multiplier * hsl.saturation)
|
||||
return hsl.hex_l
|
||||
|
||||
except ValueError:
|
||||
hsl.saturation = 0.001
|
||||
return hsl.hex_l
|
||||
|
||||
|
||||
def rgba(color, trans):
|
||||
col = color_check(color)
|
||||
|
||||
if trans >= 1:
|
||||
trans = 1
|
||||
|
||||
elif trans <= 0:
|
||||
trans = 0
|
||||
|
||||
return 'rgba({:0.2f}, {:0.2f}, {:0.2f}, {:0.2f})'.format(col.red*255, col.green*255, col.blue*255, trans)
|
||||
|
||||
|
||||
def todate(ts):
|
||||
return datetime.fromtimestamp(ts).strftime('%Y/%m/%d %H:%M:%S')
|
||||
return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M')
|
||||
|
||||
|
||||
def themes():
|
||||
|
@ -198,6 +112,12 @@ def themes():
|
|||
return theme_list
|
||||
|
||||
|
||||
def timestamp(integer=True):
|
||||
ts = datetime.timestamp(datetime.now())
|
||||
|
||||
return int(ts) if integer == True else ts
|
||||
|
||||
|
||||
# Generate css file for color styling
|
||||
def color_css(theme):
|
||||
try:
|
||||
|
@ -215,6 +135,52 @@ def color_css(theme):
|
|||
return colors
|
||||
|
||||
|
||||
class color:
|
||||
def __init__(self):
|
||||
self.check = lambda color: Color(f'#{str(color)}' if re.search(r'^(?:[0-9a-fA-F]{3}){1,2}$', color) else color)
|
||||
|
||||
def multi(self, multiplier):
|
||||
if multiplier >= 1:
|
||||
return 1
|
||||
|
||||
elif multiplier <= 0:
|
||||
return 0
|
||||
|
||||
return multiplier
|
||||
|
||||
def lighten(self, color, multiplier):
|
||||
col = self.check(color)
|
||||
col.luminance += ((1 - col.luminance) * self.multi(multiplier))
|
||||
|
||||
return col.hex_l
|
||||
|
||||
def darken(self, color, multiplier):
|
||||
col = self.check(color)
|
||||
col.luminance -= (col.luminance * self.multi(multiplier))
|
||||
|
||||
return col.hex_l
|
||||
|
||||
|
||||
__all__ = ['boolean', 'json_error', 'http_error', '_hash', 'genkey', 'lighten', 'darken', 'saturate', 'desaturate', 'rgba', 'todate', 'themes', 'color_css']
|
||||
def saturate(self, color, multiplier):
|
||||
col = self.check(color)
|
||||
col.saturation += ((1 - col.saturation) * self.multi(multiplier))
|
||||
|
||||
return col.hex_l
|
||||
|
||||
|
||||
def desaturate(self, color, multiplier):
|
||||
col = self.check(color)
|
||||
col.saturation -= (col.saturation * self.multi(multiplier))
|
||||
|
||||
return col.hex_l
|
||||
|
||||
|
||||
def rgba(self, color, transparency):
|
||||
col = self.check(color)
|
||||
|
||||
red = col.red*255
|
||||
green = col.green*255
|
||||
blue = col.blue*255
|
||||
trans = self.multi(transparency)
|
||||
|
||||
return f'rgba({red:0.2f}, {green:0.2f}, {blue:0.2f}, {trans:0.2f})'
|
||||
|
|
20
routes.py
20
routes.py
|
@ -8,9 +8,9 @@ from jinja2 import FileSystemLoader, select_autoescape
|
|||
from config import config, logging
|
||||
|
||||
from backend import mastodon_api, api, middleware, wellknown
|
||||
from backend.database import SETTINGS, check_cookie, get_user, newtrans
|
||||
from backend.database import SETTINGS, newtrans, get
|
||||
from frontend import views, resources, user, redirects
|
||||
from functions import *
|
||||
from functions import color, todate, themes
|
||||
from frontend.template_loader import CustomLoader
|
||||
|
||||
|
||||
|
@ -18,14 +18,14 @@ async def glob_vars(request):
|
|||
return {
|
||||
'name': SETTINGS['name'],
|
||||
'domain': config['web_domain'],
|
||||
'check_cookie': check_cookie,
|
||||
'get_user': get_user,
|
||||
'get_cookie': get.login_cookie,
|
||||
'get_user': get.user,
|
||||
'newtrans': newtrans,
|
||||
'lighten': lighten,
|
||||
'darken': darken,
|
||||
'saturate': saturate,
|
||||
'desaturate': desaturate,
|
||||
'rgba': rgba,
|
||||
'lighten': color().lighten,
|
||||
'darken': color().darken,
|
||||
'saturate': color().saturate,
|
||||
'desaturate': color().desaturate,
|
||||
'rgba': color().rgba,
|
||||
'todate': todate,
|
||||
'themes': themes
|
||||
}
|
||||
|
@ -42,6 +42,8 @@ aiohttp_jinja2.setup(web,
|
|||
loader=CustomLoader('frontend/templates'),
|
||||
autoescape=select_autoescape(['html', 'xml', 'css']),
|
||||
context_processors=[glob_vars],
|
||||
lstrip_blocks=True,
|
||||
trim_blocks=True
|
||||
)
|
||||
|
||||
env = aiohttp_jinja2.get_env(web)
|
||||
|
|
|
@ -1,19 +1,42 @@
|
|||
'''
|
||||
Original idea by Zoey Mae
|
||||
Improved by Xiretza (https://gitlab.com/xiretza)
|
||||
'''
|
||||
import re
|
||||
|
||||
from datetime import datetime
|
||||
from collections import OrderedDict
|
||||
|
||||
from config import logging
|
||||
|
||||
|
||||
def parse_ttl(ttl):
|
||||
units = {'s': 1, 'm': 60, 'h': 3600, 'd': 21600, 'd': 1512000}
|
||||
strip = True
|
||||
m = re.match(r'^(\d+)([smhdw]?)$', ttl)
|
||||
|
||||
if ttl[-1:] in units:
|
||||
multiplier = units[ttl[-1:]]
|
||||
if not m:
|
||||
logging.warning(f'Invalid TTL: {ttl}. Setting to default: 1h')
|
||||
amount = 1
|
||||
unit = 'h'
|
||||
|
||||
else:
|
||||
amount = m.group(1)
|
||||
unit = m.group(2)
|
||||
|
||||
units = {
|
||||
's': 1,
|
||||
'm': 60,
|
||||
'h': 60 * 60,
|
||||
'd': 24 * 60 * 60,
|
||||
'w': 7 * 24 * 60 * 60,
|
||||
}
|
||||
|
||||
if unit:
|
||||
multiplier = units[unit]
|
||||
|
||||
else:
|
||||
multiplier = 1
|
||||
strip = False
|
||||
|
||||
return multiplier * int(ttl if strip == False else ttl[:-1])
|
||||
return multiplier * int(amount)
|
||||
|
||||
|
||||
class TTLCache:
|
||||
|
@ -23,26 +46,7 @@ class TTLCache:
|
|||
self.maxsize = maxsize
|
||||
|
||||
|
||||
def store(self, key, value):
|
||||
while True:
|
||||
if len(self.items) >= self.maxsize:
|
||||
self.items.popitem(last=False)
|
||||
|
||||
else:
|
||||
break
|
||||
|
||||
if key not in self.items:
|
||||
timestamp = int(datetime.timestamp(datetime.now()))
|
||||
data = {'timestamp': timestamp + self.ttl, 'data': value}
|
||||
|
||||
self.items[key] = data
|
||||
|
||||
self.items.move_to_end(key)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def fetch(self, key):
|
||||
def __contains__(self, key):
|
||||
if key in self.items:
|
||||
timestamp = int(datetime.timestamp(datetime.now()))
|
||||
|
||||
|
@ -50,16 +54,37 @@ class TTLCache:
|
|||
del self.items[key]
|
||||
|
||||
else:
|
||||
self.items[key]['timestamp'] = timestamp + self.ttl
|
||||
self.items.move_to_end(key)
|
||||
|
||||
return self.items[key]['data']
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def pop(self, key):
|
||||
if self.items.get(key) != None:
|
||||
def invalidate(self, key):
|
||||
if key in self.items:
|
||||
del self.items[key]
|
||||
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def store(self, key, value):
|
||||
while len(self.items) >= self.maxsize:
|
||||
self.items.popitem(last=False)
|
||||
|
||||
if (key in self.items) == False:
|
||||
timestamp = int(datetime.timestamp(datetime.now()))
|
||||
data = {'data': value, 'timestamp': timestamp + self.ttl}
|
||||
self.items[key] = data
|
||||
|
||||
self.items.move_to_end(key)
|
||||
|
||||
|
||||
def fetch(self, key):
|
||||
if key in self:
|
||||
return self.items[key]['data']
|
||||
|
||||
return None
|
||||
|
|
Loading…
Reference in a new issue