Compare commits

...

4 commits

Author SHA1 Message Date
Izalia Mae 4d5da1ed50 router and request changes
* redo route parameters
* add `Application.route` method
* disable generics on `Application` in `Request` for now
2024-04-20 18:43:31 -04:00
Izalia Mae cad946555b finish documenting module 2024-04-20 14:38:13 -04:00
Izalia Mae d48cc7024a h 2024-04-19 22:10:55 -04:00
Izalia Mae 2a04991e5c add documentation for some classes and functions 2024-04-19 20:26:44 -04:00
22 changed files with 754 additions and 690 deletions

View file

@ -3,22 +3,17 @@ __version__ = "0.1.0"
from .application import Application, StaticHandler
from .client import Client
from .enums import HttpStatus, SassOutputStyle
from .error import HttpError
from .enums import PridePalette, SassOutputStyle
from .request import Request
from .response import Response, FileResponse, TemplateResponse
from .runner import Runner, UvicornRunner, GranianRunner
from .signal import Signal
from .template import SassExtension, Template
from .misc import (
Color,
Cookie,
StateProxy,
Stream,
convert_to_bytes,
convert_to_string,
is_loop_running
Stream
)
from .router import (
@ -44,5 +39,4 @@ from .misc import ReaderFunction, WriterFunction
from .request import AsgiVersionType, ScopeType
from .router import RouteHandler
from .runner import BackgroundTask
from .signal import SignalCallback
from .template import TemplateContextCallback

View file

@ -2,19 +2,18 @@ from __future__ import annotations
import traceback
from blib import HttpError, Signal, Url
from collections.abc import Callable
from functools import lru_cache
from mimetypes import guess_type
from os.path import normpath
from pathlib import Path
from typing import Any, Generic, TypeVar
from .client import Client
from .error import HttpError
from .misc import StateProxy, Stream, ReaderFunction, WriterFunction
from .request import Request, ScopeType
from .response import FileResponse, Response
from .router import Router, RouteHandler, get_router
from .signal import Signal
from .template import Template, TemplateContextCallback
@ -28,6 +27,8 @@ APPLICATIONS: dict[str, Application] = {} # type: ignore[type-arg]
class Application(Generic[R, RT, AT]):
"Represents an ASGI application"
def __init__(self,
name: str,
template_search: list[Path | str] | None = None,
@ -35,35 +36,67 @@ class Application(Generic[R, RT, AT]):
template_context: TemplateContextCallback | None = None,
request_class: type[R] = Request, # type: ignore[assignment]
request_state_class: type[RT] = StateProxy, # type: ignore[assignment]
app_state_class: type[AT] = StateProxy) -> None: # type: ignore[assignment]
app_state_class: type[AT] = StateProxy, # type: ignore[assignment]
http_proxy: Url | str | None = None) -> None:
"""
Create a new ``Application``
:param name: Identifier for the application. This is used for the ``Server`` header
and adding routes.
:param template_search: Directories to search when locating a template
:param template_env: Context data to include with every template render
:param template_context: Function that gets called on every template render
:param request_class: Class to use for requests
:param request_state_class: Class to use for the :attr:`basgi.Request.state` property
:param app_state_class: Class to use for the :attr:`basgi.Application.state` property
"""
if name in APPLICATIONS:
raise ValueError(f"Application with name '{name}' already exists")
APPLICATIONS[name] = self
self.name: str = name
"Identifier for the application"
self.state: AT = app_state_class({})
"Data to be stored with the application"
self.request_class: type[R] = request_class
"Class to use for requests"
self.request_state_class: type[RT] = request_state_class
"Class to use for the :attr:`basgi.Request.state` property"
# Not sure how to properly annotate `Exception` here, so just ignore the warnings
self.error_handlers: dict[type[ExceptionType], ExceptionCallback] = { # type: ignore
HttpError: self.handle_http_error
HttpError: self._handle_http_error
}
"Functions to be called on specific errors"
self.router: Router = get_router(self.name)
self.router.trim_last_slash = True
"Contains all of the routes the application will handle"
self.client: Client = Client(name, http_proxy)
"Customized httpx client"
self.client: Client = Client()
self.template: Template = Template(
*(template_search or []),
context_function = template_context,
**(template_env or {}))
APPLICATIONS[name] = self
**(template_env or {})
)
"Template environment"
@classmethod
def get(cls: type[Application[R, RT, AT]], name: str) -> Application[R, RT, AT]:
@staticmethod
def get(name: str) -> Application[R, RT, AT]:
"""
Get an application by its name
:param name: Identifier of the application
:raises KeyError: If an application by the specified name cannot be found
"""
return APPLICATIONS[name]
@ -94,7 +127,7 @@ class Application(Generic[R, RT, AT]):
await self.on_request.handle_emit(request)
except Exception as error:
response = self.handle_error(request, error)
response = self._handle_error(request, error)
try:
match = self.router(request.path, request.method)
@ -105,13 +138,13 @@ class Application(Generic[R, RT, AT]):
raise HttpError(500, "Empty response")
except Exception as error:
response = self.handle_error(request, error)
response = self._handle_error(request, error)
try:
await self.on_response.handle_emit(request, response)
except Exception as error:
response = self.handle_error(request, error)
response = self._handle_error(request, error)
if not response.headers.get("Server"):
response.headers["Server"] = self.name
@ -122,34 +155,79 @@ class Application(Generic[R, RT, AT]):
@Signal(5.0)
async def on_request(self, request: Request) -> None:
pass
"""
:class:`blib.Signal` that gets emitted just after the request is created
:param application: The application that emit the signal
:param request: The newly-created request
"""
...
@Signal(5.0)
async def on_response(self, request: Request, response: Response) -> None:
pass
"""
:class:`blib.Signal` that gets emitted just after the route handler is called
:param application: The application that emit the signal
:param request: The request
:param response: Response from the route handler
"""
...
@Signal(5.0)
async def on_startup(self) -> None:
pass
"""
:class:`blib.Signal` that gets emitted on server start
:param application: The application that emit the signal
"""
...
@Signal(5.0)
async def on_shutdown(self) -> None:
pass
"""
:class:`blib.Signal` that gets emitted just before server stop
:param application: The application that emit the signal
"""
...
def add_route(self, method: str, path: str, handler: RouteHandler) -> None:
self.router.bind(handler, path, methods = [method])
def add_route(self, handler: RouteHandler, method: str, *paths: str) -> None:
"""
Add a route handler
:param method: HTTP method the route will handle
:param path: Virtual HTTP path the route will handle
:param handler: Function to call when the route is requested
"""
self.router.bind(handler, method, *paths)
def add_static(self, path: str, location: Path | str, cached: bool = False) -> None:
"""
Add a route handler that servers a directory. If a directory is requested and
``index.html`` exists in that directory, it will be served instead.
:param path: Virtual HTTP path the route will handle
:param location: Directory to use for serving files
:param cached: Store all requested files in memory after being read from the disk for
the first time
"""
handler = StaticHandler(path, location, cached)
self.add_route("GET", handler.path, handler)
self.add_route(handler, "GET", handler.path)
def print_access_log(self, request: Request, response: Response) -> None:
"""
Prints the request line to the terminal. Override this in a sub-class to customize it.
"""
message = "{}: {} \"{} {}\" {} {} \"{}\"".format(
"INFO",
request.headers.get(
@ -169,32 +247,63 @@ class Application(Generic[R, RT, AT]):
print(message, flush = True)
def handle_error(self, request: Request, error: Exception) -> Response:
def route(self, method: str, *paths: str) -> Callable[[RouteHandler], RouteHandler]:
"""
Decorator for adding a route from a callable
:param method: HTTP method the route should handle
:param paths: Paths the route should handle
"""
def wrapper(func: RouteHandler) -> RouteHandler:
self.router.bind(func, method, *paths)
return func
return wrapper
def _handle_error(self, request: Request, error: Exception) -> Response:
try:
return self.error_handlers[type(error)](request, error)
except KeyError:
traceback.print_exception(error)
return Response(500, "Internal Server Error")
except Exception:
traceback.print_exc()
return Response(500, "Internal Server Error")
def handle_http_error(self, request: Request, error: HttpError) -> Response:
def _handle_http_error(self, request: Request, error: HttpError) -> Response:
return Response.new_from_http_error(error)
class StaticHandler:
"Provides a handler for a directory of static files"
def __init__(self, path: str, location: Path | str, cached: bool) -> None:
"""
Create a new static file handler
:param path: Virtual HTTP path the route will handle
:param location: Directory to use for serving files
:param cached: Store all requested files in memory after being read from the disk for
the first time
"""
if isinstance(location, str):
location = Path(location)
self.path: str = path.rstrip("/") + "/{filepath:.*}"
"Path regex used to determine if a path is handled or not"
self.location: Path = Path(location).expanduser().resolve()
"Directory to use for serving files"
self.cached: bool = cached
"Whether or not all requested files will be stored in memory"
self.cache: dict[str, Response] = {}
"Currently cached responses"
if not self.location.exists():
raise FileNotFoundError(self.location)
@ -204,27 +313,32 @@ class StaticHandler:
async def __call__(self, request: Request) -> Response:
filepath = request.params["filepath"]
"""
Function that gets called for incoming requests
if self.cached:
return await self.handle_call_cached(request, filepath)
:param request: The incoming request
:param filepath: Relative path of the requested file
:raises HttpError: If the file on disk cannot be found
"""
return await self.handle_call(request, filepath)
filepath = normpath(request.params["filepath"].lstrip("/"))
if self.cached and filepath in self.cache:
return self.cache[filepath]
async def handle_call(self, request: Request, filepath: str) -> Response:
filepath = normpath(filepath)
path = self.location.joinpath(filepath.lstrip("/"))
path = self.location.joinpath(filepath)
if path.is_dir():
path = path.joinpath("index.html")
if not path.is_file():
raise HttpError(404, str(filepath))
raise HttpError(404, filepath)
if self.cached:
with path.open("rb") as fd:
response = Response(200, fd.read(), mimetype = guess_type(path)[0])
self.cache[filepath] = response
return response
return FileResponse(path)
@lru_cache(maxsize = 128, typed = True)
async def handle_call_cached(self, request: Request, filepath: str) -> Response:
return await self.handle_call(request, filepath)

View file

@ -1,28 +1,50 @@
import aputils
import httpx
from typing import TypeVar
from aputils import (
AlgorithmType,
JsonBase,
Message,
Nodeinfo,
Webfinger,
WellKnownNodeinfo,
Signer,
register_signer
)
from . import __version__
T = TypeVar("T", bound = aputils.JsonBase)
T = TypeVar("T", bound = JsonBase)
class Client:
def __init__(self, useragent: str = f"BarksharkASGI/{__version__}"):
"HTTP client with useful methods for fetching ActivityPub resources"
def __init__(self, useragent: str = f"BarksharkASGI/{__version__}", proxy: str | None = None):
"""
Create a new ``Client`` object
:param useragent: ``User-Agent`` header to send with each request
"""
self._client = httpx.AsyncClient(
http2 = True,
timeout = 5,
max_redirects = 3,
proxies = proxy,
headers = {
"User-Agent": useragent
}
},
)
@property
def useragent(self) -> str:
"The ``User-Agent`` header to send with each request"
return self._client.headers["User-Agent"]
@ -32,22 +54,36 @@ class Client:
async def close(self) -> None:
"Close all active connections"
await self._client.aclose()
async def request(self,
method: str,
url: str,
body: aputils.JsonBase | None = None,
body: JsonBase | None = None,
headers: dict[str, str] | None = None,
signer: aputils.Signer | None = None,
algorithm: aputils.AlgorithmType | str = aputils.AlgorithmType.RSASHA256,
signer: Signer | None = None,
algorithm: AlgorithmType | str = AlgorithmType.RSASHA256,
stream: bool = False,
follow_redirects: bool = False) -> httpx.Response:
"""
Send an HTTP request
:param method: HTTP method to use
:param url: Resource to fetch
:param body: Data to send as the body
:param headers: Headers to add to the request
:param signer: Actor to sign the headers with
:param algorithm: Signature standard to use when signing headers
:param stream: Whether or not to keep the conection open after the request is finished
:param follow_redirects: Send a new request to the location in any 3xx status responses
"""
url = url.split("#", 1)[0]
if body is not None and not isinstance(body, aputils.JsonBase):
if body is not None and not isinstance(body, JsonBase):
raise TypeError("body must be a JsonBase object")
data = body.to_json() if body else None
@ -69,22 +105,34 @@ class Client:
async def fetch_json(self,
url: str,
cls: type[T] | None = None,
signer: aputils.Signer | None = None) -> T:
signer: Signer | None = None) -> T:
"""
Send a ``GET`` request and return a ``JsonBase`` object
message_class: type[T] = aputils.JsonBase if cls is None else cls # type: ignore
:param url: Location of the resource to fetch
:param cls: Class to use for parsing the JSON body
:param signer: Actor to sign the headers with
"""
message_class: type[T] = JsonBase if cls is None else cls # type: ignore
headers = {
"Accept": "application/" + ("activity+json" if cls is aputils.Message else "json")
"Accept": "application/" + ("activity+json" if cls is Message else "json")
}
response = await self.get(url, headers, signer)
return message_class.parse(response.json())
async def fetch_nodeinfo(self, domain: str) -> aputils.Nodeinfo:
async def fetch_nodeinfo(self, domain: str) -> Nodeinfo:
"""
Get the nodeinfo endpoint of a domain
:param domain: Domain to fetch from
"""
wk_nodeinfo = await self.fetch_json(
f"https://{domain}/.well-known/nodeinfo",
aputils.WellKnownNodeinfo
WellKnownNodeinfo
)
try:
@ -93,21 +141,38 @@ class Client:
except KeyError:
url = wk_nodeinfo.v20
return await self.fetch_json(url, aputils.Nodeinfo)
return await self.fetch_json(url, Nodeinfo)
async def fetch_webfinger(self, username: str, domain: str) -> aputils.Webfinger:
async def fetch_webfinger(self, username: str, domain: str) -> Webfinger:
"""
Query webfinger for a user
:param username: Name of the user to fetch
:param domain: Domain to fetch from
"""
url = f"https://{domain}/.well-known/webfinger?resource=acct:{username}@{domain}"
return await self.fetch_json(url, aputils.Webfinger)
return await self.fetch_json(url, Webfinger)
async def get(self,
url: str,
headers: dict[str, str] | None = None,
signer: aputils.Signer | None = None,
algorithm: aputils.AlgorithmType | str = aputils.AlgorithmType.RSASHA256,
signer: Signer | None = None,
algorithm: AlgorithmType | str = AlgorithmType.RSASHA256,
stream: bool = False,
follow_redirects: bool = False) -> httpx.Response:
"""
Send an HTTP GET request
:param url: Resource to fetch
:param headers: Headers to add to the request
:param signer: Actor to sign the headers with
:param algorithm: Signature standard to use when signing headers
:param stream: Whether or not to keep the conection open after the request is finished
:param follow_redirects: Send a new request to the location in any 3xx status responses
"""
return await self.request(
"GET", url, None, headers, signer, algorithm, stream, follow_redirects
@ -117,9 +182,18 @@ class Client:
async def head(self,
url: str,
headers: dict[str, str] | None = None,
signer: aputils.Signer | None = None,
algorithm: aputils.AlgorithmType | str = aputils.AlgorithmType.RSASHA256,
signer: Signer | None = None,
algorithm: AlgorithmType | str = AlgorithmType.RSASHA256,
follow_redirects: bool = False) -> httpx.Response:
"""
Send an HTTP HEAD request
:param url: Resource to fetch
:param headers: Headers to add to the request
:param signer: Actor to sign the headers with
:param algorithm: Signature standard to use when signing headers
:param follow_redirects: Send a new request to the location in any 3xx status responses
"""
return await self.request(
"HEAD", url, None, headers, signer, algorithm, False, follow_redirects
@ -128,23 +202,34 @@ class Client:
async def post(self,
url: str,
body: aputils.JsonBase,
body: JsonBase,
headers: dict[str, str] | None = None,
signer: aputils.Signer | None = None,
algorithm: aputils.AlgorithmType | str = aputils.AlgorithmType.RSASHA256,
signer: Signer | None = None,
algorithm: AlgorithmType | str = AlgorithmType.RSASHA256,
stream: bool = False,
follow_redirects: bool = False) -> httpx.Response:
"""
Send an HTTP POST request
:param url: Resource to fetch
:param body: Data to send as the body
:param headers: Headers to add to the request
:param signer: Actor to sign the headers with
:param algorithm: Signature standard to use when signing headers
:param stream: Whether or not to keep the conection open after the request is finished
:param follow_redirects: Send a new request to the location in any 3xx status responses
"""
return await self.request(
"POST", url, body, headers, signer, algorithm, stream, follow_redirects
)
@aputils.register_signer(httpx.Request)
@register_signer(httpx.Request)
def handle_httpx_sign(
signer: aputils.Signer,
signer: Signer,
request: httpx.Request,
algorithm: aputils.AlgorithmType) -> httpx.Request:
algorithm: AlgorithmType) -> httpx.Request:
headers = signer.sign_headers(
request.method,

View file

@ -1,89 +1,23 @@
import re
from aputils import IntEnum, StrEnum
from blib import Enum, StrEnum
CAPITAL = re.compile("[A-Z][^A-Z]")
class PridePalette(tuple[str, ...], Enum):
LGBT = ("#9400D3", "#4B0082", "#0000FF", "#00FF00", "#FFFF00", "#FF7F00", "#FF0000")
LESBIAN = ("#D52D00", "#EF7627", "#FF9A56", "#FFFFFF", "#D162A4", "#B55690", "#A30262")
BI = ("#D60270", "#9B4F96", "#0038A8")
GAY = ("#078D70", "#26CEAA", "#98E8C1", "#FFFFFF", "#7BADE2", "#5049CC", "#3D1A78")
PANSEXUAL = ("#FF218C", "#FFD800", "#21B1FF")
ASEXUAL = ("#000000", "#A3A3A3", "#FFFFFF", "#800080")
AROMANTIC = ("#3DA542", "#A7D379", "#FFFFFF", "#A9A9A9", "#000000")
TRANS = ("#55CDFC", "#F7A8B8", "#FFFFFF")
TRANS_BLACK = ("#55CDFC", "#F7A8B8", "#000000")
TRANSMASC = ("#FF8ABD", "#CDF5FE", "#9AEBFF", "#74DFFF")
NONBINARY = ("#FCF434", "#FFFFFF", "#9C59D1", "#2C2C2C")
class HttpStatus(IntEnum):
# 1xx
Continue = 100
SwitchingProtocols = 101
Processing = 102
# 2xx
Ok = 200
Created = 201
Accepted = 202
NonauthritativeInformation = 203
NoContent = 204
ResetContent = 205
PartialContent = 206
MultiStatus = 207
AlreadyReported = 208
ImUsed = 226
# 3xx
MultipleChoices = 300
MovedPermanently = 301
Found = 302
SeeOther = 303
NotModified = 304
UseProxy = 305
TemporaryRedirect = 307
PermanentRedirect = 308
# 4xx
BadRequest = 400
Unauthorized = 401
PaymentRequired = 402
Forbidden = 403
NotFound = 404
MethodNotAllowed = 405
NotAcceptable = 406
ProxyAuthenticationRequired = 407
RequestTimeout = 408
Conflict = 409
Gone = 410
LengthRequired = 411
PreconditionFailed = 412
PayloadTooLarge = 413
RequestUriTooLarge = 414
UnsupportedMediaType = 415
RequestRangeNotSatisfiable = 416
ExpectationFailed = 417
ImATeapot = 418
EnhanceYourCalm = 420
MisdirectedRequest = 421
UnprocessableEntity = 422
Locked = 423
FailedDependency = 424
UpgradeRequired = 426
PreconditionRequired = 428
TooManyRequests = 429
RequestHeaderFieldsTooLarge = 431
ConnectionClosedWithoutResponse = 444
UnavailableForLegalReasons = 451
ClientClosedRequest = 499
InternalServerError = 500
NotImplemented = 501
BadGateway = 502
ServiceUnavailable = 503
GatewayTimeout = 504
HttpVersionNotSupported = 505
VariantAlsoNegotiates = 506
InsufficientStorage = 507
LoopDetected = 508
NotExtended = 512
NetworkAuthenticationRequired = 511
NetworkConnectTimeoutError = 599
@property
def reason(self) -> str:
return " ".join(CAPITAL.findall(self.name))
# aliases
ACE = ASEXUAL
ARO = AROMANTIC
ENBY = NONBINARY
class SassOutputStyle(StrEnum):

View file

@ -1,89 +0,0 @@
from typing import Any, Self
from .enums import HttpStatus
class HttpError(Exception):
def __init__(self,
status: HttpStatus | int,
message: str,
headers: dict[str, str] | None = None) -> None:
Exception.__init__(self, f'HTTP ERROR {status}: {message[:100]}')
self.status: HttpStatus = HttpStatus.parse(status)
self.message: str = message
self.headers: dict[str, str] = headers or {}
@classmethod
def MOVED_PERMANENTLY(cls: type[Self], location: str, **kwargs: Any) -> Self:
err = cls(301, f'Resource moved to <a href="{location}">{location}</a>', **kwargs)
err.headers["Location"] = location
return err
@classmethod
def FOUND(cls: type[Self], location: str, **kwargs: Any) -> Self:
err = cls(302, f'Resource moved to <a href="{location}">{location}</a>', **kwargs)
err.headers["Location"] = location
return err
@classmethod
def TEMPORARY_REDIRECT(cls: type[Self], location: str, **kwargs: Any) -> Self:
err = cls(307, f'Resource moved to <a href="{location}">{location}</a>', **kwargs)
err.headers["Location"] = location
return err
@classmethod
def PERMANENT_REDIRECT(cls: type[Self], location: str, **kwargs: Any) -> Self:
err = cls(308, f'Resource moved to <a href="{location}">{location}</a>', **kwargs)
err.headers["Location"] = location
return err
@classmethod
def BAD_REQUEST(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(400, message, **kwargs)
@classmethod
def UNAUTHORIZED(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(401, message, **kwargs)
@classmethod
def FORBIDDEN(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(403, message, **kwargs)
@classmethod
def NOT_FOUND(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(404, message, **kwargs)
@classmethod
def METHOD_NOT_ALLOWED(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(405, message, **kwargs)
@classmethod
def GONE(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(410, message, **kwargs)
@classmethod
def SERVER_ERROR(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(500, message, **kwargs)
@classmethod
def NOT_IMPLEMENTED(cls: type[Self], message: str, **kwargs: Any) -> Self:
return cls(501, message, **kwargs)
@property
def reason(self) -> str:
return self.status.reason

View file

@ -1,7 +1,4 @@
import asyncio
import json
from aputils import HttpDate, JsonBase
from aputils import HttpDate
from colorsys import rgb_to_hls, hls_to_rgb
from datetime import datetime, timedelta
from functools import cached_property
@ -10,125 +7,59 @@ from typing import Any, Literal, Self
from collections.abc import (
Awaitable,
Callable,
Iterable,
Iterator,
ItemsView,
KeysView,
Mapping,
MutableMapping,
Sequence,
ValuesView
)
from .enums import PridePalette
ReaderFunction = Callable[..., Awaitable[dict[str, Any]]]
WriterFunction = Callable[[dict[str, Any]], Awaitable[None]]
PRIDE_COLORS = {
"lgbt": ["#9400D3", "#4B0082", "#0000FF", "#00FF00", "#FFFF00", "#FF7F00", "#FF0000"],
"lesbian": ["#D52D00", "#EF7627", "#FF9A56", "#FFFFFF", "#D162A4", "#B55690", "#A30262"],
"bi": ["#D60270", "#9B4F96", "#0038A8"],
"gay": ["#078D70", "#26CEAA", "#98E8C1", "#FFFFFF", "#7BADE2", "#5049CC", "#3D1A78"],
"pan": ["#FF218C", "#FFD800", "#21B1FF"],
"asexual": ["#000000", "#A3A3A3", "#FFFFFF", "#800080"],
"aromantic": ["#3DA542", "#A7D379", "#FFFFFF", "#A9A9A9", "#000000"],
"trans": ["#55CDFC", "#F7A8B8", "#FFFFFF"],
"trans-black": ["#55CDFC", "#F7A8B8", "#000000"],
"transmasc": ["#FF8ABD", "#CDF5FE", "#9AEBFF", "#74DFFF"],
"non-binary": ["#FCF434", "#FFFFFF", "#9C59D1", "#2C2C2C"]
}
def convert_to_bytes(value: Any, encoding: str = "utf-8") -> bytes:
"""
Convert an object to :class:`bytes`
:param value: Object to be converted
:param encoding: Character encoding to use if the object is a string or gets converted to
one in the process
:raises TypeError: If the object cannot be converted
"""
if isinstance(value, bytes):
return value
try:
return convert_to_string(value).encode(encoding)
except TypeError:
raise TypeError(f"Cannot convert '{type(value).__name__}' into bytes") from None
def convert_to_string(value: Any, encoding: str = 'utf-8') -> str:
"""
Convert an object to :class:`str`
:param value: Object to be converted
:param encoding: Character encoding to use if the object is a :class:`bytes` object
"""
if value is None:
return ''
if isinstance(value, bytes):
return value.decode(encoding)
if isinstance(value, bool):
return str(value)
if isinstance(value, str):
return value
if isinstance(value, JsonBase):
return value.to_json()
if isinstance(value, (dict, list, tuple, set)):
return json.dumps(value)
if isinstance(value, (int, float)):
return str(value)
raise TypeError(f'Cannot convert "{type(value).__name__}" into a string') from None
def is_loop_running() -> bool:
"Check if an event loop is running in the current thread"
try:
loop = asyncio.get_event_loop()
if not loop:
return False
return loop.is_running()
except Exception:
return False
class Color(str):
"Represents an HTML color value"
def __new__(cls, color: str) -> Self:
if not isinstance(color, str):
raise TypeError("Color must be a valid hex string")
"""
Create a new ``Color`` object
if not color.startswith("#"):
color = f"#{str(color)}"
:param color: Hex color string of 3 or 6 characters (``#`` character optional)
"""
if len(color) == 4:
color = f"#{color[1]*2}{color[2]*2}{color[3]*2}"
color = color.lstrip("#")
elif len(color) != 7:
if len(color) == 3:
color = f"{color[1]*2}{color[2]*2}{color[3]*2}"
elif len(color) != 6:
raise TypeError("Color must be 3 or 6 character hex string")
return str.__new__(cls, color)
return str.__new__(cls, "#" + color)
def __repr__(self) -> str:
return f"Color({self})"
return f"Color('{self}')"
@classmethod
def from_rgb(cls: type[Self], red: int, green: int, blue: int) -> Self:
"""
Create a new ``Color`` object from red, green, and blue values. The values must be
between ``0`` and ``255``
:param red: Red color value
:param green: Green color value
:param blue: Blue color value
"""
values = [red, green, blue]
for value in values:
@ -140,6 +71,10 @@ class Color(str):
@classmethod
def from_hsl(cls: type[Self], hue: int, saturation: int, luminance: int) -> Self:
"""
Create a new ``Color`` object from hue, saturation, and luminance values
"""
hsl_values: list[int | float] = [hue, saturation, luminance]
for idx, value in enumerate(hsl_values):
@ -155,23 +90,14 @@ class Color(str):
@classmethod
def new_pride_palette(cls: type[Self], flag: str) -> Iterable[Self]:
"Returns a `tuple` of `Color` objects which represents a pride flag color palette"
def new_pride_palette(cls: type[Self], flag: str) -> tuple[Self, ...]:
"Returns multiple ``Color`` objects which represents a pride flag color palette"
if flag == "ace":
flag = "asexual"
elif flag == "aro":
flag = "aromantic"
elif flag in ["enby", "nonbinary"]:
flag = "non-binary"
return tuple(cls(color) for color in PRIDE_COLORS[flag])
return tuple(cls(color) for color in PridePalette.parse(flag.replace("-", "")))
@staticmethod
def parse_multi(multiplier: int) -> float:
def _parse_multi(multiplier: int) -> float:
if multiplier >= 100:
return 1
@ -183,11 +109,15 @@ class Color(str):
@cached_property
def rgb(self) -> tuple[int, int, int]:
"Get the color as a tuple of red, green, and blue levels"
return (self.red, self.green, self.blue)
@cached_property
def hsl(self) -> tuple[int, int, int]:
"Get the color as a tuple of hue, saturation, and luminance levels"
rgb = [value / 255 for value in self.rgb]
values = [int(value * 255) for value in rgb_to_hls(*rgb)]
return (values[0], values[2], values[1])
@ -195,48 +125,70 @@ class Color(str):
@cached_property
def red(self) -> int:
"Get the red color level"
return int(self[1:3], 16)
@cached_property
def green(self) -> int:
"Get the green color level"
return int(self[3:5], 16)
@cached_property
def blue(self) -> int:
"Get the blue color level"
return int(self[5:7], 16)
@cached_property
def hue(self) -> int:
"Get the hue level"
return self.hsl[0]
@cached_property
def saturation(self) -> int:
"Get the saturation level"
return self.hsl[1]
@cached_property
def lightness(self) -> int:
def luminance(self) -> int:
"Get the luminance level"
return self.hsl[2]
def alter(self, action: str, multiplier: int) -> Self:
def alter(self,
action: Literal["lighten", "darken", "saturate", "desaturate"],
multiplier: int) -> Self:
"""
Change the lightness or saturation of the color
:param action: Modification action to take on the color
:param multiplier: Amount to multiply by for the effect. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
hue, saturation, luminance = self.hsl
if action == "lighten":
luminance += int((255 - luminance) * Color.parse_multi(multiplier))
luminance += int((255 - luminance) * Color._parse_multi(multiplier))
elif action == "darken":
luminance -= int(luminance * Color.parse_multi(multiplier))
luminance -= int(luminance * Color._parse_multi(multiplier))
elif action == "saturate":
saturation += int((255 - saturation) * Color.parse_multi(multiplier))
saturation += int((255 - saturation) * Color._parse_multi(multiplier))
elif action == "desaturate":
saturation -= int(saturation * Color.parse_multi(multiplier))
saturation -= int(saturation * Color._parse_multi(multiplier))
else:
raise ValueError(f"Invalid action: {action}")
@ -245,22 +197,57 @@ class Color(str):
def lighten(self, multiplier: int) -> Self:
"""
Alias of ``Color.alter('lighten', multiplier)``
:param multiplier: Amount to multiply by for the effect. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
return self.alter("lighten", multiplier)
def darken(self, multiplier: int) -> Self:
"""
Alias of ``Color.alter('darken', multiplier)``
:param multiplier: Amount to multiply by for the effect. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
return self.alter("darken", multiplier)
def saturate(self, multiplier: int) -> Self:
"""
Alias of ``Color.alter('saturate', multiplier)``
:param multiplier: Amount to multiply by for the effect. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
return self.alter("saturate", multiplier)
def desaturate(self, multiplier: int) -> Self:
"""
Alias of ``Color.alter('desaturate', multiplier)``
:param multiplier: Amount to multiply by for the effect. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
return self.alter("desaturate", multiplier)
def rgba(self, opacity: int) -> str:
"""
Return the color as a CSS ``rgba`` value
:param opacity: Opacity level to apply to the color. Any value outside
0 - 100 will be changed to the nearest valid value.
"""
if 0 < opacity > 100:
raise ValueError("Opacity must anywhere from 0 to 100")
@ -270,6 +257,9 @@ class Color(str):
class Cookie:
"Represents an HTTP cookie"
def __init__(self,
key: str,
value: str | None,
@ -279,28 +269,65 @@ class Cookie:
domain: str | None = None,
secure: bool = False,
http_only: bool = False,
same_site: Literal["lax", "strict", "none"] | None = "lax") -> None:
same_site: Literal["lax", "strict", "none"] | None = None) -> None:
"""
Create a new cookie
:param key: Name of the cookie
:param value: Value of the cookie
:param max_age: Maximum amount of time for the cookie to be valid
:param expires: Date the cookie expires
:param path: Path base the cookie should be used on
:param domain: Domain the cookie should be used on
:param secure: Ensure the cookie is only used with ``HTTPS`` connections
:param http_only: Prevent on-page javascript from accessing the cookie
:param same_site: Make sure the cookie stays on the same website
"""
if isinstance(max_age, int):
max_age = timedelta(seconds = min(0, max_age))
self.key: str = key
"Name of the cookie"
self.value: str | None = value or None
"Value of the cookie"
self.max_age: timedelta | None = max_age
"Maximum amount of time for the cookie to be valid"
self.expires: HttpDate | None = HttpDate.parse(expires) if expires is not None else None
"Date the cookie expires"
self.path: str = path
"Path base the cookie should be used on"
self.domain: str | None = domain
self.same_site: Literal["lax", "strict", "none"] = same_site or "none"
"Domain the cookie should be used on"
self.secure: bool = secure
"Ensure the cookie is only used with ``HTTPS`` connections"
self.http_only: bool = False
"Prevent on-page javascript from accessing the cookie"
self.same_site: Literal["lax", "strict", "none"] | None = same_site
"Make sure the cookie stays on the same website"
@classmethod
def from_dict(cls: type[Self], data: dict[str, Any]) -> Self:
same_site = data.get("same_site", data.get("samesite", "none")).lower()
"""
Create a new cookie from a :class:`dict`. The keys can be HTTP cookie keys or
``Cookie`` property names.
if same_site not in {"lax", "strict", "none"}:
raise ValueError(f"same_site must be 'lax', 'strict', or 'none', not {same_site}")
:param data: Dictionary of cookie properties
"""
same_site = data.get("same_site", data.get("samesite", None))
if same_site not in {"lax", "strict", "none", None}:
raise ValueError(f"same_site must be 'lax', 'strict', 'none' or None, not {same_site}")
max_age = data.get("max_age", data.get("maxage"))
@ -313,12 +340,18 @@ class Cookie:
data.get("domain"),
data.get("secure", False),
data.get("http_only", data.get("httponly", False)),
same_site
same_site.lower() if isinstance(same_site, str) else None # type: ignore[arg-type]
)
@classmethod
def parse(cls: type[Self], raw_data: str) -> Self:
"""
Read a raw cookie string and create a new ``Cookie`` object
:param raw_data: Raw cookie data
"""
lines = tuple(line.strip() for line in raw_data.split(";"))
ckey, _, cvalue = lines[0].partition("=")
@ -338,14 +371,20 @@ class Cookie:
def copy(self) -> Self:
"Create a copy of the cookie"
return type(self).from_dict(self.to_dict())
def set_deleted(self) -> None:
"Set the expires property to a date in the past"
self.expires = HttpDate.parse("Thu, 01 Jan 1970 00:00:00 GMT")
def to_dict(self) -> dict[str, Any]:
"Return the cookie as a dict"
keys = (
"key", "value", "max_age", "expires", "path",
"domain", "same_site", "secure", "http_only"
@ -355,12 +394,16 @@ class Cookie:
def to_string(self) -> str:
"Convert the cookie to a raw cookie string"
values = [
f"{self.key}={self.value or ''}",
f"path={self.path}",
f"samesite={self.same_site}"
f"path={self.path}"
]
if self.same_site is not None:
values.append(f"samesite={self.same_site}")
if self.max_age is not None:
values.append(f"maxage={self.max_age.seconds}")
@ -380,7 +423,16 @@ class Cookie:
class StateProxy(MutableMapping[str, Any]):
"Proxy object for the ASGI state value"
def __init__(self, state: dict[str, str]) -> None:
"""
Create a new state proxy object
:param state: A dict to proxy (usually the ASGI state)
"""
self._state: dict[str, str] = state
@ -430,29 +482,55 @@ class StateProxy(MutableMapping[str, Any]):
def get(self, key: str, default: Any = None) -> Any:
"""
Get a value or return the default if it does not exist
:param key: Name of the value to get
:param default: Value to return if the key does not exist
"""
return self._state.get(key, default)
def items(self) -> ItemsView[str, Any]:
"Return all state items as key/value pairs"
return self._state.items()
def keys(self) -> KeysView[str]:
"Return all keys in the state"
return self._state.keys()
def set(self, key: str, value: Any) -> None:
"""
Set a state value
:param key: Name of the value to set
:param value: Data to set
"""
self._state[key] = value
def values(self) -> ValuesView[Any]:
"Return all values in the state"
return self._state.values()
class Stream:
"Contains the reader and writer functions. Can be used as an iterator for fetching data chunks."
def __init__(self, reader: ReaderFunction, writer: WriterFunction) -> None:
self.reader = reader
self.writer = writer
self.reader: ReaderFunction = reader
"Function for reading data from a client"
self.writer: WriterFunction = writer
"Function for writing data to a client"
self._sent_headers: bool = False
self._sent_body: bool = False
self.__next: bool = True
@ -474,10 +552,14 @@ class Stream:
async def close(self) -> None:
"Disconnect from the client"
await self.writer({"type": "http.disconnect"})
async def read(self) -> bytes:
"Read the whole request body"
body = b""
while True:
@ -491,27 +573,66 @@ class Stream:
async def read_chunk(self) -> tuple[bytes, bool]:
"Read a chunk of data from the request and return a tuple of (``body chunk``, ``more body``)"
data = await self.reader()
return data.get("body", b""), data.get("more_body", False)
async def write_headers(self, status: int, headers: Sequence[tuple[bytes, bytes]]) -> None:
async def write_headers(self,
status: int,
headers: Mapping[str, str],
cookies: Sequence[Cookie] | None = None) -> None:
"""
Write the response headers. If this was already called, nothing happens.
:param status: HTTP status to respond with
:param headers: A sequence of HTTP header values to be sent with the response
"""
if self._sent_headers:
return
raw_headers: list[tuple[bytes, bytes]] = []
for key, value in headers.items():
if not isinstance(value, str):
# this will be a `logging.warning` call in the future
print(f"not string: {key} {type(value)} {value}") # type: ignore[unreachable]
continue
raw_headers.append((key.encode("utf-8"), value.encode("utf-8")))
for cookie in (cookies or []):
raw_headers.append((b"Set-Cookie", cookie.to_string().encode("utf-8")))
await self.writer({
"type": "http.response.start",
"status": status,
"headers": headers
"headers": raw_headers
})
self._sent_headers = True
async def write_body(self, data: bytes, eof: bool = True) -> None:
"""
Send body data to the client. If an eof was already sent, this does nothing.
:param data: Raw data to send to the client
:param eof: Send an ``EOF`` to indicate the response is done.
"""
if not self._sent_headers:
raise RuntimeError("Response headers have not been sent yet")
async def write_body(self, data: bytes, eof: bool = False) -> None:
if self._sent_body:
return
await self.writer({
"type": "http.response.body",
"body": data,
"more_body": eof
"more_body": not eof
})
self._sent_body = eof

View file

@ -10,10 +10,10 @@ from multipart.multipart import Field, File, create_form_parser
from typing import Any, Literal, TypedDict, TypeVar
from urllib.parse import parse_qsl
from .misc import Cookie, Stream
from .misc import Cookie, StateProxy, Stream
if typing.TYPE_CHECKING:
from .application import Application, R, RT, AT
from .application import Application
T = TypeVar("T", bound = JsonBase)
@ -42,18 +42,69 @@ class ScopeType(TypedDict):
class Request:
"Represents an incoming client request"
# I'm not sure how to "forward" generics without needing to specify then on every request,
# so it'll just be static for now until I can figure it out
app: Application[Request, StateProxy, StateProxy]
"Application the request is associated with"
stream: Stream
"Stream object associated with the request"
method: str
"HTTP method set by the client"
path: str
"Requested HTTP path"
query: CIMultiDictProxy[str]
"Extra data pairs at the end of a request path"
headers: CIMultiDictProxy[str]
"Headers set by the client"
params: dict[str, Any]
"Path parameters for the route if they exist"
remote_raw: str | None
"The remote IP of the client"
local: str | None
"Address of the server"
cookies: dict[str, Cookie]
"Cookies sent by the client"
state: StateProxy
"Data to pass with the request"
extensions: dict[str, Any]
"ASGI extensions"
asgi: tuple[str, str]
"ASGI version and spec version"
version: float
"HTTP version of the request"
scheme: str
"Protocol of the request"
def __init__(self,
app: Application[R, RT, AT],
app: Application[Request, StateProxy, StateProxy],
scope: ScopeType,
stream: Stream) -> None:
self.app: Application[R, RT, AT] = app
self.stream: Stream = stream
self._body: bytes = b""
raw_query = parse_qsl(scope["query_string"].decode("utf-8"), keep_blank_values = True)
raw_headers = ((k.decode("utf-8").title(), v.decode("utf-8")) for k, v in scope["headers"])
self.app: Application[Request, StateProxy, StateProxy] = app
self.stream: Stream = stream
self.method: str = scope["method"].upper()
self.path: str = scope["path"]
self.query: CIMultiDictProxy[str] = CIMultiDictProxy(CIMultiDict(raw_query))
@ -62,8 +113,7 @@ class Request:
self.remote_raw: str | None = (scope.get("client") or (None,))[0]
self.local: str | None = (scope.get("server") or (None, ))[0]
self.cookies: dict[str, Cookie] = {}
self.state: RT = app.request_state_class(scope.get("state") or {})
self.state: StateProxy = app.request_state_class(scope.get("state") or {})
self.extensions: dict[str, Any] = scope.get("extensions", {}) # type: ignore[assignment]
# keep?
@ -78,25 +128,35 @@ class Request:
@property
def content_length(self) -> int:
"Get the ``Content-Length`` header"
return int(self.headers.get("Content-Length", "0"))
@property
def content_type(self) -> str:
"Get the first value of the ``Content-Type`` header"
return self.headers.getone("Content-Type", "")
@property
def remote(self) -> str | None:
"Get the real IP address of the client"
return self.headers.get("X-Forwarded-For", self.headers.get("X-Real-Ip", self.remote_raw))
@property
def user_agent(self) -> str | None:
"Get the ``User-Agent`` header"
return self.headers.get("User-Agent")
async def body(self) -> bytes:
"Read the request body. It will be stored in memory."
if not self._body:
self._body = await self.stream.read()
@ -104,14 +164,24 @@ class Request:
async def text(self) -> str:
"Read the request body as a ``str``"
return (await self.body()).decode("utf-8")
async def json(self, parser_class: type[T] = JsonBase) -> T: # type: ignore[assignment]
"""
Read the request body as a JSON document
:param parser_class: Class to use when parsing the body
"""
return parser_class.parse(await self.body())
async def form(self) -> CIMultiDictProxy[str | File]:
"Parse form data from the request body"
if self.content_type not in {"multipart/form-data", "application/x-www-form-urlencoded"}:
raise ValueError(f"Invalid mimetype for form data: {self.content_type}")
@ -140,4 +210,4 @@ class Request:
async def stream_response(self, status: int, headers: dict[str, str]) -> None:
pass
raise NotImplementedError

View file

@ -3,6 +3,7 @@ from __future__ import annotations
import os
from aputils import HttpDate, JsonBase
from blib import HttpError, HttpStatus, convert_to_bytes
from collections.abc import Sequence
from datetime import datetime, timedelta
from mimetypes import guess_type
@ -12,9 +13,7 @@ from pathlib import Path
from typing import Any, Literal, Self
from urllib.parse import quote
from .enums import HttpStatus
from .error import HttpError
from .misc import Cookie, Stream, convert_to_bytes
from .misc import Cookie, Stream
from .request import Request
@ -36,7 +35,7 @@ class Response:
:param headers: Header items to include in the message
"""
self.cookies: dict[str, Cookie] = {}
self.cookies: list[Cookie] = []
"New cookies to be sent to the client"
self.status: HttpStatus = HttpStatus.parse(status)
@ -157,6 +156,14 @@ class Response:
self.headers.update({"Content-Type": value})
def get_cookie(self, key: str) -> Cookie:
for cookie in self.cookies:
if cookie.key == key:
return cookie
raise KeyError(key)
def set_cookie(self,
key: str,
value: str | None,
@ -169,15 +176,19 @@ class Response:
same_site: Literal["lax", "strict", "none"] | None = "lax") -> Cookie:
cookie = Cookie(key, value, max_age, expires, path, domain, secure, http_only, same_site)
self.cookies[cookie.key] = cookie
self.cookies.append(cookie)
return cookie
def delete_cookie(self, cookie: Cookie) -> Cookie:
deleted_cookie = cookie.copy()
deleted_cookie.set_deleted()
self.cookies[deleted_cookie.key] = deleted_cookie
return deleted_cookie
def delete_cookie(self, key: str) -> Cookie:
try:
cookie = self.get_cookie(key)
except KeyError:
cookie = self.set_cookie(key, "")
cookie.set_deleted()
return cookie
async def send(self, stream: Stream, request: Request) -> None:
@ -188,21 +199,8 @@ class Response:
:param request: Request associated with the response
"""
headers = []
for key, value in self.headers.items():
if not isinstance(value, str):
# this will be a `logging.warning` call in the future
print(f"not string: {key} {type(value)} {value}") # type: ignore[unreachable]
continue
headers.append((key.encode("utf-8"), value.encode("utf-8")))
for cookie in self.cookies.values():
headers.append((b"Set-Cookie", cookie.to_string().encode("utf-8")))
await stream.write_headers(self.status, headers)
await stream.write_body(self.body)
await stream.write_headers(self.status, self.headers, self.cookies)
await stream.write_body(self.body, True)
class FileResponse(Response):
@ -211,7 +209,7 @@ class FileResponse(Response):
def __init__(self,
path: Path | str,
mimetype: str | None = None,
chunk_size: int = 8192,
chunk_size: int = 0,
status: HttpStatus | int = HttpStatus.Ok,
headers: dict[str, str] | None = None) -> None:
"""
@ -236,7 +234,7 @@ class FileResponse(Response):
if isinstance(path, str):
path = Path(path)
self.chunk_size: int = chunk_size if chunk_size > 0 else 8192
self.chunk_size: int = chunk_size if chunk_size > 0 else 131_072
self.path: Path = path.expanduser().resolve()
if not self.path.exists():
@ -245,18 +243,14 @@ class FileResponse(Response):
if not self.path.is_file():
raise ValueError(f"Path is a directory: {self.path}")
self.length = self.path.stat().st_size
async def send(self, stream: Stream, request: Request) -> None:
await stream.write_headers(200, tuple([]))
await stream.write_headers(200, self.headers, self.cookies)
with self.path.open("rb") as fd:
while True:
if not (data := fd.read(self.chunk_size)):
break
await stream.write_body(data, True)
await stream.write_body(b"", False)
await stream.write_body(fd.read(), True)
class TemplateResponse(Response):
@ -268,7 +262,7 @@ class TemplateResponse(Response):
status: HttpStatus | int = HttpStatus.Ok,
mimetype: str | None = None,
headers: dict[str, Any] | None = None,
pretty_print: bool = False) -> None:
pprint: bool = False) -> None:
"""
Create a new ``TemplateResponse`` object
@ -283,12 +277,17 @@ class TemplateResponse(Response):
Response.__init__(self, status, b"", mimetype or self.detect_mimetype(name), headers)
self.name: str = name
self.pretty_print: bool = pretty_print
self.pprint: bool = pprint
self.context: dict[str, Any] = context or {}
def detect_mimetype(self, path: str) -> str:
"Return the mimetype of the template based on the file extension"
@staticmethod
def detect_mimetype(path: str) -> str:
"""
Return the mimetype of the template based on the file extension
:param path: Path to detect the mimetype of
"""
_, ext = os.path.splitext(path)
@ -306,7 +305,9 @@ class TemplateResponse(Response):
"request": request,
**self.context
}
text = request.app.template.render(self.name, context, self.pretty_print)
text = request.app.template.render(self.name, context, self.pprint)
self.body = text.encode("utf-8")
self.length = len(self.body)
await Response.send(self, stream, request)

View file

@ -1,10 +1,10 @@
import http_router
from collections.abc import Awaitable, Callable, Iterable
from blib import HttpError
from collections.abc import Awaitable, Callable
from http_router.routes import RouteMatch
from typing import Any, Pattern
from .error import HttpError
from .request import Request
from .response import Response
@ -55,23 +55,25 @@ class Router(http_router.Router):
def bind(self, # type: ignore[override]
target: RouteHandler,
*paths: str | Pattern[str],
methods: Iterable[str] | str | None = None) -> list[http_router.Route]:
method: str,
*paths: str | Pattern[str]) -> list[http_router.Route]:
"""
Add a route handler
:param target: Function to call on request
:param method: HTTP method the route should handle
:param paths: List of paths the route should handle
:param methods: List of methods the route should handle
"""
if isinstance(methods, list):
methods = [method.upper() for method in methods]
return http_router.Router.bind(self, target, *paths, methods = [method.upper()])
elif isinstance(methods, str):
methods = [methods.upper()]
return http_router.Router.bind(self, target, *paths, methods = methods)
def route(self, method: str, *paths: str) -> Callable[[RouteHandler], RouteHandler]: # type: ignore[override] # noqa: E501
def wrapper(handler: RouteHandler) -> RouteHandler:
self.bind(handler, method, *paths)
return handler
return wrapper
ROUTERS: dict[str, Router] = {
@ -120,7 +122,7 @@ def route(router: str, method: str, *paths: str) -> Callable[[RouteHandler], Rou
"""
def wrapper(func: RouteHandler) -> RouteHandler:
get_router(router).bind(func, *paths, methods = method)
get_router(router).bind(func, method, *paths)
return func
return wrapper

View file

@ -1,156 +0,0 @@
import asyncio
import inspect
import traceback
from collections.abc import Awaitable, Callable
from typing import Any, Self
from .misc import is_loop_running
SignalCallback = Callable[..., Awaitable[bool | None]]
MainSignalCallback = Callable[..., Awaitable[None]]
class Signal(list[SignalCallback]):
"Allows a series of callbacks to get called via async. Use as a decorator for the base function."
def __init__(self, timeout: float | int = 5.0):
"""
:param timeout: Time in seconds to wait before cancelling the callback
"""
self.timeout: float | int = timeout
self.callback: MainSignalCallback | None = None
self.object: Any = None
def __get__(self, obj: Any, objtype: type[Any]) -> Self:
if obj and not self.object:
self.object = obj
return self
def __call__(self, callback: MainSignalCallback) -> Self:
if not inspect.iscoroutinefunction(callback):
raise RuntimeError(f"Not a coroutine: {callback.__name__}")
self.callback = callback
self.__doc__ = callback.__doc__
self.__annotations__ = callback.__annotations__
return self
def append(self, value: SignalCallback) -> None:
"""
Add a callback
:param value: Callback
"""
self.connect(value)
def remove(self, value: SignalCallback) -> None:
"""
Remove a callback
:param value: Callback
"""
self.disconnect(value)
def emit(self, *args: Any, **kwargs: Any) -> None:
"""
Call all of the callbacks in the order they were added as well as the associated
function.
If any callback returns `True`, all other callbacks get skipped.
:param args: Positional arguments to pass to all of the callbacks
:param kwargs: Keyword arguments to pass to all of the callbacks
"""
if not is_loop_running():
raise RuntimeError("Event loop is not running")
asyncio.create_task(self.handle_emit(*args, **kwargs))
def connect(self, callback: SignalCallback) -> SignalCallback:
"""
Add a function to the list of callbacks. Can be used as a decorator.
:param callback: A callable or coroutine
"""
if callback not in self:
list.append(self, callback)
return callback
def disconnect(self, callback: SignalCallback) -> None:
"""
Remove a function from the list of callbacks
:param callback: A callable or coroutine
"""
if not self.callback:
# oh boy something really goofed
return
try:
list.remove(self, callback)
except ValueError:
cbname = callback.__name__
signame = self.callback.__name__
print(f"WARNING: '{cbname}' was not connted to signal '{signame}'")
async def handle_emit(self, *args: Any, **kwargs: Any) -> None:
"""
This gets called by :meth:`Signal.emit` as an :class:`asyncio.Task`.
:param args: Positional arguments to pass to all of the callbacks
:param kwargs: Keyword arguments to pass to all of the callbacks
"""
if not self.callback:
# oh boy something really goofed
return
for callback in self:
if await self.handle_callback(callback, *args, **kwargs):
break
await self.handle_callback(self.callback, *args, **kwargs)
async def handle_callback(self,
callback: Callable[..., Awaitable[bool | None]],
*args: Any,
**kwargs: Any) -> bool | None:
try:
# asyncio.timeout and asyncio.wait_for complain, so need to find a new way
# async with asyncio.timeout(self.timeout):
if args and args[0] == self.object:
return await callback(*args, **kwargs)
else:
return await callback(self.object, *args, **kwargs)
except TimeoutError:
print(f"Callback '{callback.__name__}' timed out")
return True
except Exception:
traceback.print_exc()
return True

View file

@ -48,7 +48,7 @@ exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
intersphinx_mapping = {
"python": (f"https://docs.python.org/{pyversion}", None),
"aputils": ("https://docs.barkshark.xyz/aputils", None),
"bsql": ("https://docs.barkshark.xyz/barkshark-sql", None),
"blib": ("https://docs.barkshark.xyz/blib", None),
"gemi": ("https://docs.barkshark.xyz/gemi", None),
"hamlish-jinja": ("https://docs.barkshark.xyz/hamlish-jinja", None),
"jinja2": ("https://jinja.palletsprojects.com/en/3.1.x/", None)

View file

@ -1,14 +0,0 @@
Application
===========
.. autoclass:: basgi.Application
:members:
:show-inheritance:
.. autoclass:: basgi.Signal
:members:
:show-inheritance:
.. autoclass:: basgi.Stream
:members:
:show-inheritance:

View file

@ -1,7 +1,7 @@
Enums
=====
.. autoclass:: basgi.HttpStatus
.. autoclass:: basgi.PridePalette
:members:
:show-inheritance:
:undoc-members:

View file

@ -1,6 +0,0 @@
Exceptions
==========
.. autoclass:: basgi.HttpError
:members:
:show-inheritance:

View file

@ -5,50 +5,45 @@ API
Application
-----------
:class:`basgi.Application`
:class:`basgi.Signal`
:class:`basgi.Stream`
.. autoclass:: basgi.Application
:members:
:show-inheritance:
HTTP Messages
-------------
:class:`basgi.Request`
.. autoclass:: basgi.Request
:members:
:show-inheritance:
:exclude-members: __init__
:class:`basgi.Response`
.. autoclass:: basgi.Response
:members:
:show-inheritance:
:class:`basgi.FileResponse`
.. autoclass:: basgi.FileResponse
:members:
:show-inheritance:
:class:`basgi.TemplateResponse`
.. autoclass:: basgi.TemplateResponse
:members:
:show-inheritance:
Application Runners
-------------------
:class:`basgi.GranianRunner`
.. autoclass:: basgi.Runner
:members:
:show-inheritance:
:class:`basgi.UvicornRunner`
.. autoclass:: basgi.GranianRunner
:members:
:show-inheritance:
:exclude-members: setup_module
Templates
---------
:class:`basgi.Template`
:class:`basgi.SassExtension`
Enums
-----
:class:`basgi.HttpStatus`
:class:`basgi.SassOutputStyle`
Exceptions
----------
:class:`basgi.HttpError`
.. autoclass:: basgi.UvicornRunner
:members:
:show-inheritance:
:exclude-members: setup_module

View file

@ -1,19 +0,0 @@
Messages
========
.. autoclass:: basgi.Request
:members:
:show-inheritance:
.. autoclass:: basgi.Response
:members:
:show-inheritance:
.. autoclass:: basgi.FileResponse
:members:
:show-inheritance:
.. autoclass:: basgi.TemplateResponse
:members:
:show-inheritance:

36
docs/src/api/misc.rst Normal file
View file

@ -0,0 +1,36 @@
Misc
====
.. autoclass:: basgi.Client
:members:
:show-inheritance:
.. autoclass:: basgi.Color
:members:
:show-inheritance:
.. autoclass:: basgi.Cookie
:members:
:show-inheritance:
.. autoclass:: basgi.SassExtension
:members:
:show-inheritance:
:exclude-members: __init__
.. autoclass:: basgi.StateProxy
:members:
:show-inheritance:
.. autoclass:: basgi.StaticHandler
:members:
:show-inheritance:
.. autoclass:: basgi.Stream
:members:
:show-inheritance:
:exclude-members: __init__
.. autoclass:: basgi.Template
:members:
:show-inheritance:

View file

@ -1,16 +0,0 @@
App Runners
===========
.. autoclass:: basgi.Runner
:members:
:show-inheritance:
.. autoclass:: basgi.GranianRunner
:members:
:show-inheritance:
:exclude-members: setup_module
.. autoclass:: basgi.UvicornRunner
:members:
:show-inheritance:
:exclude-members: setup_module

View file

@ -1,4 +0,0 @@
Signal
======

View file

@ -1,2 +1,22 @@
Usage
=====
Application
-----------
:class:`basgi.Application` is the base for a web server. Be sure to set a unique name.
.. code-block:: python
import basgi
@basgi.get("xyz.barkshark.BASGI", "/")
async def handle_home(request: basgi.Request) -> basgi.Response:
return basgi.Response(200, "Merp!")
app = basgi.Application("xyz.barkshark.BASGI")
@app.route("GET", "/about")
async def handle_about(request: basgi.Request) -> basgi.Response:
return basgi.Response(200, "")

View file

@ -5,13 +5,9 @@ subtrees:
- file: src/api/index.rst
subtrees:
- entries:
- file: src/api/app.rst
- file: src/api/runners.rst
- file: src/api/messages.rst
- file: src/api/router.rst
- file: src/api/template.rst
- file: src/api/misc.rst
- file: src/api/enums.rst
- file: src/api/exceptions.rst
- url: https://git.barkshark.xyz/barkshark/basgi
title: Git Repo
- url: https://barkshark.xyz/@izalia

View file

@ -36,6 +36,7 @@ classifiers = [
requires-python = ">= 3.11"
dependencies = [
"activitypub-utils == 0.2.3",
"barkshark-lib == 0.1.2",
"gemi-python == 0.1.1",
"http-router == 4.1.2",
"httpx[http2] == 0.27.0",
@ -98,6 +99,5 @@ disallow_untyped_decorators = true
warn_redundant_casts = true
warn_unreachable = true
warn_unused_ignores = true
follow_imports = "silent"
strict = true
implicit_reexport = true