izzylib-http-async/barkshark_http_async/server/response.py
2022-02-18 16:45:32 -05:00

202 lines
3.9 KiB
Python

import json, traceback
from datetime import datetime
from . import get_app
from .misc import Cookies, Headers, CookieItem
from ..dotdict import MultiDotDict
from ..http_utils import convert_to_bytes, create_message, first_line
class Response:
__slots__ = ['_app', '_body', 'headers', 'cookies', 'status', 'request', 'version']
def __init__(self, body=b'', status=200, headers={}, cookies={}, content_type='text/plain', request=None):
self._app = None
self._body = b''
self.headers = Headers(headers)
self.cookies = Cookies(cookies)
self.version = '1.1'
self.body = body
self.status = status
self.content_type = content_type
self.request = request
@property
def app(self):
return self._app or get_app()
@app.setter
def app(self, app):
self._app = app
@property
def body(self):
return self._body
@body.setter
def body(self, data):
self._body = convert_to_bytes(data)
@property
def content_type(self):
return self.headers.getone('Content-Type')
@content_type.setter
def content_type(self, data):
try:
self.headers['Content-Type'].set(data)
except KeyError:
self.headers['Content-Type'] = data
@property
def content_length(self):
return len(self.body)
@property
def streaming(self):
return self.headers.getone('Transfer-Encoding') == 'chunked'
def append(self, data):
self._body += convert_to_bytes(data)
def delete_cookie(self, cookie):
cookie.set_delete()
self.cookies[cookie.key] = cookie
@classmethod
def new_html(cls, body, **kwargs):
response = cls(**kwargs)
response.set_html(body)
return response
@classmethod
def new_json(cls, body, activity=False, **kwargs):
response = cls(**kwargs)
response.set_json(body, activity)
return response
@classmethod
def new_error(cls, message, status=500, **kwargs):
response = cls(**kwargs)
response.set_error(message, status)
return response
@classmethod
def new_redir(cls, path, status=302, **kwargs):
response = cls(**kwargs)
response.set_redir(path, status)
return response
def set_text(self, body=b'', status=None):
self.body = body
if status:
self.status = status
return self
def set_html(self, body=b'', status=None):
self.content_type = 'text/html'
self.body = body
if status:
self.status = status
return self
def set_template(self, template, context={}, content_type='text/html', status=None, request=None):
if not request:
request = self.request
if status:
self.status = status
self.body = request.app.render(template, context_data=context, request=request)
self.content_type = content_type
return self
def set_json(self, body={}, status=None, activity=False):
self.content_type = 'application/activity+json' if activity else 'application/json'
self.body = body
if status:
self.status = status
return self
def set_redir(self, path, status=302):
self.headers['Location'] = path
self.status = status
return self
def set_error(self, message, status=500):
try:
if self.request and 'json' in self.request.headers.getone('accept', ''):
return self.set_json({'error': message, 'code': status}, status=status)
return self.set_template('error.haml',
context = {
'error_message': message,
'response': self
},
status = status
)
except AttributeError:
pass
except Exception:
traceback.print_exc()
self.body = f'HTTP Error {status}: {message}'
self.status = status
return self
async def set_streaming(self, transport, headers={}):
self.headers.update(headers)
self.headers.update(transport.app.cfg.default_headers)
self.headers.setall('Transfer-encoding', 'chunked')
transport.write(self.compile(body=False))
def set_cookie(self, key, value, **kwargs):
self.cookies[key] = CookieItem(key, value, **kwargs)
def compile(self, body=True):
first = first_line(status=self.status)
return create_message(first, self.headers, self.cookies, self.body if body else None)