Compare commits

...

4 commits

Author SHA1 Message Date
Izalia Mae 4d5da1ed50 router and request changes
* redo route parameters
* add `Application.route` method
* disable generics on `Application` in `Request` for now
2024-04-20 18:43:31 -04:00
Izalia Mae cad946555b finish documenting module 2024-04-20 14:38:13 -04:00
Izalia Mae d48cc7024a h 2024-04-19 22:10:55 -04:00
Izalia Mae 2a04991e5c add documentation for some classes and functions 2024-04-19 20:26:44 -04:00
22 changed files with 754 additions and 690 deletions

View file

@ -3,22 +3,17 @@ __version__ = "0.1.0"
from .application import Application, StaticHandler from .application import Application, StaticHandler
from .client import Client from .client import Client
from .enums import HttpStatus, SassOutputStyle from .enums import PridePalette, SassOutputStyle
from .error import HttpError
from .request import Request from .request import Request
from .response import Response, FileResponse, TemplateResponse from .response import Response, FileResponse, TemplateResponse
from .runner import Runner, UvicornRunner, GranianRunner from .runner import Runner, UvicornRunner, GranianRunner
from .signal import Signal
from .template import SassExtension, Template from .template import SassExtension, Template
from .misc import ( from .misc import (
Color, Color,
Cookie, Cookie,
StateProxy, StateProxy,
Stream, Stream
convert_to_bytes,
convert_to_string,
is_loop_running
) )
from .router import ( from .router import (
@ -44,5 +39,4 @@ from .misc import ReaderFunction, WriterFunction
from .request import AsgiVersionType, ScopeType from .request import AsgiVersionType, ScopeType
from .router import RouteHandler from .router import RouteHandler
from .runner import BackgroundTask from .runner import BackgroundTask
from .signal import SignalCallback
from .template import TemplateContextCallback from .template import TemplateContextCallback

View file

@ -2,19 +2,18 @@ from __future__ import annotations
import traceback import traceback
from blib import HttpError, Signal, Url
from collections.abc import Callable from collections.abc import Callable
from functools import lru_cache from mimetypes import guess_type
from os.path import normpath from os.path import normpath
from pathlib import Path from pathlib import Path
from typing import Any, Generic, TypeVar from typing import Any, Generic, TypeVar
from .client import Client from .client import Client
from .error import HttpError
from .misc import StateProxy, Stream, ReaderFunction, WriterFunction from .misc import StateProxy, Stream, ReaderFunction, WriterFunction
from .request import Request, ScopeType from .request import Request, ScopeType
from .response import FileResponse, Response from .response import FileResponse, Response
from .router import Router, RouteHandler, get_router from .router import Router, RouteHandler, get_router
from .signal import Signal
from .template import Template, TemplateContextCallback from .template import Template, TemplateContextCallback
@ -28,6 +27,8 @@ APPLICATIONS: dict[str, Application] = {} # type: ignore[type-arg]
class Application(Generic[R, RT, AT]): class Application(Generic[R, RT, AT]):
"Represents an ASGI application"
def __init__(self, def __init__(self,
name: str, name: str,
template_search: list[Path | str] | None = None, template_search: list[Path | str] | None = None,
@ -35,35 +36,67 @@ class Application(Generic[R, RT, AT]):
template_context: TemplateContextCallback | None = None, template_context: TemplateContextCallback | None = None,
request_class: type[R] = Request, # type: ignore[assignment] request_class: type[R] = Request, # type: ignore[assignment]
request_state_class: type[RT] = StateProxy, # type: ignore[assignment] request_state_class: type[RT] = StateProxy, # type: ignore[assignment]
app_state_class: type[AT] = StateProxy) -> None: # type: ignore[assignment] app_state_class: type[AT] = StateProxy, # type: ignore[assignment]
http_proxy: Url | str | None = None) -> None:
"""
Create a new ``Application``
:param name: Identifier for the application. This is used for the ``Server`` header
and adding routes.
:param template_search: Directories to search when locating a template
:param template_env: Context data to include with every template render
:param template_context: Function that gets called on every template render
:param request_class: Class to use for requests
:param request_state_class: Class to use for the :attr:`basgi.Request.state` property
:param app_state_class: Class to use for the :attr:`basgi.Application.state` property
"""
if name in APPLICATIONS: if name in APPLICATIONS:
raise ValueError(f"Application with name '{name}' already exists") raise ValueError(f"Application with name '{name}' already exists")
APPLICATIONS[name] = self
self.name: str = name self.name: str = name
"Identifier for the application"
self.state: AT = app_state_class({}) self.state: AT = app_state_class({})
"Data to be stored with the application"
self.request_class: type[R] = request_class self.request_class: type[R] = request_class
"Class to use for requests"
self.request_state_class: type[RT] = request_state_class self.request_state_class: type[RT] = request_state_class
"Class to use for the :attr:`basgi.Request.state` property"
# Not sure how to properly annotate `Exception` here, so just ignore the warnings # Not sure how to properly annotate `Exception` here, so just ignore the warnings
self.error_handlers: dict[type[ExceptionType], ExceptionCallback] = { # type: ignore self.error_handlers: dict[type[ExceptionType], ExceptionCallback] = { # type: ignore
HttpError: self.handle_http_error HttpError: self._handle_http_error
} }
"Functions to be called on specific errors"
self.router: Router = get_router(self.name) self.router: Router = get_router(self.name)
self.router.trim_last_slash = True "Contains all of the routes the application will handle"
self.client: Client = Client(name, http_proxy)
"Customized httpx client"
self.client: Client = Client()
self.template: Template = Template( self.template: Template = Template(
*(template_search or []), *(template_search or []),
context_function = template_context, context_function = template_context,
**(template_env or {})) **(template_env or {})
)
APPLICATIONS[name] = self "Template environment"
@classmethod @staticmethod
def get(cls: type[Application[R, RT, AT]], name: str) -> Application[R, RT, AT]: def get(name: str) -> Application[R, RT, AT]:
"""
Get an application by its name
:param name: Identifier of the application
:raises KeyError: If an application by the specified name cannot be found
"""
return APPLICATIONS[name] return APPLICATIONS[name]
@ -94,7 +127,7 @@ class Application(Generic[R, RT, AT]):
await self.on_request.handle_emit(request) await self.on_request.handle_emit(request)
except Exception as error: except Exception as error:
response = self.handle_error(request, error) response = self._handle_error(request, error)
try: try:
match = self.router(request.path, request.method) match = self.router(request.path, request.method)
@ -105,13 +138,13 @@ class Application(Generic[R, RT, AT]):
raise HttpError(500, "Empty response") raise HttpError(500, "Empty response")
except Exception as error: except Exception as error:
response = self.handle_error(request, error) response = self._handle_error(request, error)
try: try:
await self.on_response.handle_emit(request, response) await self.on_response.handle_emit(request, response)
except Exception as error: except Exception as error:
response = self.handle_error(request, error) response = self._handle_error(request, error)
if not response.headers.get("Server"): if not response.headers.get("Server"):
response.headers["Server"] = self.name response.headers["Server"] = self.name
@ -122,34 +155,79 @@ class Application(Generic[R, RT, AT]):
@Signal(5.0) @Signal(5.0)
async def on_request(self, request: Request) -> None: async def on_request(self, request: Request) -> None:
pass """
:class:`blib.Signal` that gets emitted just after the request is created
:param application: The application that emit the signal
:param request: The newly-created request
"""
...
@Signal(5.0) @Signal(5.0)
async def on_response(self, request: Request, response: Response) -> None: async def on_response(self, request: Request, response: Response) -> None:
pass """
:class:`blib.Signal` that gets emitted just after the route handler is called
:param application: The application that emit the signal
:param request: The request
:param response: Response from the route handler
"""
...
@Signal(5.0) @Signal(5.0)
async def on_startup(self) -> None: async def on_startup(self) -> None:
pass """
:class:`blib.Signal` that gets emitted on server start
:param application: The application that emit the signal
"""
...
@Signal(5.0) @Signal(5.0)
async def on_shutdown(self) -> None: async def on_shutdown(self) -> None:
pass """
:class:`blib.Signal` that gets emitted just before server stop
:param application: The application that emit the signal
"""
...
def add_route(self, method: str, path: str, handler: RouteHandler) -> None: def add_route(self, handler: RouteHandler, method: str, *paths: str) -> None:
self.router.bind(handler, path, methods = [method]) """
Add a route handler
:param method: HTTP method the route will handle
:param path: Virtual HTTP path the route will handle
:param handler: Function to call when the route is requested
"""
self.router.bind(handler, method, *paths)
def add_static(self, path: str, location: Path | str, cached: bool = False) -> None: def add_static(self, path: str, location: Path | str, cached: bool = False) -> None:
"""
Add a route handler that servers a directory. If a directory is requested and
``index.html`` exists in that directory, it will be served instead.
:param path: Virtual HTTP path the route will handle
:param location: Directory to use for serving files
:param cached: Store all requested files in memory after being read from the disk for
the first time
"""
handler = StaticHandler(path, location, cached) handler = StaticHandler(path, location, cached)
self.add_route("GET", handler.path, handler) self.add_route(handler, "GET", handler.path)
def print_access_log(self, request: Request, response: Response) -> None: def print_access_log(self, request: Request, response: Response) -> None:
"""
Prints the request line to the terminal. Override this in a sub-class to customize it.
"""
message = "{}: {} \"{} {}\" {} {} \"{}\"".format( message = "{}: {} \"{} {}\" {} {} \"{}\"".format(
"INFO", "INFO",
request.headers.get( request.headers.get(
@ -169,32 +247,63 @@ class Application(Generic[R, RT, AT]):
print(message, flush = True) print(message, flush = True)
def handle_error(self, request: Request, error: Exception) -> Response: def route(self, method: str, *paths: str) -> Callable[[RouteHandler], RouteHandler]:
"""
Decorator for adding a route from a callable
:param method: HTTP method the route should handle
:param paths: Paths the route should handle
"""
def wrapper(func: RouteHandler) -> RouteHandler:
self.router.bind(func, method, *paths)
return func
return wrapper
def _handle_error(self, request: Request, error: Exception) -> Response:
try: try:
return self.error_handlers[type(error)](request, error) return self.error_handlers[type(error)](request, error)
except KeyError:
traceback.print_exception(error)
return Response(500, "Internal Server Error")
except Exception: except Exception:
traceback.print_exc() traceback.print_exc()
return Response(500, "Internal Server Error") return Response(500, "Internal Server Error")
def handle_http_error(self, request: Request, error: HttpError) -> Response: def _handle_http_error(self, request: Request, error: HttpError) -> Response:
return Response.new_from_http_error(error) return Response.new_from_http_error(error)
class StaticHandler: class StaticHandler:
"Provides a handler for a directory of static files"
def __init__(self, path: str, location: Path | str, cached: bool) -> None: def __init__(self, path: str, location: Path | str, cached: bool) -> None:
"""
Create a new static file handler
:param path: Virtual HTTP path the route will handle
:param location: Directory to use for serving files
:param cached: Store all requested files in memory after being read from the disk for
the first time
"""
if isinstance(location, str): if isinstance(location, str):
location = Path(location) location = Path(location)
self.path: str = path.rstrip("/") + "/{filepath:.*}" self.path: str = path.rstrip("/") + "/{filepath:.*}"
"Path regex used to determine if a path is handled or not"
self.location: Path = Path(location).expanduser().resolve() self.location: Path = Path(location).expanduser().resolve()
"Directory to use for serving files"
self.cached: bool = cached self.cached: bool = cached
"Whether or not all requested files will be stored in memory"
self.cache: dict[str, Response] = {}
"Currently cached responses"
if not self.location.exists(): if not self.location.exists():
raise FileNotFoundError(self.location) raise FileNotFoundError(self.location)
@ -204,27 +313,32 @@ class StaticHandler:
async def __call__(self, request: Request) -> Response: async def __call__(self, request: Request) -> Response:
filepath = request.params["filepath"] """
Function that gets called for incoming requests
if self.cached: :param request: The incoming request
return await self.handle_call_cached(request, filepath) :param filepath: Relative path of the requested file
:raises HttpError: If the file on disk cannot be found
"""
return await self.handle_call(request, filepath) filepath = normpath(request.params["filepath"].lstrip("/"))
if self.cached and filepath in self.cache:
return self.cache[filepath]
async def handle_call(self, request: Request, filepath: str) -> Response: path = self.location.joinpath(filepath)
filepath = normpath(filepath)
path = self.location.joinpath(filepath.lstrip("/"))
if path.is_dir(): if path.is_dir():
path = path.joinpath("index.html") path = path.joinpath("index.html")
if not path.is_file(): if not path.is_file():
raise HttpError(404, str(filepath)) raise HttpError(404, filepath)
if self.cached:
with path.open("rb") as fd:
response = Response(200, fd.read(), mimetype = guess_type(path)[0])
self.cache[filepath] = response
return response
return FileResponse(path) return FileResponse(path)
@lru_cache(maxsize = 128, typed = True)
async def handle_call_cached(self, request: Request, filepath: str) -> Response:
return await self.handle_call(request, filepath)

View file

@ -1,28 +1,50 @@
import aputils
import httpx import httpx
from typing import TypeVar from typing import TypeVar
from aputils import (
AlgorithmType,
JsonBase,
Message,
Nodeinfo,
Webfinger,
WellKnownNodeinfo,
Signer,
register_signer
)
from . import __version__ from . import __version__
T = TypeVar("T", bound = aputils.JsonBase) T = TypeVar("T", bound = JsonBase)
class Client: class Client:
def __init__(self, useragent: str = f"BarksharkASGI/{__version__}"): "HTTP client with useful methods for fetching ActivityPub resources"
def __init__(self, useragent: str = f"BarksharkASGI/{__version__}", proxy: str | None = None):
"""
Create a new ``Client`` object
:param useragent: ``User-Agent`` header to send with each request
"""
self._client = httpx.AsyncClient( self._client = httpx.AsyncClient(
http2 = True, http2 = True,
timeout = 5, timeout = 5,
max_redirects = 3, max_redirects = 3,
proxies = proxy,
headers = { headers = {
"User-Agent": useragent "User-Agent": useragent
} },
) )
@property @property
def useragent(self) -> str: def useragent(self) -> str:
"The ``User-Agent`` header to send with each request"
return self._client.headers["User-Agent"] return self._client.headers["User-Agent"]
@ -32,22 +54,36 @@ class Client:
async def close(self) -> None: async def close(self) -> None:
"Close all active connections"
await self._client.aclose() await self._client.aclose()
async def request(self, async def request(self,
method: str, method: str,
url: str, url: str,
body: aputils.JsonBase | None = None, body: JsonBase | None = None,
headers: dict[str, str] | None = None, headers: dict[str, str] | None = None,
signer: aputils.Signer | None = None, signer: Signer | None = None,
algorithm: aputils.AlgorithmType | str = aputils.AlgorithmType.RSASHA256, algorithm: AlgorithmType | str = AlgorithmType.RSASHA256,
stream: bool = False, stream: bool = False,
follow_redirects: bool = False) -> httpx.Response: follow_redirects: bool = False) -> httpx.Response:
"""
Send an HTTP request
:param method: HTTP method to use
:param url: Resource to fetch
:param body: Data to send as the body
:param headers: Headers to add to the request
:param signer: Actor to sign the headers with
:param algorithm: Signature standard to use when signing headers
:param stream: Whether or not to keep the conection open after the request is finished
:param follow_redirects: Send a new request to the location in any 3xx status responses
"""
url = url.split("#", 1)[0] url = url.split("#", 1)[0]
if body is not None and not isinstance(body, aputils.JsonBase): if body is not None and not isinstance(body, JsonBase):
raise TypeError("body must be a JsonBase object") raise TypeError("body must be a JsonBase object")
data = body.to_json() if body else None data = body.to_json() if body else None
@ -69,22 +105,34 @@ class Client:
async def fetch_json(self, async def fetch_json(self,
url: str, url: str,
cls: type[T] | None = None, cls: type[T] | None = None,
signer: aputils.Signer | None = None) -> T: signer: Signer | None = None) -> T:
"""
Send a ``GET`` request and return a ``JsonBase`` object
message_class: type[T] = aputils.JsonBase if cls is None else cls # type: ignore :param url: Location of the resource to fetch
:param cls: Class to use for parsing the JSON body
:param signer: Actor to sign the headers with
"""
message_class: type[T] = JsonBase if cls is None else cls # type: ignore
headers = { headers = {
"Accept": "application/" + ("activity+json" if cls is aputils.Message else "json") "Accept": "application/" + ("activity+json" if cls is Message else "json")
} }
response = await self.get(url, headers, signer) response = await self.get(url, headers, signer)
return message_class.parse(response.json()) return message_class.parse(response.json())
async def fetch_nodeinfo(self, domain: str) -> aputils.Nodeinfo: async def fetch_nodeinfo(self, domain: str) -> Nodeinfo:
"""
Get the nodeinfo endpoint of a domain
:param domain: Domain to fetch from
"""
wk_nodeinfo = await self.fetch_json( wk_nodeinfo = await self.fetch_json(
f"https://{domain}/.well-known/nodeinfo", f"https://{domain}/.well-known/nodeinfo",
aputils.WellKnownNodeinfo WellKnownNodeinfo
) )
try: try:
@ -93,21 +141,38 @@ class Client:
except KeyError: except KeyError:
url = wk_nodeinfo.v20 url = wk_nodeinfo.v20
return await self.fetch_json(url, aputils.Nodeinfo) return await self.fetch_json(url, Nodeinfo)
async def fetch_webfinger(self, username: str, domain: str) -> aputils.Webfinger: async def fetch_webfinger(self, username: str, domain: str) -> Webfinger:
"""
Query webfinger for a user
:param username: Name of the user to fetch
:param domain: Domain to fetch from
"""
url = f"https://{domain}/.well-known/webfinger?resource=acct:{username}@{domain}" url = f"https://{domain}/.well-known/webfinger?resource=acct:{username}@{domain}"
return await self.fetch_json(url, aputils.Webfinger) return await self.fetch_json(url, Webfinger)
async def get(self, async def get(self,
url: str, url: str,
headers: dict[str, str] | None = None, headers: dict[str, str] | None = None,
signer: aputils.Signer | None = None, signer: Signer | None = None,
algorithm: aputils.AlgorithmType | str = aputils.AlgorithmType.RSASHA256, algorithm: AlgorithmType | str = AlgorithmType.RSASHA256,
stream: bool = False, stream: bool = False,
follow_redirects: bool = False) -> httpx.Response: follow_redirects: bool = False) -> httpx.Response:
"""
Send an HTTP GET request
:param url: Resource to fetch
:param headers: Headers to add to the request
:param signer: Actor to sign the headers with
:param algorithm: Signature standard to use when signing headers
:param stream: Whether or not to keep the conection open after the request is finished
:param follow_redirects: Send a new request to the location in any 3xx status responses
"""
return await self.request( return await self.request(
"GET", url, None, headers, signer, algorithm, stream, follow_redirects "GET", url, None, headers, signer, algorithm, stream, follow_redirects
@ -117,9 +182,18 @@ class Client:
async def head(self, async def head(self,
url: str, url: str,
headers: dict[str, str] | None = None, headers: dict[str, str] | None = None,
signer: aputils.Signer | None = None, signer: Signer | None = None,
algorithm: aputils.AlgorithmType | str = aputils.AlgorithmType.RSASHA256, algorithm: AlgorithmType | str = AlgorithmType.RSASHA256,
follow_redirects: bool = False) -> httpx.Response: follow_redirects: bool = False) -> httpx.Response:
"""
Send an HTTP HEAD request
:param url: Resource to fetch
:param headers: Headers to add to the request
:param signer: Actor to sign the headers with
:param algorithm: Signature standard to use when signing headers
:param follow_redirects: Send a new request to the location in any 3xx status responses
"""
return await self.request( return await self.request(
"HEAD", url, None, headers, signer, algorithm, False, follow_redirects "HEAD", url, None, headers, signer, algorithm, False, follow_redirects
@ -128,23 +202,34 @@ class Client:
async def post(self, async def post(self,
url: str, url: str,
body: aputils.JsonBase, body: JsonBase,
headers: dict[str, str] | None = None, headers: dict[str, str] | None = None,
signer: aputils.Signer | None = None, signer: Signer | None = None,
algorithm: aputils.AlgorithmType | str = aputils.AlgorithmType.RSASHA256, algorithm: AlgorithmType | str = AlgorithmType.RSASHA256,
stream: bool = False, stream: bool = False,
follow_redirects: bool = False) -> httpx.Response: follow_redirects: bool = False) -> httpx.Response:
"""
Send an HTTP POST request
:param url: Resource to fetch
:param body: Data to send as the body
:param headers: Headers to add to the request
:param signer: Actor to sign the headers with
:param algorithm: Signature standard to use when signing headers
:param stream: Whether or not to keep the conection open after the request is finished
:param follow_redirects: Send a new request to the location in any 3xx status responses
"""
return await self.request( return await self.request(
"POST", url, body, headers, signer, algorithm, stream, follow_redirects "POST", url, body, headers, signer, algorithm, stream, follow_redirects
) )
@aputils.register_signer(httpx.Request) @register_signer(httpx.Request)
def handle_httpx_sign( def handle_httpx_sign(
signer: aputils.Signer, signer: Signer,
request: httpx.Request, request: httpx.Request,
algorithm: aputils.AlgorithmType) -> httpx.Request: algorithm: AlgorithmType) -> httpx.Request:
headers = signer.sign_headers( headers = signer.sign_headers(
request.method, request.method,

View file

@ -1,89 +1,23 @@
import re from blib import Enum, StrEnum
from aputils import IntEnum, StrEnum
CAPITAL = re.compile("[A-Z][^A-Z]") class PridePalette(tuple[str, ...], Enum):
LGBT = ("#9400D3", "#4B0082", "#0000FF", "#00FF00", "#FFFF00", "#FF7F00", "#FF0000")
LESBIAN = ("#D52D00", "#EF7627", "#FF9A56", "#FFFFFF", "#D162A4", "#B55690", "#A30262")
BI = ("#D60270", "#9B4F96", "#0038A8")
GAY = ("#078D70", "#26CEAA", "#98E8C1", "#FFFFFF", "#7BADE2", "#5049CC", "#3D1A78")
PANSEXUAL = ("#FF218C", "#FFD800", "#21B1FF")
ASEXUAL = ("#000000", "#A3A3A3", "#FFFFFF", "#800080")
AROMANTIC = ("#3DA542", "#A7D379", "#FFFFFF", "#A9A9A9", "#000000")
TRANS = ("#55CDFC", "#F7A8B8", "#FFFFFF")
TRANS_BLACK = ("#55CDFC", "#F7A8B8", "#000000")
TRANSMASC = ("#FF8ABD", "#CDF5FE", "#9AEBFF", "#74DFFF")
NONBINARY = ("#FCF434", "#FFFFFF", "#9C59D1", "#2C2C2C")
# aliases
class HttpStatus(IntEnum): ACE = ASEXUAL
# 1xx ARO = AROMANTIC
Continue = 100 ENBY = NONBINARY
SwitchingProtocols = 101
Processing = 102
# 2xx
Ok = 200
Created = 201
Accepted = 202
NonauthritativeInformation = 203
NoContent = 204
ResetContent = 205
PartialContent = 206
MultiStatus = 207
AlreadyReported = 208
ImUsed = 226
# 3xx
MultipleChoices = 300
MovedPermanently = 301
Found = 302
SeeOther = 303
NotModified = 304
UseProxy = 305
TemporaryRedirect = 307
PermanentRedirect = 308
# 4xx
BadRequest = 400
Unauthorized = 401
PaymentRequired = 402
Forbidden = 403
NotFound = 404
MethodNotAllowed = 405
NotAcceptable = 406
ProxyAuthenticationRequired = 407
RequestTimeout = 408
Conflict = 409
Gone = 410
LengthRequired = 411
PreconditionFailed = 412
PayloadTooLarge = 413
RequestUriTooLarge = 414
UnsupportedMediaType = 415
RequestRangeNotSatisfiable = 416
ExpectationFailed = 417
ImATeapot = 418
EnhanceYourCalm = 420
MisdirectedRequest = 421
UnprocessableEntity = 422
Locked = 423
FailedDependency = 424
UpgradeRequired = 426
PreconditionRequired = 428
TooManyRequests = 429
RequestHeaderFieldsTooLarge = 431
ConnectionClosedWithoutResponse = 444
UnavailableForLegalReasons = 451
ClientClosedRequest = 499
InternalServerError = 500
NotImplemented = 501
BadGateway = 502
ServiceUnavailable = 503
GatewayTimeout = 504
HttpVersionNotSupported = 505
VariantAlsoNegotiates = 506
InsufficientStorage = 507
LoopDetected = 508
NotExtended = 512
NetworkAuthenticationRequired = 511
NetworkConnectTimeoutError = 599
@property
def reason(self) -> str:
return " ".join(CAPITAL.findall(self.name))
class SassOutputStyle(StrEnum): class SassOutputStyle(StrEnum):

View file

@ -1,89 +0,0 @@
from typing import Any, Self
from .enums import HttpStatus
class HttpError(Exception):
def __init__(self,
status: HttpStatus | int,
message: str,
headers: dict[str, str] | None = None) -> None:
Exception.__init__(self, f'HTTP ERROR {status}: {message[:100]}')
self.status: HttpStatus = HttpStatus.parse(status)
self.message: str = message
self.headers: dict[str, str] = headers or {}
@classmethod
def MOVED_PERMANENTLY(cls: type[Self], location: str, **kwargs: Any) -> Self:
err = cls(301, f'Resource moved to <a href="{location}">{location}</a>', **kwargs)
err.headers["Location"] = location
return err
@classmethod
def FOUND(cls: type[Self], location: str, **kwargs: Any) -> Self:
err = cls(302, f'Resource moved to <a href="{location}">{location}</a>', **kwargs)
err.headers["Location"] = location
return err
@classmethod
def TEMPORARY_REDIRECT(cls: type[Self], location: str, **kwargs: Any) -> Self:
err = cls(307, f'Resource moved to <a href="{location}">{location}</a>', **kwargs)
err.headers["Location"] = location
return err
@classmethod
def PERMANENT_REDIRECT(cls: type[Self], location: str, **kwargs: Any) -> Self:
err = cls(308, f'Resource moved to <a href="{location}">{location}</a>', **kwargs)
err.headers["Location"] = location
return err
@classmethod
def BAD_REQUEST(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(400, message, **kwargs)
@classmethod
def UNAUTHORIZED(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(401, message, **kwargs)
@classmethod
def FORBIDDEN(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(403, message, **kwargs)
@classmethod
def NOT_FOUND(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(404, message, **kwargs)
@classmethod
def METHOD_NOT_ALLOWED(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(405, message, **kwargs)
@classmethod
def GONE(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(410, message, **kwargs)
@classmethod
def SERVER_ERROR(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(500, message, **kwargs)
@classmethod
def NOT_IMPLEMENTED(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(501, message, **kwargs)
@property
def reason(self) -> str:
return self.status.reason

View file

@ -1,7 +1,4 @@
import asyncio from aputils import HttpDate
import json
from aputils import HttpDate, JsonBase
from colorsys import rgb_to_hls, hls_to_rgb from colorsys import rgb_to_hls, hls_to_rgb
from datetime import datetime, timedelta from datetime import datetime, timedelta
from functools import cached_property from functools import cached_property
@ -10,125 +7,59 @@ from typing import Any, Literal, Self
from collections.abc import ( from collections.abc import (
Awaitable, Awaitable,
Callable, Callable,
Iterable,
Iterator, Iterator,
ItemsView, ItemsView,
KeysView, KeysView,
Mapping,
MutableMapping, MutableMapping,
Sequence, Sequence,
ValuesView ValuesView
) )
from .enums import PridePalette
ReaderFunction = Callable[..., Awaitable[dict[str, Any]]] ReaderFunction = Callable[..., Awaitable[dict[str, Any]]]
WriterFunction = Callable[[dict[str, Any]], Awaitable[None]] WriterFunction = Callable[[dict[str, Any]], Awaitable[None]]
PRIDE_COLORS = {
"lgbt": ["#9400D3", "#4B0082", "#0000FF", "#00FF00", "#FFFF00", "#FF7F00", "#FF0000"],
"lesbian": ["#D52D00", "#EF7627", "#FF9A56", "#FFFFFF", "#D162A4", "#B55690", "#A30262"],
"bi": ["#D60270", "#9B4F96", "#0038A8"],
"gay": ["#078D70", "#26CEAA", "#98E8C1", "#FFFFFF", "#7BADE2", "#5049CC", "#3D1A78"],
"pan": ["#FF218C", "#FFD800", "#21B1FF"],
"asexual": ["#000000", "#A3A3A3", "#FFFFFF", "#800080"],
"aromantic": ["#3DA542", "#A7D379", "#FFFFFF", "#A9A9A9", "#000000"],
"trans": ["#55CDFC", "#F7A8B8", "#FFFFFF"],
"trans-black": ["#55CDFC", "#F7A8B8", "#000000"],
"transmasc": ["#FF8ABD", "#CDF5FE", "#9AEBFF", "#74DFFF"],
"non-binary": ["#FCF434", "#FFFFFF", "#9C59D1", "#2C2C2C"]
}
def convert_to_bytes(value: Any, encoding: str = "utf-8") -> bytes:
"""
Convert an object to :class:`bytes`
:param value: Object to be converted
:param encoding: Character encoding to use if the object is a string or gets converted to
one in the process
:raises TypeError: If the object cannot be converted
"""
if isinstance(value, bytes):
return value
try:
return convert_to_string(value).encode(encoding)
except TypeError:
raise TypeError(f"Cannot convert '{type(value).__name__}' into bytes") from None
def convert_to_string(value: Any, encoding: str = 'utf-8') -> str:
"""
Convert an object to :class:`str`
:param value: Object to be converted
:param encoding: Character encoding to use if the object is a :class:`bytes` object
"""
if value is None:
return ''
if isinstance(value, bytes):
return value.decode(encoding)
if isinstance(value, bool):
return str(value)
if isinstance(value, str):
return value
if isinstance(value, JsonBase):
return value.to_json()
if isinstance(value, (dict, list, tuple, set)):
return json.dumps(value)
if isinstance(value, (int, float)):
return str(value)
raise TypeError(f'Cannot convert "{type(value).__name__}" into a string') from None
def is_loop_running() -> bool:
"Check if an event loop is running in the current thread"
try:
loop = asyncio.get_event_loop()
if not loop:
return False
return loop.is_running()
except Exception:
return False
class Color(str): class Color(str):
"Represents an HTML color value"
def __new__(cls, color: str) -> Self: def __new__(cls, color: str) -> Self:
if not isinstance(color, str): """
raise TypeError("Color must be a valid hex string") Create a new ``Color`` object
if not color.startswith("#"): :param color: Hex color string of 3 or 6 characters (``#`` character optional)
color = f"#{str(color)}" """
if len(color) == 4: color = color.lstrip("#")
color = f"#{color[1]*2}{color[2]*2}{color[3]*2}"
elif len(color) != 7: if len(color) == 3:
color = f"{color[1]*2}{color[2]*2}{color[3]*2}"
elif len(color) != 6:
raise TypeError("Color must be 3 or 6 character hex string") raise TypeError("Color must be 3 or 6 character hex string")
return str.__new__(cls, color) return str.__new__(cls, "#" + color)
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Color({self})" return f"Color('{self}')"
@classmethod @classmethod
def from_rgb(cls: type[Self], red: int, green: int, blue: int) -> Self: def from_rgb(cls: type[Self], red: int, green: int, blue: int) -> Self:
"""
Create a new ``Color`` object from red, green, and blue values. The values must be
between ``0`` and ``255``
:param red: Red color value
:param green: Green color value
:param blue: Blue color value
"""
values = [red, green, blue] values = [red, green, blue]
for value in values: for value in values:
@ -140,6 +71,10 @@ class Color(str):
@classmethod @classmethod
def from_hsl(cls: type[Self], hue: int, saturation: int, luminance: int) -> Self: def from_hsl(cls: type[Self], hue: int, saturation: int, luminance: int) -> Self:
"""
Create a new ``Color`` object from hue, saturation, and luminance values
"""
hsl_values: list[int | float] = [hue, saturation, luminance] hsl_values: list[int | float] = [hue, saturation, luminance]
for idx, value in enumerate(hsl_values): for idx, value in enumerate(hsl_values):
@ -155,23 +90,14 @@ class Color(str):
@classmethod @classmethod
def new_pride_palette(cls: type[Self], flag: str) -> Iterable[Self]: def new_pride_palette(cls: type[Self], flag: str) -> tuple[Self, ...]:
"Returns a `tuple` of `Color` objects which represents a pride flag color palette" "Returns multiple ``Color`` objects which represents a pride flag color palette"
if flag == "ace": return tuple(cls(color) for color in PridePalette.parse(flag.replace("-", "")))
flag = "asexual"
elif flag == "aro":
flag = "aromantic"
elif flag in ["enby", "nonbinary"]:
flag = "non-binary"
return tuple(cls(color) for color in PRIDE_COLORS[flag])
@staticmethod @staticmethod
def parse_multi(multiplier: int) -> float: def _parse_multi(multiplier: int) -> float:
if multiplier >= 100: if multiplier >= 100:
return 1 return 1
@ -183,11 +109,15 @@ class Color(str):
@cached_property @cached_property
def rgb(self) -> tuple[int, int, int]: def rgb(self) -> tuple[int, int, int]:
"Get the color as a tuple of red, green, and blue levels"
return (self.red, self.green, self.blue) return (self.red, self.green, self.blue)
@cached_property @cached_property
def hsl(self) -> tuple[int, int, int]: def hsl(self) -> tuple[int, int, int]:
"Get the color as a tuple of hue, saturation, and luminance levels"
rgb = [value / 255 for value in self.rgb] rgb = [value / 255 for value in self.rgb]
values = [int(value * 255) for value in rgb_to_hls(*rgb)] values = [int(value * 255) for value in rgb_to_hls(*rgb)]
return (values[0], values[2], values[1]) return (values[0], values[2], values[1])
@ -195,48 +125,70 @@ class Color(str):
@cached_property @cached_property
def red(self) -> int: def red(self) -> int:
"Get the red color level"
return int(self[1:3], 16) return int(self[1:3], 16)
@cached_property @cached_property
def green(self) -> int: def green(self) -> int:
"Get the green color level"
return int(self[3:5], 16) return int(self[3:5], 16)
@cached_property @cached_property
def blue(self) -> int: def blue(self) -> int:
"Get the blue color level"
return int(self[5:7], 16) return int(self[5:7], 16)
@cached_property @cached_property
def hue(self) -> int: def hue(self) -> int:
"Get the hue level"
return self.hsl[0] return self.hsl[0]
@cached_property @cached_property
def saturation(self) -> int: def saturation(self) -> int:
"Get the saturation level"
return self.hsl[1] return self.hsl[1]
@cached_property @cached_property
def lightness(self) -> int: def luminance(self) -> int:
"Get the luminance level"
return self.hsl[2] return self.hsl[2]
def alter(self, action: str, multiplier: int) -> Self: def alter(self,
action: Literal["lighten", "darken", "saturate", "desaturate"],
multiplier: int) -> Self:
"""
Change the lightness or saturation of the color
:param action: Modification action to take on the color
:param multiplier: Amount to multiply by for the effect. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
hue, saturation, luminance = self.hsl hue, saturation, luminance = self.hsl
if action == "lighten": if action == "lighten":
luminance += int((255 - luminance) * Color.parse_multi(multiplier)) luminance += int((255 - luminance) * Color._parse_multi(multiplier))
elif action == "darken": elif action == "darken":
luminance -= int(luminance * Color.parse_multi(multiplier)) luminance -= int(luminance * Color._parse_multi(multiplier))
elif action == "saturate": elif action == "saturate":
saturation += int((255 - saturation) * Color.parse_multi(multiplier)) saturation += int((255 - saturation) * Color._parse_multi(multiplier))
elif action == "desaturate": elif action == "desaturate":
saturation -= int(saturation * Color.parse_multi(multiplier)) saturation -= int(saturation * Color._parse_multi(multiplier))
else: else:
raise ValueError(f"Invalid action: {action}") raise ValueError(f"Invalid action: {action}")
@ -245,22 +197,57 @@ class Color(str):
def lighten(self, multiplier: int) -> Self: def lighten(self, multiplier: int) -> Self:
"""
Alias of ``Color.alter('lighten', multiplier)``
:param multiplier: Amount to multiply by for the effect. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
return self.alter("lighten", multiplier) return self.alter("lighten", multiplier)
def darken(self, multiplier: int) -> Self: def darken(self, multiplier: int) -> Self:
"""
Alias of ``Color.alter('darken', multiplier)``
:param multiplier: Amount to multiply by for the effect. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
return self.alter("darken", multiplier) return self.alter("darken", multiplier)
def saturate(self, multiplier: int) -> Self: def saturate(self, multiplier: int) -> Self:
"""
Alias of ``Color.alter('saturate', multiplier)``
:param multiplier: Amount to multiply by for the effect. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
return self.alter("saturate", multiplier) return self.alter("saturate", multiplier)
def desaturate(self, multiplier: int) -> Self: def desaturate(self, multiplier: int) -> Self:
"""
Alias of ``Color.alter('desaturate', multiplier)``
:param multiplier: Amount to multiply by for the effect. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
return self.alter("desaturate", multiplier) return self.alter("desaturate", multiplier)
def rgba(self, opacity: int) -> str: def rgba(self, opacity: int) -> str:
"""
Return the color as a CSS ``rgba`` value
:param opacity: Opacity level to apply to the color. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
if 0 < opacity > 100: if 0 < opacity > 100:
raise ValueError("Opacity must anywhere from 0 to 100") raise ValueError("Opacity must anywhere from 0 to 100")
@ -270,6 +257,9 @@ class Color(str):
class Cookie: class Cookie:
"Represents an HTTP cookie"
def __init__(self, def __init__(self,
key: str, key: str,
value: str | None, value: str | None,
@ -279,28 +269,65 @@ class Cookie:
domain: str | None = None, domain: str | None = None,
secure: bool = False, secure: bool = False,
http_only: bool = False, http_only: bool = False,
same_site: Literal["lax", "strict", "none"] | None = "lax") -> None: same_site: Literal["lax", "strict", "none"] | None = None) -> None:
"""
Create a new cookie
:param key: Name of the cookie
:param value: Value of the cookie
:param max_age: Maximum amount of time for the cookie to be valid
:param expires: Date the cookie expires
:param path: Path base the cookie should be used on
:param domain: Domain the cookie should be used on
:param secure: Ensure the cookie is only used with ``HTTPS`` connections
:param http_only: Prevent on-page javascript from accessing the cookie
:param same_site: Make sure the cookie stays on the same website
"""
if isinstance(max_age, int): if isinstance(max_age, int):
max_age = timedelta(seconds = min(0, max_age)) max_age = timedelta(seconds = min(0, max_age))
self.key: str = key self.key: str = key
"Name of the cookie"
self.value: str | None = value or None self.value: str | None = value or None
"Value of the cookie"
self.max_age: timedelta | None = max_age self.max_age: timedelta | None = max_age
"Maximum amount of time for the cookie to be valid"
self.expires: HttpDate | None = HttpDate.parse(expires) if expires is not None else None self.expires: HttpDate | None = HttpDate.parse(expires) if expires is not None else None
"Date the cookie expires"
self.path: str = path self.path: str = path
"Path base the cookie should be used on"
self.domain: str | None = domain self.domain: str | None = domain
self.same_site: Literal["lax", "strict", "none"] = same_site or "none" "Domain the cookie should be used on"
self.secure: bool = secure self.secure: bool = secure
"Ensure the cookie is only used with ``HTTPS`` connections"
self.http_only: bool = False self.http_only: bool = False
"Prevent on-page javascript from accessing the cookie"
self.same_site: Literal["lax", "strict", "none"] | None = same_site
"Make sure the cookie stays on the same website"
@classmethod @classmethod
def from_dict(cls: type[Self], data: dict[str, Any]) -> Self: def from_dict(cls: type[Self], data: dict[str, Any]) -> Self:
same_site = data.get("same_site", data.get("samesite", "none")).lower() """
Create a new cookie from a :class:`dict`. The keys can be HTTP cookie keys or
``Cookie`` property names.
if same_site not in {"lax", "strict", "none"}: :param data: Dictionary of cookie properties
raise ValueError(f"same_site must be 'lax', 'strict', or 'none', not {same_site}") """
same_site = data.get("same_site", data.get("samesite", None))
if same_site not in {"lax", "strict", "none", None}:
raise ValueError(f"same_site must be 'lax', 'strict', 'none' or None, not {same_site}")
max_age = data.get("max_age", data.get("maxage")) max_age = data.get("max_age", data.get("maxage"))
@ -313,12 +340,18 @@ class Cookie:
data.get("domain"), data.get("domain"),
data.get("secure", False), data.get("secure", False),
data.get("http_only", data.get("httponly", False)), data.get("http_only", data.get("httponly", False)),
same_site same_site.lower() if isinstance(same_site, str) else None # type: ignore[arg-type]
) )
@classmethod @classmethod
def parse(cls: type[Self], raw_data: str) -> Self: def parse(cls: type[Self], raw_data: str) -> Self:
"""
Read a raw cookie string and create a new ``Cookie`` object
:param raw_data: Raw cookie data
"""
lines = tuple(line.strip() for line in raw_data.split(";")) lines = tuple(line.strip() for line in raw_data.split(";"))
ckey, _, cvalue = lines[0].partition("=") ckey, _, cvalue = lines[0].partition("=")
@ -338,14 +371,20 @@ class Cookie:
def copy(self) -> Self: def copy(self) -> Self:
"Create a copy of the cookie"
return type(self).from_dict(self.to_dict()) return type(self).from_dict(self.to_dict())
def set_deleted(self) -> None: def set_deleted(self) -> None:
"Set the expires property to a date in the past"
self.expires = HttpDate.parse("Thu, 01 Jan 1970 00:00:00 GMT") self.expires = HttpDate.parse("Thu, 01 Jan 1970 00:00:00 GMT")
def to_dict(self) -> dict[str, Any]: def to_dict(self) -> dict[str, Any]:
"Return the cookie as a dict"
keys = ( keys = (
"key", "value", "max_age", "expires", "path", "key", "value", "max_age", "expires", "path",
"domain", "same_site", "secure", "http_only" "domain", "same_site", "secure", "http_only"
@ -355,12 +394,16 @@ class Cookie:
def to_string(self) -> str: def to_string(self) -> str:
"Convert the cookie to a raw cookie string"
values = [ values = [
f"{self.key}={self.value or ''}", f"{self.key}={self.value or ''}",
f"path={self.path}", f"path={self.path}"
f"samesite={self.same_site}"
] ]
if self.same_site is not None:
values.append(f"samesite={self.same_site}")
if self.max_age is not None: if self.max_age is not None:
values.append(f"maxage={self.max_age.seconds}") values.append(f"maxage={self.max_age.seconds}")
@ -380,7 +423,16 @@ class Cookie:
class StateProxy(MutableMapping[str, Any]): class StateProxy(MutableMapping[str, Any]):
"Proxy object for the ASGI state value"
def __init__(self, state: dict[str, str]) -> None: def __init__(self, state: dict[str, str]) -> None:
"""
Create a new state proxy object
:param state: A dict to proxy (usually the ASGI state)
"""
self._state: dict[str, str] = state self._state: dict[str, str] = state
@ -430,29 +482,55 @@ class StateProxy(MutableMapping[str, Any]):
def get(self, key: str, default: Any = None) -> Any: def get(self, key: str, default: Any = None) -> Any:
"""
Get a value or return the default if it does not exist
:param key: Name of the value to get
:param default: Value to return if the key does not exist
"""
return self._state.get(key, default) return self._state.get(key, default)
def items(self) -> ItemsView[str, Any]: def items(self) -> ItemsView[str, Any]:
"Return all state items as key/value pairs"
return self._state.items() return self._state.items()
def keys(self) -> KeysView[str]: def keys(self) -> KeysView[str]:
"Return all keys in the state"
return self._state.keys() return self._state.keys()
def set(self, key: str, value: Any) -> None: def set(self, key: str, value: Any) -> None:
"""
Set a state value
:param key: Name of the value to set
:param value: Data to set
"""
self._state[key] = value self._state[key] = value
def values(self) -> ValuesView[Any]: def values(self) -> ValuesView[Any]:
"Return all values in the state"
return self._state.values() return self._state.values()
class Stream: class Stream:
"Contains the reader and writer functions. Can be used as an iterator for fetching data chunks."
def __init__(self, reader: ReaderFunction, writer: WriterFunction) -> None: def __init__(self, reader: ReaderFunction, writer: WriterFunction) -> None:
self.reader = reader self.reader: ReaderFunction = reader
self.writer = writer "Function for reading data from a client"
self.writer: WriterFunction = writer
"Function for writing data to a client"
self._sent_headers: bool = False self._sent_headers: bool = False
self._sent_body: bool = False self._sent_body: bool = False
self.__next: bool = True self.__next: bool = True
@ -474,10 +552,14 @@ class Stream:
async def close(self) -> None: async def close(self) -> None:
"Disconnect from the client"
await self.writer({"type": "http.disconnect"}) await self.writer({"type": "http.disconnect"})
async def read(self) -> bytes: async def read(self) -> bytes:
"Read the whole request body"
body = b"" body = b""
while True: while True:
@ -491,27 +573,66 @@ class Stream:
async def read_chunk(self) -> tuple[bytes, bool]: async def read_chunk(self) -> tuple[bytes, bool]:
"Read a chunk of data from the request and return a tuple of (``body chunk``, ``more body``)"
data = await self.reader() data = await self.reader()
return data.get("body", b""), data.get("more_body", False) return data.get("body", b""), data.get("more_body", False)
async def write_headers(self, status: int, headers: Sequence[tuple[bytes, bytes]]) -> None: async def write_headers(self,
status: int,
headers: Mapping[str, str],
cookies: Sequence[Cookie] | None = None) -> None:
"""
Write the response headers. If this was already called, nothing happens.
:param status: HTTP status to respond with
:param headers: A sequence of HTTP header values to be sent with the response
"""
if self._sent_headers: if self._sent_headers:
return return
raw_headers: list[tuple[bytes, bytes]] = []
for key, value in headers.items():
if not isinstance(value, str):
# this will be a `logging.warning` call in the future
print(f"not string: {key} {type(value)} {value}") # type: ignore[unreachable]
continue
raw_headers.append((key.encode("utf-8"), value.encode("utf-8")))
for cookie in (cookies or []):
raw_headers.append((b"Set-Cookie", cookie.to_string().encode("utf-8")))
await self.writer({ await self.writer({
"type": "http.response.start", "type": "http.response.start",
"status": status, "status": status,
"headers": headers "headers": raw_headers
}) })
self._sent_headers = True
async def write_body(self, data: bytes, eof: bool = True) -> None:
"""
Send body data to the client. If an eof was already sent, this does nothing.
:param data: Raw data to send to the client
:param eof: Send an ``EOF`` to indicate the response is done.
"""
if not self._sent_headers:
raise RuntimeError("Response headers have not been sent yet")
async def write_body(self, data: bytes, eof: bool = False) -> None:
if self._sent_body: if self._sent_body:
return return
await self.writer({ await self.writer({
"type": "http.response.body", "type": "http.response.body",
"body": data, "body": data,
"more_body": eof "more_body": not eof
}) })
self._sent_body = eof

View file

@ -10,10 +10,10 @@ from multipart.multipart import Field, File, create_form_parser
from typing import Any, Literal, TypedDict, TypeVar from typing import Any, Literal, TypedDict, TypeVar
from urllib.parse import parse_qsl from urllib.parse import parse_qsl
from .misc import Cookie, Stream from .misc import Cookie, StateProxy, Stream
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from .application import Application, R, RT, AT from .application import Application
T = TypeVar("T", bound = JsonBase) T = TypeVar("T", bound = JsonBase)
@ -42,18 +42,69 @@ class ScopeType(TypedDict):
class Request: class Request:
"Represents an incoming client request"
# I'm not sure how to "forward" generics without needing to specify then on every request,
# so it'll just be static for now until I can figure it out
app: Application[Request, StateProxy, StateProxy]
"Application the request is associated with"
stream: Stream
"Stream object associated with the request"
method: str
"HTTP method set by the client"
path: str
"Requested HTTP path"
query: CIMultiDictProxy[str]
"Extra data pairs at the end of a request path"
headers: CIMultiDictProxy[str]
"Headers set by the client"
params: dict[str, Any]
"Path parameters for the route if they exist"
remote_raw: str | None
"The remote IP of the client"
local: str | None
"Address of the server"
cookies: dict[str, Cookie]
"Cookies sent by the client"
state: StateProxy
"Data to pass with the request"
extensions: dict[str, Any]
"ASGI extensions"
asgi: tuple[str, str]
"ASGI version and spec version"
version: float
"HTTP version of the request"
scheme: str
"Protocol of the request"
def __init__(self, def __init__(self,
app: Application[R, RT, AT], app: Application[Request, StateProxy, StateProxy],
scope: ScopeType, scope: ScopeType,
stream: Stream) -> None: stream: Stream) -> None:
self.app: Application[R, RT, AT] = app
self.stream: Stream = stream
self._body: bytes = b"" self._body: bytes = b""
raw_query = parse_qsl(scope["query_string"].decode("utf-8"), keep_blank_values = True) raw_query = parse_qsl(scope["query_string"].decode("utf-8"), keep_blank_values = True)
raw_headers = ((k.decode("utf-8").title(), v.decode("utf-8")) for k, v in scope["headers"]) raw_headers = ((k.decode("utf-8").title(), v.decode("utf-8")) for k, v in scope["headers"])
self.app: Application[Request, StateProxy, StateProxy] = app
self.stream: Stream = stream
self.method: str = scope["method"].upper() self.method: str = scope["method"].upper()
self.path: str = scope["path"] self.path: str = scope["path"]
self.query: CIMultiDictProxy[str] = CIMultiDictProxy(CIMultiDict(raw_query)) self.query: CIMultiDictProxy[str] = CIMultiDictProxy(CIMultiDict(raw_query))
@ -62,8 +113,7 @@ class Request:
self.remote_raw: str | None = (scope.get("client") or (None,))[0] self.remote_raw: str | None = (scope.get("client") or (None,))[0]
self.local: str | None = (scope.get("server") or (None, ))[0] self.local: str | None = (scope.get("server") or (None, ))[0]
self.cookies: dict[str, Cookie] = {} self.cookies: dict[str, Cookie] = {}
self.state: StateProxy = app.request_state_class(scope.get("state") or {})
self.state: RT = app.request_state_class(scope.get("state") or {})
self.extensions: dict[str, Any] = scope.get("extensions", {}) # type: ignore[assignment] self.extensions: dict[str, Any] = scope.get("extensions", {}) # type: ignore[assignment]
# keep? # keep?
@ -78,25 +128,35 @@ class Request:
@property @property
def content_length(self) -> int: def content_length(self) -> int:
"Get the ``Content-Length`` header"
return int(self.headers.get("Content-Length", "0")) return int(self.headers.get("Content-Length", "0"))
@property @property
def content_type(self) -> str: def content_type(self) -> str:
"Get the first value of the ``Content-Type`` header"
return self.headers.getone("Content-Type", "") return self.headers.getone("Content-Type", "")
@property @property
def remote(self) -> str | None: def remote(self) -> str | None:
"Get the real IP address of the client"
return self.headers.get("X-Forwarded-For", self.headers.get("X-Real-Ip", self.remote_raw)) return self.headers.get("X-Forwarded-For", self.headers.get("X-Real-Ip", self.remote_raw))
@property @property
def user_agent(self) -> str | None: def user_agent(self) -> str | None:
"Get the ``User-Agent`` header"
return self.headers.get("User-Agent") return self.headers.get("User-Agent")
async def body(self) -> bytes: async def body(self) -> bytes:
"Read the request body. It will be stored in memory."
if not self._body: if not self._body:
self._body = await self.stream.read() self._body = await self.stream.read()
@ -104,14 +164,24 @@ class Request:
async def text(self) -> str: async def text(self) -> str:
"Read the request body as a ``str``"
return (await self.body()).decode("utf-8") return (await self.body()).decode("utf-8")
async def json(self, parser_class: type[T] = JsonBase) -> T: # type: ignore[assignment] async def json(self, parser_class: type[T] = JsonBase) -> T: # type: ignore[assignment]
"""
Read the request body as a JSON document
:param parser_class: Class to use when parsing the body
"""
return parser_class.parse(await self.body()) return parser_class.parse(await self.body())
async def form(self) -> CIMultiDictProxy[str | File]: async def form(self) -> CIMultiDictProxy[str | File]:
"Parse form data from the request body"
if self.content_type not in {"multipart/form-data", "application/x-www-form-urlencoded"}: if self.content_type not in {"multipart/form-data", "application/x-www-form-urlencoded"}:
raise ValueError(f"Invalid mimetype for form data: {self.content_type}") raise ValueError(f"Invalid mimetype for form data: {self.content_type}")
@ -140,4 +210,4 @@ class Request:
async def stream_response(self, status: int, headers: dict[str, str]) -> None: async def stream_response(self, status: int, headers: dict[str, str]) -> None:
pass raise NotImplementedError

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import os import os
from aputils import HttpDate, JsonBase from aputils import HttpDate, JsonBase
from blib import HttpError, HttpStatus, convert_to_bytes
from collections.abc import Sequence from collections.abc import Sequence
from datetime import datetime, timedelta from datetime import datetime, timedelta
from mimetypes import guess_type from mimetypes import guess_type
@ -12,9 +13,7 @@ from pathlib import Path
from typing import Any, Literal, Self from typing import Any, Literal, Self
from urllib.parse import quote from urllib.parse import quote
from .enums import HttpStatus from .misc import Cookie, Stream
from .error import HttpError
from .misc import Cookie, Stream, convert_to_bytes
from .request import Request from .request import Request
@ -36,7 +35,7 @@ class Response:
:param headers: Header items to include in the message :param headers: Header items to include in the message
""" """
self.cookies: dict[str, Cookie] = {} self.cookies: list[Cookie] = []
"New cookies to be sent to the client" "New cookies to be sent to the client"
self.status: HttpStatus = HttpStatus.parse(status) self.status: HttpStatus = HttpStatus.parse(status)
@ -157,6 +156,14 @@ class Response:
self.headers.update({"Content-Type": value}) self.headers.update({"Content-Type": value})
def get_cookie(self, key: str) -> Cookie:
for cookie in self.cookies:
if cookie.key == key:
return cookie
raise KeyError(key)
def set_cookie(self, def set_cookie(self,
key: str, key: str,
value: str | None, value: str | None,
@ -169,15 +176,19 @@ class Response:
same_site: Literal["lax", "strict", "none"] | None = "lax") -> Cookie: same_site: Literal["lax", "strict", "none"] | None = "lax") -> Cookie:
cookie = Cookie(key, value, max_age, expires, path, domain, secure, http_only, same_site) cookie = Cookie(key, value, max_age, expires, path, domain, secure, http_only, same_site)
self.cookies[cookie.key] = cookie self.cookies.append(cookie)
return cookie return cookie
def delete_cookie(self, cookie: Cookie) -> Cookie: def delete_cookie(self, key: str) -> Cookie:
deleted_cookie = cookie.copy() try:
deleted_cookie.set_deleted() cookie = self.get_cookie(key)
self.cookies[deleted_cookie.key] = deleted_cookie
return deleted_cookie except KeyError:
cookie = self.set_cookie(key, "")
cookie.set_deleted()
return cookie
async def send(self, stream: Stream, request: Request) -> None: async def send(self, stream: Stream, request: Request) -> None:
@ -188,21 +199,8 @@ class Response:
:param request: Request associated with the response :param request: Request associated with the response
""" """
headers = [] await stream.write_headers(self.status, self.headers, self.cookies)
await stream.write_body(self.body, True)
for key, value in self.headers.items():
if not isinstance(value, str):
# this will be a `logging.warning` call in the future
print(f"not string: {key} {type(value)} {value}") # type: ignore[unreachable]
continue
headers.append((key.encode("utf-8"), value.encode("utf-8")))
for cookie in self.cookies.values():
headers.append((b"Set-Cookie", cookie.to_string().encode("utf-8")))
await stream.write_headers(self.status, headers)
await stream.write_body(self.body)
class FileResponse(Response): class FileResponse(Response):
@ -211,7 +209,7 @@ class FileResponse(Response):
def __init__(self, def __init__(self,
path: Path | str, path: Path | str,
mimetype: str | None = None, mimetype: str | None = None,
chunk_size: int = 8192, chunk_size: int = 0,
status: HttpStatus | int = HttpStatus.Ok, status: HttpStatus | int = HttpStatus.Ok,
headers: dict[str, str] | None = None) -> None: headers: dict[str, str] | None = None) -> None:
""" """
@ -236,7 +234,7 @@ class FileResponse(Response):
if isinstance(path, str): if isinstance(path, str):
path = Path(path) path = Path(path)
self.chunk_size: int = chunk_size if chunk_size > 0 else 8192 self.chunk_size: int = chunk_size if chunk_size > 0 else 131_072
self.path: Path = path.expanduser().resolve() self.path: Path = path.expanduser().resolve()
if not self.path.exists(): if not self.path.exists():
@ -245,18 +243,14 @@ class FileResponse(Response):
if not self.path.is_file(): if not self.path.is_file():
raise ValueError(f"Path is a directory: {self.path}") raise ValueError(f"Path is a directory: {self.path}")
self.length = self.path.stat().st_size
async def send(self, stream: Stream, request: Request) -> None: async def send(self, stream: Stream, request: Request) -> None:
await stream.write_headers(200, tuple([])) await stream.write_headers(200, self.headers, self.cookies)
with self.path.open("rb") as fd: with self.path.open("rb") as fd:
while True: await stream.write_body(fd.read(), True)
if not (data := fd.read(self.chunk_size)):
break
await stream.write_body(data, True)
await stream.write_body(b"", False)
class TemplateResponse(Response): class TemplateResponse(Response):
@ -268,7 +262,7 @@ class TemplateResponse(Response):
status: HttpStatus | int = HttpStatus.Ok, status: HttpStatus | int = HttpStatus.Ok,
mimetype: str | None = None, mimetype: str | None = None,
headers: dict[str, Any] | None = None, headers: dict[str, Any] | None = None,
pretty_print: bool = False) -> None: pprint: bool = False) -> None:
""" """
Create a new ``TemplateResponse`` object Create a new ``TemplateResponse`` object
@ -283,12 +277,17 @@ class TemplateResponse(Response):
Response.__init__(self, status, b"", mimetype or self.detect_mimetype(name), headers) Response.__init__(self, status, b"", mimetype or self.detect_mimetype(name), headers)
self.name: str = name self.name: str = name
self.pretty_print: bool = pretty_print self.pprint: bool = pprint
self.context: dict[str, Any] = context or {} self.context: dict[str, Any] = context or {}
def detect_mimetype(self, path: str) -> str: @staticmethod
"Return the mimetype of the template based on the file extension" def detect_mimetype(path: str) -> str:
"""
Return the mimetype of the template based on the file extension
:param path: Path to detect the mimetype of
"""
_, ext = os.path.splitext(path) _, ext = os.path.splitext(path)
@ -306,7 +305,9 @@ class TemplateResponse(Response):
"request": request, "request": request,
**self.context **self.context
} }
text = request.app.template.render(self.name, context, self.pretty_print)
text = request.app.template.render(self.name, context, self.pprint)
self.body = text.encode("utf-8") self.body = text.encode("utf-8")
self.length = len(self.body)
await Response.send(self, stream, request) await Response.send(self, stream, request)

View file

@ -1,10 +1,10 @@
import http_router import http_router
from collections.abc import Awaitable, Callable, Iterable from blib import HttpError
from collections.abc import Awaitable, Callable
from http_router.routes import RouteMatch from http_router.routes import RouteMatch
from typing import Any, Pattern from typing import Any, Pattern
from .error import HttpError
from .request import Request from .request import Request
from .response import Response from .response import Response
@ -55,23 +55,25 @@ class Router(http_router.Router):
def bind(self, # type: ignore[override] def bind(self, # type: ignore[override]
target: RouteHandler, target: RouteHandler,
*paths: str | Pattern[str], method: str,
methods: Iterable[str] | str | None = None) -> list[http_router.Route]: *paths: str | Pattern[str]) -> list[http_router.Route]:
""" """
Add a route handler Add a route handler
:param target: Function to call on request :param target: Function to call on request
:param method: HTTP method the route should handle
:param paths: List of paths the route should handle :param paths: List of paths the route should handle
:param methods: List of methods the route should handle
""" """
if isinstance(methods, list): return http_router.Router.bind(self, target, *paths, methods = [method.upper()])
methods = [method.upper() for method in methods]
elif isinstance(methods, str):
methods = [methods.upper()]
return http_router.Router.bind(self, target, *paths, methods = methods) def route(self, method: str, *paths: str) -> Callable[[RouteHandler], RouteHandler]: # type: ignore[override] # noqa: E501
def wrapper(handler: RouteHandler) -> RouteHandler:
self.bind(handler, method, *paths)
return handler
return wrapper
ROUTERS: dict[str, Router] = { ROUTERS: dict[str, Router] = {
@ -120,7 +122,7 @@ def route(router: str, method: str, *paths: str) -> Callable[[RouteHandler], Rou
""" """
def wrapper(func: RouteHandler) -> RouteHandler: def wrapper(func: RouteHandler) -> RouteHandler:
get_router(router).bind(func, *paths, methods = method) get_router(router).bind(func, method, *paths)
return func return func
return wrapper return wrapper

View file

@ -1,156 +0,0 @@
import asyncio
import inspect
import traceback
from collections.abc import Awaitable, Callable
from typing import Any, Self
from .misc import is_loop_running
SignalCallback = Callable[..., Awaitable[bool | None]]
MainSignalCallback = Callable[..., Awaitable[None]]
class Signal(list[SignalCallback]):
"Allows a series of callbacks to get called via async. Use as a decorator for the base function."
def __init__(self, timeout: float | int = 5.0):
"""
:param timeout: Time in seconds to wait before cancelling the callback
"""
self.timeout: float | int = timeout
self.callback: MainSignalCallback | None = None
self.object: Any = None
def __get__(self, obj: Any, objtype: type[Any]) -> Self:
if obj and not self.object:
self.object = obj
return self
def __call__(self, callback: MainSignalCallback) -> Self:
if not inspect.iscoroutinefunction(callback):
raise RuntimeError(f"Not a coroutine: {callback.__name__}")
self.callback = callback
self.__doc__ = callback.__doc__
self.__annotations__ = callback.__annotations__
return self
def append(self, value: SignalCallback) -> None:
"""
Add a callback
:param value: Callback
"""
self.connect(value)
def remove(self, value: SignalCallback) -> None:
"""
Remove a callback
:param value: Callback
"""
self.disconnect(value)
def emit(self, *args: Any, **kwargs: Any) -> None:
"""
Call all of the callbacks in the order they were added as well as the associated
function.
If any callback returns `True`, all other callbacks get skipped.
:param args: Positional arguments to pass to all of the callbacks
:param kwargs: Keyword arguments to pass to all of the callbacks
"""
if not is_loop_running():
raise RuntimeError("Event loop is not running")
asyncio.create_task(self.handle_emit(*args, **kwargs))
def connect(self, callback: SignalCallback) -> SignalCallback:
"""
Add a function to the list of callbacks. Can be used as a decorator.
:param callback: A callable or coroutine
"""
if callback not in self:
list.append(self, callback)
return callback
def disconnect(self, callback: SignalCallback) -> None:
"""
Remove a function from the list of callbacks
:param callback: A callable or coroutine
"""
if not self.callback:
# oh boy something really goofed
return
try:
list.remove(self, callback)
except ValueError:
cbname = callback.__name__
signame = self.callback.__name__
print(f"WARNING: '{cbname}' was not connted to signal '{signame}'")
async def handle_emit(self, *args: Any, **kwargs: Any) -> None:
"""
This gets called by :meth:`Signal.emit` as an :class:`asyncio.Task`.
:param args: Positional arguments to pass to all of the callbacks
:param kwargs: Keyword arguments to pass to all of the callbacks
"""
if not self.callback:
# oh boy something really goofed
return
for callback in self:
if await self.handle_callback(callback, *args, **kwargs):
break
await self.handle_callback(self.callback, *args, **kwargs)
async def handle_callback(self,
callback: Callable[..., Awaitable[bool | None]],
*args: Any,
**kwargs: Any) -> bool | None:
try:
# asyncio.timeout and asyncio.wait_for complain, so need to find a new way
# async with asyncio.timeout(self.timeout):
if args and args[0] == self.object:
return await callback(*args, **kwargs)
else:
return await callback(self.object, *args, **kwargs)
except TimeoutError:
print(f"Callback '{callback.__name__}' timed out")
return True
except Exception:
traceback.print_exc()
return True

View file

@ -48,7 +48,7 @@ exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
intersphinx_mapping = { intersphinx_mapping = {
"python": (f"https://docs.python.org/{pyversion}", None), "python": (f"https://docs.python.org/{pyversion}", None),
"aputils": ("https://docs.barkshark.xyz/aputils", None), "aputils": ("https://docs.barkshark.xyz/aputils", None),
"bsql": ("https://docs.barkshark.xyz/barkshark-sql", None), "blib": ("https://docs.barkshark.xyz/blib", None),
"gemi": ("https://docs.barkshark.xyz/gemi", None), "gemi": ("https://docs.barkshark.xyz/gemi", None),
"hamlish-jinja": ("https://docs.barkshark.xyz/hamlish-jinja", None), "hamlish-jinja": ("https://docs.barkshark.xyz/hamlish-jinja", None),
"jinja2": ("https://jinja.palletsprojects.com/en/3.1.x/", None) "jinja2": ("https://jinja.palletsprojects.com/en/3.1.x/", None)

View file

@ -1,14 +0,0 @@
Application
===========
.. autoclass:: basgi.Application
:members:
:show-inheritance:
.. autoclass:: basgi.Signal
:members:
:show-inheritance:
.. autoclass:: basgi.Stream
:members:
:show-inheritance:

View file

@ -1,7 +1,7 @@
Enums Enums
===== =====
.. autoclass:: basgi.HttpStatus .. autoclass:: basgi.PridePalette
:members: :members:
:show-inheritance: :show-inheritance:
:undoc-members: :undoc-members:

View file

@ -1,6 +0,0 @@
Exceptions
==========
.. autoclass:: basgi.HttpError
:members:
:show-inheritance:

View file

@ -5,50 +5,45 @@ API
Application Application
----------- -----------
:class:`basgi.Application` .. autoclass:: basgi.Application
:members:
:class:`basgi.Signal` :show-inheritance:
:class:`basgi.Stream`
HTTP Messages HTTP Messages
------------- -------------
:class:`basgi.Request` .. autoclass:: basgi.Request
:members:
:show-inheritance:
:exclude-members: __init__
:class:`basgi.Response` .. autoclass:: basgi.Response
:members:
:show-inheritance:
:class:`basgi.FileResponse` .. autoclass:: basgi.FileResponse
:members:
:show-inheritance:
:class:`basgi.TemplateResponse` .. autoclass:: basgi.TemplateResponse
:members:
:show-inheritance:
Application Runners Application Runners
------------------- -------------------
:class:`basgi.GranianRunner` .. autoclass:: basgi.Runner
:members:
:show-inheritance:
:class:`basgi.UvicornRunner` .. autoclass:: basgi.GranianRunner
:members:
:show-inheritance:
:exclude-members: setup_module
.. autoclass:: basgi.UvicornRunner
Templates :members:
--------- :show-inheritance:
:exclude-members: setup_module
:class:`basgi.Template`
:class:`basgi.SassExtension`
Enums
-----
:class:`basgi.HttpStatus`
:class:`basgi.SassOutputStyle`
Exceptions
----------
:class:`basgi.HttpError`

View file

@ -1,19 +0,0 @@
Messages
========
.. autoclass:: basgi.Request
:members:
:show-inheritance:
.. autoclass:: basgi.Response
:members:
:show-inheritance:
.. autoclass:: basgi.FileResponse
:members:
:show-inheritance:
.. autoclass:: basgi.TemplateResponse
:members:
:show-inheritance:

36
docs/src/api/misc.rst Normal file
View file

@ -0,0 +1,36 @@
Misc
====
.. autoclass:: basgi.Client
:members:
:show-inheritance:
.. autoclass:: basgi.Color
:members:
:show-inheritance:
.. autoclass:: basgi.Cookie
:members:
:show-inheritance:
.. autoclass:: basgi.SassExtension
:members:
:show-inheritance:
:exclude-members: __init__
.. autoclass:: basgi.StateProxy
:members:
:show-inheritance:
.. autoclass:: basgi.StaticHandler
:members:
:show-inheritance:
.. autoclass:: basgi.Stream
:members:
:show-inheritance:
:exclude-members: __init__
.. autoclass:: basgi.Template
:members:
:show-inheritance:

View file

@ -1,16 +0,0 @@
App Runners
===========
.. autoclass:: basgi.Runner
:members:
:show-inheritance:
.. autoclass:: basgi.GranianRunner
:members:
:show-inheritance:
:exclude-members: setup_module
.. autoclass:: basgi.UvicornRunner
:members:
:show-inheritance:
:exclude-members: setup_module

View file

@ -1,4 +0,0 @@
Signal
======

View file

@ -1,2 +1,22 @@
Usage Usage
===== =====
Application
-----------
:class:`basgi.Application` is the base for a web server. Be sure to set a unique name.
.. code-block:: python
import basgi
@basgi.get("xyz.barkshark.BASGI", "/")
async def handle_home(request: basgi.Request) -> basgi.Response:
return basgi.Response(200, "Merp!")
app = basgi.Application("xyz.barkshark.BASGI")
@app.route("GET", "/about")
async def handle_about(request: basgi.Request) -> basgi.Response:
return basgi.Response(200, "")

View file

@ -5,13 +5,9 @@ subtrees:
- file: src/api/index.rst - file: src/api/index.rst
subtrees: subtrees:
- entries: - entries:
- file: src/api/app.rst
- file: src/api/runners.rst
- file: src/api/messages.rst
- file: src/api/router.rst - file: src/api/router.rst
- file: src/api/template.rst - file: src/api/misc.rst
- file: src/api/enums.rst - file: src/api/enums.rst
- file: src/api/exceptions.rst
- url: https://git.barkshark.xyz/barkshark/basgi - url: https://git.barkshark.xyz/barkshark/basgi
title: Git Repo title: Git Repo
- url: https://barkshark.xyz/@izalia - url: https://barkshark.xyz/@izalia

View file

@ -36,6 +36,7 @@ classifiers = [
requires-python = ">= 3.11" requires-python = ">= 3.11"
dependencies = [ dependencies = [
"activitypub-utils == 0.2.3", "activitypub-utils == 0.2.3",
"barkshark-lib == 0.1.2",
"gemi-python == 0.1.1", "gemi-python == 0.1.1",
"http-router == 4.1.2", "http-router == 4.1.2",
"httpx[http2] == 0.27.0", "httpx[http2] == 0.27.0",
@ -98,6 +99,5 @@ disallow_untyped_decorators = true
warn_redundant_casts = true warn_redundant_casts = true
warn_unreachable = true warn_unreachable = true
warn_unused_ignores = true warn_unused_ignores = true
follow_imports = "silent"
strict = true strict = true
implicit_reexport = true implicit_reexport = true