# izzylib/IzzyLib/http_server.py (334 lines, 7.9 KiB, Python)

import multiprocessing, sanic, signal, traceback
import logging as pylog
from jinja2.exceptions import TemplateNotFound
from multidict import CIMultiDict
from multiprocessing import cpu_count, current_process
from sanic.views import HTTPMethodView
from urllib.parse import parse_qsl, urlparse
from . import http, logging
from .misc import DotDict, DefaultDict, LowerDotDict
from .template import Template
# Path prefixes that mark a request as "media"; the access-log middleware
# skips requests flagged this way.
log_path_ignore = ['/media', '/static']

# File extensions consulted alongside the prefixes above when flagging a
# request as "media".
log_ext_ignore = [
    # scripts and fonts
    'js',
    'ttf',
    'woff2',
    # audio
    'ac3',
    'aiff',
    'flac',
    'm4a',
    'mp3',
    'ogg',
    'wav',
    'wma',
    # images
    'apng',
    'ico',
    'jpeg',
    'jpg',
    'png',
    'svg',
    # video
    'divx',
    'mov',
    'mp4',
    'webm',
    'wmv',
]
class HttpServer(sanic.Sanic):
    """Sanic application wired up with templating, error handlers and access logging.

    Positional/keyword parameters:
        name: application name handed to ``sanic.Sanic``.
        host: address to bind to.
        port: port to bind to; converted with ``int()``.

    Extra keyword arguments read from ``kwargs``:
        workers: number of worker processes (default: ``cpu_count()``).
        sig_handler: optional zero-argument callable invoked by ``finish``
            before the server is stopped.
        request_class: request class given to Sanic (default: ``HttpRequest``).
        tpl_search, tpl_globals, tpl_context, tpl_autoescape: passed
            positionally, in that order, to ``Template``.
        error_handler: handler registered for ``Exception``
            (default: ``GenericError``).
    """

    def __init__(self, name='sanic', host='0.0.0.0', port='4080', **kwargs):
        self.host = host
        self.port = int(port)
        self.workers = int(kwargs.get('workers', cpu_count()))
        self.sig_handler = kwargs.get('sig_handler')

        super().__init__(name, request_class=kwargs.get('request_class', HttpRequest))

        # Only let errors from Sanic's own loggers through.
        for log in ['sanic.root', 'sanic.access']:
            pylog.getLogger(log).setLevel(pylog.ERROR)

        self.template = Template(
            kwargs.get('tpl_search', []),
            kwargs.get('tpl_globals', {}),
            kwargs.get('tpl_context'),
            kwargs.get('tpl_autoescape', True)
        )
        self.template.addEnv('app', self)

        self.error_handler.add(TemplateNotFound, NoTemplateError)
        self.error_handler.add(Exception, kwargs.get('error_handler', GenericError))
        self.register_middleware(MiddlewareAccessLog, attach_to='response')

        # SIGHUP and SIGQUIT are not defined on every platform (e.g. Windows),
        # so only hook the signals this interpreter actually provides.
        for sig_name in ('SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM'):
            sig = getattr(signal, sig_name, None)

            if sig is not None:
                signal.signal(sig, self.finish)

    ## Sanic spits out a warning, so this is the workaround to stop it
    def __setattr__(self, key, value):
        object.__setattr__(self, key, value)

    def add_method_route(self, method, *routes):
        """Register the view class ``method`` on every path in ``routes``."""
        for route in routes:
            self.add_route(method.as_view(), route)

    def add_method_routes(self, routes: list):
        """Register several views; each item is ``(view_class, *paths)``."""
        for route in routes:
            self.add_method_route(*route)

    def start(self):
        """Log a startup message and run the server with the stored settings."""
        options = {
            'host': self.host,
            'port': self.port,
            'workers': self.workers,
            'access_log': False,
            'debug': False
        }

        msg = f'Starting {self.name} at {self.host}:{self.port}'

        if self.workers > 1:
            msg += f' with {self.workers} workers'

        logging.info(msg)
        self.run(**options)

    def finish(self, *args):
        """Run the optional ``sig_handler``, stop the server and exit the process.

        This method is installed as a signal handler, and Python calls signal
        handlers with ``(signum, frame)``. Those arguments are accepted and
        ignored; calling ``finish()`` with no arguments keeps working.
        """
        # Imported here because the module's import block does not provide ``sys``.
        import sys

        if self.sig_handler:
            self.sig_handler()

        print('stopping.....')
        self.stop()
        logging.info('Bye! :3')
        sys.exit()
class HttpRequest(sanic.request.Request):
    """Sanic request with parsed headers, body helpers and signature metadata.

    Attributes set during construction:
        Headers: ``Headers`` wrapper around the raw request headers.
        Data: ``Data`` accessor for query/form/body content.
        template: the application's ``Template`` instance.
        paths: flags ``media``, ``json``, ``ap`` and ``cookie`` describing the request.
        ap: signature details (``valid``, ``signature``, ``actor``, ``inbox``, ``domain``).
    """

    def __init__(self, url_bytes, headers, version, method, transport, app):
        super().__init__(url_bytes, headers, version, method, transport, app)

        self.Headers = Headers(headers)
        self.Data = Data(self)
        self.template = self.app.template

        self.__setup_defaults()
        self.__parse_path()
        # Signature parsing runs for every request, media/static paths included.
        self.__parse_signature()
        self.Run()

    def Run(self):
        """Hook called at the end of ``__init__``; does nothing by default."""
        pass

    def response(self, tpl, *args, **kwargs):
        """Render template ``tpl`` for this request via the app's template engine."""
        return self.template.response(self, tpl, *args, **kwargs)

    def alldata(self):
        """Return JSON body, query string and form data merged into one ``DotDict``.

        Later sources win on duplicate keys (json < query < form).
        """
        # The accessors live on ``self.Data``; ``self.content`` and ``self.data``
        # are never set by this class.
        return self.__combine_dicts(self.Data.json, self.Data.query, self.Data.form)

    def verify(self, actor=None):
        """Check the request's HTTP signature and store the result in ``ap.valid``."""
        self.ap.valid = http.VerifyHeaders(self.headers, self.method, self.path, actor, self.body)
        return self.ap.valid

    def __combine_dicts(self, *dicts):
        """Merge ``dicts`` left to right into a new ``DotDict``."""
        data = DotDict()

        for item in dicts:
            data.update(item)

        return data

    def __setup_defaults(self):
        """Initialise the ``paths`` and ``ap`` containers with their default values."""
        self.paths = DotDict({'media': False, 'json': False, 'ap': False, 'cookie': False})
        self.ap = DotDict({'valid': False, 'signature': {}, 'actor': None, 'inbox': None, 'domain': None})

    def __parse_path(self):
        """Fill in ``paths.media`` and ``paths.json`` from the request path and headers."""
        path = self.path
        has_ignored_prefix = path.startswith(tuple(log_path_ignore))

        # An extension is the text after the last dot of the final path
        # segment, so it has to be compared against the END of the path.
        filename = path.rsplit('/', 1)[-1]
        _, dot, extension = filename.rpartition('.')
        has_ignored_ext = bool(dot) and extension.lower() in log_ext_ignore

        self.paths.media = has_ignored_prefix or has_ignored_ext
        self.paths.json = self.__json_check()

    def __parse_signature(self):
        """Parse the ``signature`` header, if any, into ``ap.signature/actor/domain``."""
        sig = self.headers.getone('signature', None)

        if sig:
            self.ap.signature = http.ParseSig(sig)

        if self.ap.signature:
            # The key id looks like ``<actor url>#<fragment>``; keep the actor part.
            self.ap.actor = self.ap.signature.get('keyid', '').split('#', 1)[0]
            self.ap.domain = urlparse(self.ap.actor).netloc

    def __json_check(self):
        """Return True when the client asked for JSON by path suffix or Accept header."""
        if self.path.endswith('.json'):
            return True

        accept = self.headers.getone('Accept', None)

        if not accept:
            return False

        # Drop media-type parameters (``;q=0.9``, ``;profile=...``) before comparing.
        mimes = {part.split(';', 1)[0].strip().lower() for part in accept.split(',')}
        return bool(mimes & {'application/json', 'application/activity+json'})
class Headers(LowerDotDict):
    """Header container mapping each lower-cased header name to a list of its values."""

    def __init__(self, headers):
        """Collect every ``(name, value)`` pair of ``headers`` under its lower-cased name."""
        super().__init__()

        for name, value in headers.items():
            # Lower-case explicitly so storage agrees with ``getone``/``getall``.
            key = name.lower()

            if not self.get(key):
                self[key] = []

            self[key].append(value)

    def getone(self, key, default=None):
        """Return the first value stored for ``key``, or ``default`` when there is none."""
        value = self.get(key.lower())

        if not value:
            return default

        return value[0]

    def getall(self, key, default=None):
        """Return the list of values for ``key``.

        When the header is missing, ``default`` is returned; if no default is
        given a new empty list is returned, so callers never share (and cannot
        accidentally mutate) a common default object.
        """
        if default is None:
            default = []

        return self.get(key.lower(), default)
class Data(object):
    """Lazy accessors for the query string, form fields, files and body of a request."""

    def __init__(self, request):
        self.request = request

    @property
    def combined(self):
        """Form, query and JSON data merged into one ``DotDict``.

        Sources are applied in that order, so on duplicate keys query values
        override form values and JSON values override both.
        """
        # Merging with ``update`` tolerates keys shared between sources;
        # unpacking all three into one call raises TypeError on a duplicate.
        merged = DotDict()

        for source in (self.form, self.query, self.json):
            merged.update(source)

        return merged

    @property
    def query(self):
        """Query-string parameters; the last value wins for repeated names."""
        return DotDict(dict(parse_qsl(self.request.query_string)))

    @property
    def form(self):
        """Form fields, keeping only the first value of each field."""
        data = {k: v[0] for k, v in self.request.form.items()}
        return DotDict(data)

    @property
    def files(self):
        """Uploaded files, keeping only the first file of each field."""
        return DotDict({k: v[0] for k, v in self.request.files.items()})

    ### body functions
    @property
    def raw(self):
        """Request body as bytes, or ``b''`` if it cannot be read."""
        try:
            return self.request.body

        except Exception as e:
            logging.verbose('IzzyLib.http_server.Data.raw: failed to get body')
            logging.debug(f'{e.__class__.__name__}: {e}')
            return b''

    @property
    def text(self):
        """Request body decoded to ``str``, or ``''`` if decoding fails."""
        try:
            return self.raw.decode()

        except Exception as e:
            logging.verbose('IzzyLib.http_server.Data.text: failed to get body')
            logging.debug(f'{e.__class__.__name__}: {e}')
            return ''

    @property
    def json(self):
        """Request body loaded into a ``DotDict``; an empty ``DotDict`` on failure."""
        try:
            # NOTE(review): relies on DotDict accepting a JSON string - confirm in .misc
            return DotDict(self.text)

        except Exception as e:
            logging.verbose('IzzyLib.http_server.Data.json: failed to get body')
            logging.debug(f'{e.__class__.__name__}: {e}')
            # Return the same type as the success path so callers can treat
            # both results alike.
            return DotDict()
async def MiddlewareAccessLog(request, response):
    """Response middleware that writes one access-log line per non-media request.

    The client address is taken from the ``x-real-ip`` header, then the
    ``for`` entry of ``request.forwarded``, then ``request.remote_addr``.
    """
    if request.paths.media:
        return

    headers = request.headers
    user_agent = headers.get('user-agent')
    fallback_address = request.forwarded.get('for', request.remote_addr)
    client = headers.get('x-real-ip', fallback_address)
    worker = multiprocessing.current_process().name

    logging.info(f'({worker}) {client} {request.method} {request.path} {response.status} "{user_agent}"')
def GenericError(request, exception):
    """Default error handler: turn ``exception`` into a JSON, template or text response.

    The HTTP status comes from ``exception.status_code`` when that is an
    integer and is 500 otherwise. A traceback is printed for anything outside
    the 2xx-4xx range. The response is JSON when ``request.paths.json`` is
    set, otherwise the ``server_error.haml`` template, falling back to plain
    text if that template is missing. All three carry the error status.
    """
    status = getattr(exception, 'status_code', None)

    if not isinstance(status, int):
        status = 500

    if not 200 <= status < 500:
        # Error handlers run outside the original ``except`` block, so print
        # the exception object explicitly rather than the (empty) current one.
        traceback.print_exception(type(exception), exception, exception.__traceback__)

    msg = f'{exception.__class__.__name__}: {str(exception)}'

    if request.paths.json:
        return sanic.response.json({'error': {'status': status, 'message': msg}}, status=status)

    try:
        return request.response('server_error.haml', status=status, context={'status': str(status), 'error': msg})

    except TemplateNotFound:
        return sanic.response.text(f'Error {status}: {msg}', status=status)
def NoTemplateError(request, exception):
    """Error handler for a missing template: log it and reply with an HTML 500 page."""
    detail = f'{exception.__class__.__name__}: {str(exception)}'
    logging.error('TEMPLATE_ERROR:', detail)

    page = 'I\'m a dumbass and forgot to create a template for this page'
    return sanic.response.html(page, 500)
def ReplaceHeader(headers, key, value):
    """Set header ``key`` to ``value`` in place, dropping any differently-cased duplicates.

    Args:
        headers: mutable mapping of header names to values; modified in place.
        key: header name; compared case-insensitively against existing names.
        value: new value stored under ``key``.

    Returns:
        None.
    """
    wanted = key.lower()

    # Collect the matches first: deleting while iterating a dict raises
    # RuntimeError.
    stale = [name for name in headers if name.lower() == wanted]

    for name in stale:
        # ``pop`` with a default tolerates case-insensitive mappings where one
        # removal already took out several spellings of the same header.
        headers.pop(name, None)

    headers[key] = value
class Response:
    """Namespace of shortcuts for building Sanic responses.

    ``Text``, ``Html``, ``Json`` and ``Redir`` are the Sanic response helpers
    unchanged. ``Css``, ``Js``, ``Ap`` and ``Jrd`` call the text/json helpers
    with a fixed ``content-type`` header. All of them are static, so they
    work both on the class and on an instance.
    """

    Text = staticmethod(sanic.response.text)
    Html = staticmethod(sanic.response.html)
    Json = staticmethod(sanic.response.json)
    Redir = staticmethod(sanic.response.redirect)

    @staticmethod
    def _with_content_type(headers, content_type):
        """Return a copy of ``headers`` whose only content-type entry is ``content_type``."""
        # Work on a copy so neither the caller's mapping nor a shared default
        # is ever mutated.
        fresh = {} if headers is None else headers.copy()

        for name in [k for k in fresh if k.lower() == 'content-type']:
            fresh.pop(name, None)

        fresh['content-type'] = content_type
        return fresh

    @staticmethod
    def Css(*args, headers=None, **kwargs):
        """Text response served as ``text/css``."""
        headers = Response._with_content_type(headers, 'text/css')
        return sanic.response.text(*args, headers=headers, **kwargs)

    @staticmethod
    def Js(*args, headers=None, **kwargs):
        """Text response served as ``application/javascript``."""
        headers = Response._with_content_type(headers, 'application/javascript')
        return sanic.response.text(*args, headers=headers, **kwargs)

    @staticmethod
    def Ap(*args, headers=None, **kwargs):
        """JSON response served as ``application/activity+json``."""
        headers = Response._with_content_type(headers, 'application/activity+json')
        return sanic.response.json(*args, headers=headers, **kwargs)

    @staticmethod
    def Jrd(*args, headers=None, **kwargs):
        """JSON response served as ``application/jrd+json``."""
        headers = Response._with_content_type(headers, 'application/jrd+json')
        return sanic.response.json(*args, headers=headers, **kwargs)


# Short alias for ``Response``.
Resp = Response