basgi/basgi/response.py

319 lines
8.4 KiB
Python

from __future__ import annotations
import os
from aputils import HttpDate, JsonBase
from blib import convert_to_bytes
from collections.abc import Sequence
from datetime import datetime, timedelta
from mimetypes import guess_type
from multidict import CIMultiDict
from pathlib import Path
from typing import Any, Literal, Self
from urllib.parse import quote
from .enums import HttpStatus
from .error import HttpError
from .misc import Cookie, Stream
from .request import Request
class Response:
"Represents an HTTP response message"
def __init__(self,
status: HttpStatus | int,
body: Any,
mimetype: str | None = None,
headers: CIMultiDict[str] | dict[str, str] | None = None) -> None:
"""
Create a new ``Response`` object
:param status: Status code to send with the message
:param body: Body data to sent to the client
:param mimetype: Mimetype to set as the ``Content-Type`` header
:param headers: Header items to include in the message
"""
self.cookies: dict[str, Cookie] = {}
"New cookies to be sent to the client"
self.status: HttpStatus = HttpStatus.parse(status)
"Status code of the response"
self._body = b""
if isinstance(headers, CIMultiDict):
self.headers = headers
else:
self.headers = CIMultiDict(headers or {})
self.body = body
self.mimetype = mimetype
@classmethod
def new_json(cls: type[Self],
status: HttpStatus | int,
body: JsonBase | dict[str, Any] | Sequence[Any] | str,
mimetype: str = "application/json",
headers: dict[str, str] | None = None) -> Self:
"""
Create a new JSON ``Response`` object
:param status: Status code to send with the message
:param body: JSON data to be sent to the client
:param mimetype: Mimetype to set as the ``Content-Type`` header
:param headers: Header items to include in the message
"""
return cls(status, body, mimetype, headers)
@classmethod
def new_activity(cls: type[Self],
status: HttpStatus | int,
body: JsonBase | dict[str, Any] | str,
mimetype: str = "application/activity+json",
headers: dict[str, str] | None = None) -> Self:
"""
Create a new ActivityPub object ``Response`` object
:param status: Status code to send with the message
:param body: ActivityPub object to be sent to the client
:param mimetype: Mimetype to set as the ``Content-Type`` header
:param headers: Header items to include in the message
"""
return cls(status, body, mimetype, headers)
@classmethod
def new_redirect(cls: type[Self],
url: str,
status: HttpStatus | int = HttpStatus.TemporaryRedirect,
body: Any = None,
mimetype: str | None = None) -> Self:
link = f"<a href=\"{url}\">{url}</a>"
message = f"You are being redirected to {link}"
headers = {"Location": quote(str(url), safe=":/%#?=@[]!$&'()*+,;")}
return cls(status, body or message, mimetype or "text/html", headers)
@classmethod
def new_from_http_error(cls: type[Self], error: HttpError) -> Self:
"""
Create a new ``Response`` from an ``HttpError`` exception
:param error: Error to be converted to a response
"""
message = f"HTTP Error {error.status.value}: {str(error)}"
return cls(error.status, message, headers = error.headers)
@property
def body(self) -> bytes:
"Data to be sent in the response"
return self._body
@body.setter
def body(self, value: Any) -> None:
self._body = convert_to_bytes(value)
self.length = len(self._body)
@property
def length(self) -> int:
"Length of the response body in bytes"
return int(self.headers.getone("Content-Length", "0"))
@length.setter
def length(self, value: int) -> None:
self.headers.update({"Content-Length": str(value)})
@property
def mimetype(self) -> str | None:
"Mimetype of the response data"
return self.headers.getone("Content-Type")
@mimetype.setter
def mimetype(self, value: str | None) -> None:
if value is None:
self.headers.popall("Content-Type", None)
return
self.headers.update({"Content-Type": value})
def set_cookie(self,
key: str,
value: str | None,
max_age: timedelta | int | None = None,
expires: HttpDate | datetime | int | str | None = None,
path: str = "/",
domain: str | None = None,
secure: bool = False,
http_only: bool = False,
same_site: Literal["lax", "strict", "none"] | None = "lax") -> Cookie:
cookie = Cookie(key, value, max_age, expires, path, domain, secure, http_only, same_site)
self.cookies[cookie.key] = cookie
return cookie
def delete_cookie(self, cookie: Cookie) -> Cookie:
deleted_cookie = cookie.copy()
deleted_cookie.set_deleted()
self.cookies[deleted_cookie.key] = deleted_cookie
return deleted_cookie
async def send(self, stream: Stream, request: Request) -> None:
"""
Send the response to the client. This can be called multiple times without issues.
:param stream: Stream object to be used to send the response
:param request: Request associated with the response
"""
headers = []
for key, value in self.headers.items():
if not isinstance(value, str):
# this will be a `logging.warning` call in the future
print(f"not string: {key} {type(value)} {value}") # type: ignore[unreachable]
continue
headers.append((key.encode("utf-8"), value.encode("utf-8")))
for cookie in self.cookies.values():
headers.append((b"Set-Cookie", cookie.to_string().encode("utf-8")))
await stream.write_headers(self.status, headers)
await stream.write_body(self.body)
class FileResponse(Response):
"Sends a file as a response"
def __init__(self,
path: Path | str,
mimetype: str | None = None,
chunk_size: int = 8192,
status: HttpStatus | int = HttpStatus.Ok,
headers: dict[str, str] | None = None) -> None:
"""
Create a new ``FileResponse`` object
:param path: Path to the file to send
:param mimetype: Mimetype of the file
:param chunk_size: Size in bytes of the chunks to split the file into when sending
:param status: HTTP status code to respond with
:param headers: Header items to include in the message
:raises FileNotFoundError: When the file does not exist
:raises ValueError: If the path is not a file
"""
Response.__init__(
self,
status, b"",
mimetype or guess_type(path)[0] or "application/octet+stream",
headers
)
if isinstance(path, str):
path = Path(path)
self.chunk_size: int = chunk_size if chunk_size > 0 else 8192
self.path: Path = path.expanduser().resolve()
if not self.path.exists():
raise FileNotFoundError(self.path)
if not self.path.is_file():
raise ValueError(f"Path is a directory: {self.path}")
async def send(self, stream: Stream, request: Request) -> None:
await stream.write_headers(200, tuple([]))
with self.path.open("rb") as fd:
while True:
if not (data := fd.read(self.chunk_size)):
break
await stream.write_body(data, True)
await stream.write_body(b"", False)
class TemplateResponse(Response):
"Represents a server response with a body built from a Jinja template"
def __init__(self,
name: str,
context: dict[str, Any] | None = None,
status: HttpStatus | int = HttpStatus.Ok,
mimetype: str | None = None,
headers: dict[str, Any] | None = None,
pprint: bool = False) -> None:
"""
Create a new ``TemplateResponse`` object
:param name: Path to the template file
:param context: Data to be passed to the template
:param status: Status code to send with the message
:param mimetype: Mimetype to set as the ``Content-Type`` header
:param headers: Header items to include in the message
:param pprint: Format the resulting html/xml to be pretty
"""
Response.__init__(self, status, b"", mimetype or self.detect_mimetype(name), headers)
self.name: str = name
self.pprint: bool = pprint
self.context: dict[str, Any] = context or {}
@staticmethod
def detect_mimetype(path: str) -> str:
"""
Return the mimetype of the template based on the file extension
:param path: Path to detect the mimetype of
"""
_, ext = os.path.splitext(path)
if ext in {".haml", ".jhaml", ".jaml"}:
return "text/html"
if ext in {".scss", ".sass"}:
return "text/css"
return guess_type(path)[0] or "text/plain"
async def send(self, stream: Stream, request: Request) -> None:
context = {
"request": request,
**self.context
}
text = request.app.template.render(self.name, context, self.pprint)
self.body = text.encode("utf-8")
await Response.send(self, stream, request)