forgot to add some files

This commit is contained in:
Izalia Mae 2021-08-22 18:06:20 -04:00
parent 749b1f3619
commit 25a6d1454a
11 changed files with 548 additions and 0 deletions

View file

@ -0,0 +1,102 @@
import sys
from functools import partial
from izzylib import DotDict, boolean, logging
from izzylib.sql import OperationalError, SqlDatabase, SqlSession, SqlColumn as Col
from .get_func import Get
from .put_func import Put
from .rem_func import Rem
from ..config import dbconfig
version = 20210821
tables = DotDict(
config = [
Col('id'),
Col('key', 'text', nullable=False),
Col('value', 'text', nullable=False)
],
user = [
Col('id'),
Col('handle', 'text', nullable=False),
Col('display', 'text'),
Col('password', 'text', nullable=False),
Col('level', 'integer', default=10),
Col('config', 'json', default={})
],
browser = [
Col('id'),
Col('userid', 'integer', nullable=False, fkey='user.id'),
Col('uuid', 'text', nullable=False),
Col('useragent', 'text'),
Col('address', 'text'),
Col('pubkey', nullable=False),
Col('last_seen', 'datetime'),
Col('timestamp')
],
action = [
Col('id'),
Col('action', 'text', nullable=False),
Col('data', 'json', nullable=False),
Col('userid', 'integer', nullable=False, fkey='user.id'),
Col('browserid', 'integer', nullable=False, fkey='browser.id'),
Col('timestamp')
],
token = [
Col('id'),
Col('code', 'text', nullable=False),
Col('userid', 'integer', nullable=False, fkey='user.id'),
Col('last_seen', 'datetime'),
Col('timestamp')
],
fedi_acct = [
Col('id'),
Col('userid', 'text', nullable=False, fkey='user.id'),
Col('handle', 'text', nullable=False),
Col('domain', 'text', nullable=False),
Col('token', 'text', nullable=False),
Col('last_updated', 'datetime'),
Col('timestamp')
]
)
class Database(SqlDatabase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, session_class=Session)
self.get = partial(self.session_func, 'get')
self.put = partial(self.session_func, 'put')
self.rem = partial(self.session_func, 'rem')
def session_func(self, action, func, *args, **kwargs):
with db.session as s:
return s[action][func](*args, **kwargs)
class Session(SqlSession):
config_defaults = dict(
version = (0, int),
enable_registration = (True, boolean)
)
# Don't need this yet
config_store_func = dict()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.get = Get(self)
self.put = Put(self)
self.rem = Rem(self)
def __getitem__(self, key):
return getattr(self, key)
db = Database(tables=tables, **dbconfig)

View file

@ -0,0 +1,117 @@
from datetime import datetime, timedelta
from izzylib.hasher import PasswordHasher
hasher = PasswordHasher(iterations=64)
class SessionFunctionBase:
def __init__(self, session):
self.session = session
self.database = session.database
self.cache = self.database.cache
self.defaults = session.config_defaults
# aliases
self.s = session
self.db = session.database
def __getitem__(self, key):
return getattr(self, key)
class ItemBase:
def __init__(self, db, row):
self.db = db
self.row = row
for k,v in row.items():
setattr(self, k, v)
def __getitem__(self, key):
return self.row[key]
def __setitem__(self, key, value):
self.row[key]
self.row[key] = value
class Token(ItemBase):
_user = None
def get_cookie_data(self, **kwargs):
return DotDict(
expires = kwargs.get('expires', self.last_access+timedelta(weeks=2)),
samesite = kwargs.get('samesite', 'strict'),
https = kwargs.get('https', True),
secure = kwargs.get('secure', True)
)
def set_cookie_data(self, response, **kwargs):
data = self.get_cookie_data()
if response.__class__.__name__ == 'HTTPResponse':
response.cookies['token'] = self.code
response.cookies['token']['expires'] = data.expires
response.cookies['token']['samesite'] = data.samesite
response.cookies['token']['https'] = data.https
response.cookies['token']['secure'] = data.secure
else:
response.set_cookie('token', self.code, **data)
def save_access_time(self):
with self.db.session as s:
self.save_access_time_session(s)
def save_access_time_session(self, s):
s.update(row=self.row, last_access=self.last_access)
def set_cookie(self, response):
self.set_cookie_data(response)
def del_cookie(self, response):
self.set_cookie_data(response, expires=datetime.fromtimestamp(0))
self.db.rem('token', self['code'])
@property
def user(self):
if not self._user:
with self.db.session as s:
self._user = s.fetch('user', id=self.userid)
return self._user
class User(ItemBase):
@property
def accounts(self):
return self.get_data('fedi_acct')
@property
def actions(self):
return self.get_data('action')
@property
def browsers(self):
return self.get_data('browser')
@property
def tokens(self):
return self.get_data('token')
def get_data(self, table):
with self.db.session as s:
return s.fetch(table, userid=self.row.id)

View file

@ -0,0 +1,67 @@
from .func_base import SessionFunctionBase, Token, User, hasher
class Get(SessionFunctionBase):
def config(self, key, default=None):
cache = self.cache.config.fetch(key)
if cache:
return cache
row = self.s.fetch('config', key=key)
if not row:
value = default or self.defaults[key][0]
else:
value = self.defaults[key][1](row.value)
self.cache.config.store(key, value)
return value
def configs(self):
data = DotDict()
for key in self.defaults:
data[key] = self.config(key)
return data
def password(self, handle, password):
user = self.user(handle)
return hasher.verify(user.password, password)
def token(self, code):
cache = self.cache.token.fetch(code)
if cache:
return cache
row = self.s.fetch('token')
if not row:
return
token = Token(self.db, row)
self.cache.token.store(token, token)
return token
def user(self, handle):
handle = handle.lower()
cache = self.cache.user.fetch(handle)
if cache:
return cache
row = self.s.fetch('user', handle=handle)
if not row:
return
user = User(self.db, row)
self.cache.token.store(handle, user)
return user

View file

@ -0,0 +1,67 @@
from datetime import datetime, timedelta
from izzylib import random_gen
from .func_base import SessionFunctionBase, Token, User, hasher
class Put(SessionFunctionBase):
def config(self, key, value):
row = self.s.fetch('config', key=key)
if row:
row = self.s.update(row=row, value=value, return_row=True)
else:
row = self.s.insert('config', key=key, value=value, return_row=True)
self.cache.config.store(key, value)
return value
def token(self, userid, token=None):
if not token:
token = random_gen(40)
row = None
else:
row = self.s.get.token(token)
now = datetime.now()
if row:
row = self.s.update(row=row, last_seen=now, return_row=True)
else:
row = self.s.insert('token', userid=userid,
code = random_gen(40),
last_seen = now,
timestamp = now,
return_row=True
)
token = Token(self.db, row)
self.cache.token.store(token, token)
return token
# Completely forgot to hash the password. This'll be fixed later
def user(self, handle, password, display=None):
user = self.s.get.user(handle)
display = display or handle
passhash = hasher.hash(password)
if user:
row = self.s.update(row=user.row, password=passhash, display=display, return_row=True)
else:
row = self.s.insert('user',
handle = handle,
password = passhash,
display = display,
config = {},
return_row=True
)
user = User(self.db, row)
self.cache.user.store(handle, user)
return user

View file

@ -0,0 +1,13 @@
from .func_base import SessionFunctionBase
class Rem(SessionFunctionBase):
def token(self, code):
token = self.s.get.token(code)
if not token:
logging.verbose('Cannot find token to delete:', code)
return
self.s.remove(row=token.row)
self.cache.token.remove(code)

View file

@ -0,0 +1,12 @@
.item -> %a(href='/') << Home
-if request.user_level == 30:
.item -> %a(href='/admin') << Admin
-if not request.ctx.user
.item -> %a(href='/login') << Login
.item -> %a(href='/register') << Register
-else
.item -> %a(href='/account') << Account
.item -> %a(href='/logout') << Logout

View file

@ -0,0 +1,5 @@
-extends 'base.haml'
-set page = 'Account'
-block content
#title -> =page
Hello, {{request.ctx.user['display']}}

View file

@ -0,0 +1,4 @@
-extends 'base.haml'
-set page = 'Admin'
-block content
#title -> =page

123
pyweb_sync/functions.py Normal file
View file

@ -0,0 +1,123 @@
import random
from izzylib import DotDict, Path, http_server
from sicaptcha.sicaptcha import Sicaptcha
from urllib.request import urlopen
from .config import path
font_path = Path(http_server.__file__).resolve.parent.join('frontend/static/nunito/NunitoSans-ExtraBold.ttf')
class Captcha(Sicaptcha):
word_cache_path = path.data.join('word_cache')
word_cache_path_local = Path('/etc/dictionaries-common/words')
word_cache = []
defaults = {
'bgcolor': ((0, 0, 0, 0), tuple),
'linecolor': ((225, 225, 225, 255), tuple),
'textcolor': ((225, 175, 225, 255), tuple),
'font': (font_path, str),
'fontSize': (24, int),
'format': ('base64', str),
'imagepath': ('', str),
'text': ('', str)
}
def __init__(self, **kwargs):
data = {k: v[0] for k,v in self.defaults.items()}
data.update(kwargs)
if kwargs.get('imagepath'):
kwargs['format'] = 'image'
if self.word_cache_path_local.exists:
with self.word_cache_path_local.open() as fd:
self.word_cache = [v for v in fd.read().splitlines() if "'" not in v and len(v) <= 10]
elif not self.word_cache_path.exists:
resp = urlopen('https://svnweb.freebsd.org/csrg/share/dict/words?view=co&content-type=text/plain')
data = resp.read().decode()
with self.word_cache_path.open('w') as fd:
fd.write(data)
self.word_cache = data.splitlines()
self.update(data)
if not kwargs.get('text'):
self.word_gen(1)
def __str__(self):
return f'Captcha(text={self.text})'
def __getitem__(self, key):
if key not in self.defaults:
raise KeyError(f'Not a valid option: {key}')
return getattr(self, key)
def __setitem__(self, key, value):
if key not in self.defaults:
raise KeyError(f'Not a valid option: {key}')
valid_type = self.defaults[key][1]
if not isinstance(value, valid_type):
raise TypeError(f'Value for {key} should be a {valid_type.__name__}, not a {value.__class__.__name__}')
setattr(self, key, value)
def word_gen(self, num=2):
result = []
for _ in range(num):
result.append(random.choice(self.word_cache))
self.text = ' '.join(result)
def get(self, key, default=None):
try:
return self[key]
except KeyError:
return default
def items(self):
return ((key, self[key]) for key in self.defaults.keys())
def values(self):
return (v for k, v in self.items())
def keys(self):
return (k for k, v in self.items())
def reset(self):
for k, data in self.defaults.items():
self[key] = data[0]
def update(self, data):
for k, v in data.items():
self[k] = v
def as_dict(self):
return DotDict({k: v for k,v in self.items()})
def to_json(self, indent=4):
return self.as_dict().to_json(indent)

37
pyweb_sync/manage.py Normal file
View file

@ -0,0 +1,37 @@
import argparse, sys
from izzylib import logging
from .database import db, version
class Commands:
def __getitem__(self, key):
return getattr(self, f'command_{key}')
def command_setup(self, *args):
db.create_database()
db.put('config', 'version', version)
logging.info('Created database')
def command_migrate(self, *args):
pass
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(dest='command', help='The management command to run')
parser.add_argument(dest='args', nargs='*', help='Arguments to pass to the management command')
args = parser.parse_args()
cmd = Commands()
try:
command = cmd[args.command]
except AttributeError:
logging.error('Invalid management command:', args.command)
sys.exit()
command(*args.args)

1
requirements.txt Normal file
View file

@ -0,0 +1 @@
sicaptcha==0.0.7