add barkshark-lib

This commit is contained in:
Izalia Mae 2024-04-22 23:38:33 -04:00
parent 3d509a8dce
commit 7eba8ba68d
14 changed files with 167 additions and 566 deletions

200
dev.py
View file

@ -1,57 +1,104 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio
import platform import platform
import shlex import shlex
import shutil import shutil
import subprocess import subprocess
import sys import sys
import time import time
import tomllib
from datetime import datetime from collections.abc import Callable
from gemi import __version__ from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from shutil import rmtree
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import TypedDict
try: try:
import click import watchfiles
from watchdog.observers import Observer from click import echo, group, option
from watchdog.events import PatternMatchingEventHandler
except ImportError: except ImportError:
CMD = f"{sys.executable} -m pip install watchdog click" print("Installing missing dependencies...")
PROC = subprocess.run(shlex.split(CMD), check = False) deps = " ".join(["build", "click", "watchfiles"])
subprocess.run(shlex.split(f"{sys.executable} -m pip install {deps}"))
if PROC.returncode != 0: print("Restarting script...")
sys.exit() subprocess.run([sys.executable, *sys.argv])
sys.exit()
print("Successfully installed click and watchdog")
import click
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
REPO = Path(__file__).resolve().parent REPO = Path(__file__).resolve().parent
CLEAN_DIRS = ["build", "dist", "dist-pypi", "gemi_python.egg-info", "docs/_build"]
IGNORE_DIRS = ["build", "dist", "dist-pypi", "docs", ".git", "gemi_python.egg-info"]
IGNORE_PATHS = tuple(str(REPO.joinpath(path)) for path in IGNORE_DIRS)
@click.group("cli") class WatchfilesOptions(TypedDict):
def cli(): watch_filter: Callable[[watchfiles.Change, str], bool]
pass recursive: bool
ignore_permission_denied: bool
rust_timeout: int
@group("cli")
def cli() -> None:
...
@cli.command("clean") @cli.command("clean")
def cli_clean(): def cli_clean() -> None:
for directory in {"build", "dist", "dist-pypi", "gemi_python.egg-info"}: for directory in CLEAN_DIRS:
try: try:
shutil.rmtree(directory) rmtree(REPO.joinpath(directory))
except FileNotFoundError: except FileNotFoundError:
pass pass
echo("Cleaned up build files")
@cli.command("install")
def cli_install_deps() -> None:
with open("pyproject.toml", "rb") as fd:
pyproject = tomllib.load(fd)
dependencies = pyproject["project"]["dependencies"]
dependencies.extend(pyproject["project"]["optional-dependencies"]["dev"])
dependencies.extend(pyproject["project"]["optional-dependencies"]["docs"])
dependencies = list(dep.replace(" ", "") for dep in dependencies)
run_python("-m", "pip", "install", "-U", "pip", "setuptools", "wheel")
run_python("-m", "pip", "install", *dependencies)
echo("Installed dependencies :3")
@cli.command("lint")
@option("--path", "-p", type = Path, default = REPO.joinpath("gemi"))
@option("--watch", "-w", is_flag = True, help = "Watch for changes to the source")
def cli_lint(path: Path, watch: bool) -> None:
path = path.expanduser().resolve()
if watch:
script = str(Path(__file__).resolve())
handle_run_watcher(script, "lint", "--path", str(path))
return
echo("----- flake8 -----")
run_python("-m", "flake8", str(path))
echo("\n----- mypy -----")
run_python("-m", "mypy", str(path))
@cli.command("build") @cli.command("build")
def cli_build(): def cli_build_gemi():
with open("pyproject.toml", "rb") as fd:
pyproject = tomllib.load(fd)
__version__ = pyproject["project"]["version"]
with TemporaryDirectory() as tmp: with TemporaryDirectory() as tmp:
arch = "amd64" if sys.maxsize >= 2**32 else "i386" arch = "amd64" if sys.maxsize >= 2**32 else "i386"
bins = ( bins = (
@ -89,20 +136,6 @@ def cli_build():
specfile.unlink() specfile.unlink()
@cli.command("lint")
def cli_lint():
click.echo("--- flake8 ---")
subprocess.run(shlex.split(f"{sys.executable} -m flake8 gemi"))
click.echo("\n--- mypy ---")
subprocess.run(shlex.split(f"{sys.executable} -m mypy gemi"))
@cli.command("install")
def cli_install():
subprocess.run(shlex.split(f"{sys.executable} -m pip install -e .[dev,doc]"), check = False)
@cli.command("run") @cli.command("run")
def cli_run(): def cli_run():
print("Starting process watcher") print("Starting process watcher")
@ -127,60 +160,65 @@ def cli_run():
watcher.join() watcher.join()
class WatchHandler(PatternMatchingEventHandler): @cli.command("build-package")
patterns = ["*.py"] def cli_build_package() -> None:
cmd = [sys.executable, "-m", "gemi.server"] run_python("-m", " build", "--outdir", "dist-pypi")
def __init__(self): def run_python(*arguments: str) -> subprocess.CompletedProcess[bytes]:
PatternMatchingEventHandler.__init__(self) return subprocess.run([sys.executable, *arguments])
self.proc: subprocess.Popen | None = None
self.last_restart: datetime | None = None
def kill_proc(self): def handle_run_watcher(*command: str) -> None:
if not self.proc or self.proc.poll() is not None: asyncio.run(_handle_run_watcher(*command))
return
print(f"Terminating process {self.proc.pid}")
self.proc.terminate()
sec = 0.0
while self.proc.poll() is None: async def _handle_run_watcher(*command: str) -> None:
time.sleep(0.1) proc: subprocess.Popen[bytes] = subprocess.Popen([sys.executable, *command])
sec += 0.1 last_restart: datetime = datetime.now()
if sec >= 5: options: WatchfilesOptions = {
print("Failed to terminate. Killing process...") "watch_filter": lambda _, path: path.endswith(".py"),
self.proc.kill() "recursive": True,
"ignore_permission_denied": True,
"rust_timeout": 1000
}
async for changes in watchfiles.awatch(REPO, **options):
skip = False
for _, path in changes:
if path.startswith(IGNORE_PATHS):
skip = True
if skip:
continue
if datetime.now() - timedelta(seconds = 3) < last_restart:
continue
if proc.poll() is None:
echo(f"Terminating process {proc.pid}")
proc.terminate()
sec = 0.0
while proc.poll() is None:
time.sleep(0.1)
sec += 0.1
if sec < 5.0:
continue
echo("Failed to terminate. Killing process...")
proc.kill()
break break
print("Process terminated") echo("Process terminated")
proc = subprocess.Popen([sys.executable, *command])
def run_proc(self, restart=False): last_restart = datetime.now()
timestamp = datetime.timestamp(datetime.now()) echo(f"Started processes with PID: {proc.pid}")
self.last_restart = timestamp if not self.last_restart else 0
if restart:
if timestamp - 3 < self.last_restart:
return
self.kill_proc()
self.proc = subprocess.Popen(self.cmd, stdin = subprocess.PIPE)
self.last_restart = timestamp
print(f"Started process with PID {self.proc.pid}", self.proc.pid)
print("Command:", " ".join(self.cmd))
def on_any_event(self, event):
if event.event_type not in ["modified", "created", "deleted"]:
return
self.run_proc(restart = True)
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -1,15 +1,15 @@
__software__ = "Gemi" __software__ = "Gemi"
__version__ = "0.1.1" __version__ = "0.1.2"
import mimetypes import mimetypes
mimetypes.add_type("text/gemini", ".gmi", strict = True) mimetypes.add_type("text/gemini", ".gmi", strict = True)
from .client import AsyncClient from .client import AsyncClient
from .enums import AppType, FileSizeUnit, OutputFormat, StatusCode, Enum, IntEnum, StrEnum from .enums import AppType, OutputFormat, StatusCode
from .error import BodyTooLargeError, GeminiError, ParsingError from .error import BodyTooLargeError, GeminiError, ParsingError
from .message import Message, Request, Response from .message import Message, Request, Response
from .misc import BaseApp, SslContext, Url, resolve_path
from .server import AsyncServer, Router, BaseRoute, Route, FileRoute, route from .server import AsyncServer, Router, BaseRoute, Route, FileRoute, route
from .transport import AsyncTransport
from .document import ( from .document import (
Document, Document,
@ -21,13 +21,3 @@ from .document import (
Quote, Quote,
Text Text
) )
from .misc import (
BaseApp,
FileSize,
SslContext,
Url,
convert_to_bytes,
convert_to_string,
resolve_path
)

View file

@ -1,10 +1,11 @@
import asyncio import asyncio
import sys import sys
from blib import AsyncTransport
from ..enums import AppType from ..enums import AppType
from ..message import Request, Response from ..message import Request, Response
from ..misc import BaseApp, Url from ..misc import BaseApp, Url
from ..transport import AsyncTransport
class AsyncClient(BaseApp): class AsyncClient(BaseApp):
@ -63,7 +64,7 @@ class AsyncClient(BaseApp):
return response return response
async def main(args: list, timeout: int) -> None: async def main(timeout: int) -> None:
client = AsyncClient() client = AsyncClient()
try: try:
@ -81,4 +82,4 @@ async def main(args: list, timeout: int) -> None:
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main(sys.argv, 5)) asyncio.run(main(5))

View file

@ -17,7 +17,7 @@ parser.add_argument("-t", "--timeout", type = int, default = 30)
parser.add_argument("-v", "--version", action = "store_true") parser.add_argument("-v", "--version", action = "store_true")
async def async_main(args: argparse.Namespace): async def async_main(args: argparse.Namespace) -> None:
client = gemi.AsyncClient(args.timeout) client = gemi.AsyncClient(args.timeout)
resp = await client.request(args.url) resp = await client.request(args.url)
@ -34,7 +34,7 @@ async def async_main(args: argparse.Namespace):
print(await resp.text()) print(await resp.text())
def main(): def main() -> None:
args = parser.parse_args() args = parser.parse_args()
if args.version: if args.version:

View file

@ -1,169 +1,19 @@
from __future__ import annotations from __future__ import annotations
import enum from blib import Enum, IntEnum
import typing
if typing.TYPE_CHECKING:
from typing import Any, Self
class Enum(enum.Enum):
"Base enum class for all other enums"
@classmethod
def from_index(cls: type[Self], index: int) -> Self:
return list(cls)[index]
@classmethod
def parse(cls: type[Self], data: Any) -> Self:
"""
Get an enum item by name or value
:param data: Name or value
:raises AttributeError: If an item could not be found
"""
if isinstance(data, cls):
return data
try:
return cls[data]
except KeyError:
pass
try:
return cls(data)
except ValueError:
pass
if isinstance(data, str):
for item in cls:
if issubclass(cls, StrEnum) and data.lower() == item.value.lower():
return item
if data.lower() == item.name.lower():
return item
raise AttributeError(f'Invalid enum property for {cls.__name__}: {data}')
class IntEnum(enum.IntEnum, Enum):
"Enum where items can be treated like an :class:`int`"
class StrEnum(str, Enum):
"Enum where items can be treated like a :class:`str`"
def __str__(self):
return self.value
class AppType(Enum): class AppType(Enum):
SERVER = enum.auto() SERVER = 0
CLIENT = enum.auto() CLIENT = 1
class FileSizeUnit(StrEnum):
"Unit identifier for various file sizes"
BYTE = 'B'
KIBIBYTE = 'KiB'
MEBIBYTE = 'MiB'
GIBIBYTE = 'GiB'
TEBIBYTE = 'TiB'
PEBIBYTE = 'PiB'
EXBIBYTE = 'EiB'
ZEBIBYTE = 'ZiB'
YOBIBYTE = 'YiB'
KILOBYTE = 'KB'
MEGABYTE = 'MB'
GIGABYTE = 'GB'
TERABYTE = 'TB'
PETABYTE = 'PB'
EXABYTE = 'EB'
ZETTABYTE = 'ZB'
YOTTABYTE = 'YB'
B = BYTE
K = KIBIBYTE
M = MEBIBYTE
G = GIBIBYTE
T = TEBIBYTE
P = PEBIBYTE
E = EXBIBYTE
Z = ZEBIBYTE
Y = YOBIBYTE
@property
def multiplier(self) -> int:
"Get the multiplier for the unit"
match str(self):
case "B":
return 1
case "KiB":
return 1024
case "MiB":
return 1024 ** 2
case "GiB":
return 1024 ** 3
case "TiB":
return 1024 ** 4
case "PiB":
return 1024 ** 5
case "EiB":
return 1024 ** 6
case "ZiB":
return 1024 ** 7
case "YiB":
return 1024 ** 8
case "KB":
return 1000
case "MB":
return 1000 ** 2
case "GB":
return 1000 ** 3
case "TB":
return 1000 ** 4
case "PB":
return 1000 ** 5
case "EB":
return 1000 ** 6
case "ZB":
return 1000 ** 7
case "YB":
return 1000 ** 8
# *shrugs*
return 69_420
def multiply(self, size: int | float) -> int | float:
"""
Multiply a file size to get the size in bytes
:param size: File size to be multiplied
"""
return self.multiplier * size
class OutputFormat(Enum): class OutputFormat(Enum):
"Text format to use when dumping a document" "Text format to use when dumping a document"
GEMTEXT = enum.auto() GEMTEXT = 0
HTML = enum.auto() HTML = 1
MARKDOWN = enum.auto() MARKDOWN = 2
class StatusCode(IntEnum): class StatusCode(IntEnum):

View file

@ -92,9 +92,9 @@ def set_level(level: LogLevel | str) -> None:
logger.setLevel(LogLevel.parse(level)) logger.setLevel(LogLevel.parse(level))
debug: Callable = logger.debug debug: Callable[..., Any] = logger.debug
verbose: Callable = logger.verbose verbose: Callable[..., Any] = logger.verbose
info: Callable = logger.info info: Callable[..., Any] = logger.info
warning: Callable = logger.warning warning: Callable[..., Any] = logger.warning
error: Callable = logger.error error: Callable[..., Any] = logger.error
critical: Callable = logger.critical critical: Callable[..., Any] = logger.critical

View file

@ -2,15 +2,17 @@ from __future__ import annotations
import typing import typing
from blib import FileSize
from .document import Document from .document import Document
from .enums import StatusCode from .enums import StatusCode
from .error import BodyTooLargeError, GeminiError from .error import BodyTooLargeError, GeminiError
from .misc import FileSize, Url from .misc import Url
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from blib import AsyncTransport
from typing import Self from typing import Self
from .server import AsyncServer from .server import AsyncServer
from .transport import AsyncTransport
class Message: class Message:
@ -26,11 +28,11 @@ class Message:
"Main part of the message" "Main part of the message"
async def __aenter__(self): async def __aenter__(self) -> Self:
return self return self
async def __aexit__(self, *_: None): async def __aexit__(self, *_: None) -> None:
try: try:
await self.transport.close() await self.transport.close()

View file

@ -1,19 +1,13 @@
from __future__ import annotations from __future__ import annotations
import json
import ssl import ssl
import typing import typing
from OpenSSL import crypto
from pathlib import Path from pathlib import Path
from urllib.parse import urlparse from urllib.parse import urlparse
from .enums import AppType, FileSizeUnit from .enums import AppType
try:
from OpenSSL import crypto
except ImportError:
crypto = None # type: ignore
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from collections.abc import Sequence from collections.abc import Sequence
@ -22,55 +16,6 @@ if typing.TYPE_CHECKING:
from .server import AsyncServer from .server import AsyncServer
def convert_to_bytes(value: Any, encoding: str = "utf-8") -> bytes:
"""
Convert an object to :class:`bytes`
:param value: Object to be converted
:param encoding: Character encoding to use if the object is a string or gets converted to
one in the process
:raises TypeError: If the object cannot be converted
"""
if isinstance(value, bytes):
return value
try:
return convert_to_string(value).encode(encoding)
except TypeError:
raise TypeError(f"Cannot convert '{type(value).__name__}' into bytes") from None
def convert_to_string(value: Any, encoding: str = 'utf-8') -> str:
"""
Convert an object to :class:`str`
:param value: Object to be converted
:param encoding: Character encoding to use if the object is a :class:`bytes` object
"""
if isinstance(value, bytes):
return value.decode(encoding)
if isinstance(value, bool):
return str(value)
if isinstance(value, str):
return value
if isinstance(value, (dict, list, tuple, set)):
return json.dumps(value)
if isinstance(value, (int, float)):
return str(value)
if value is None:
return ''
raise TypeError(f'Cannot convert "{type(value).__name__}" into a string') from None
def resolve_path(path: Path | str) -> Path: def resolve_path(path: Path | str) -> Path:
if isinstance(path, str): if isinstance(path, str):
path = Path(path) path = Path(path)
@ -104,86 +49,6 @@ class BaseApp:
"Context object used for SSL actions" "Context object used for SSL actions"
class FileSize(int):
"Converts a human-readable file size to bytes"
def __init__(self, size: int | float, unit: FileSizeUnit | str = FileSizeUnit.B):
"""
Create a new FileSize object
:param size: Size of the file
:param unit: Unit notation
"""
self.size: int | float = size
self.unit: FileSizeUnit = FileSizeUnit.parse(unit)
def __new__(cls, size: int | float, unit: FileSizeUnit | str = FileSizeUnit.B):
return int.__new__(cls, FileSizeUnit.parse(unit).multiply(size))
def __repr__(self):
value = int(self)
return f"FileSize({value:,} bytes)"
def __str__(self):
return str(int(self))
@classmethod
def parse(cls: type[Self], text: str) -> Self:
"""
Parse a file size string
:param text: String representation of a file size
:raises AttributeError: If the text cannot be parsed
"""
size, unit = text.strip().split(" ", 1)
return cls(float(size), FileSizeUnit.parse(unit))
def to_optimal_string(self) -> str:
"""
Attempts to display the size as the highest whole unit
"""
index = 0
size: int | float = int(self)
while True:
if size < 1024 or index == 8:
unit = FileSizeUnit.from_index(index)
return f'{size:.2f} {unit}'
try:
index += 1
size = self / FileSizeUnit.from_index(index).multiplier
except IndexError:
raise ValueError('File size is too large to convert to a string') from None
def to_string(self, unit: FileSizeUnit, decimals: int = 2) -> str:
"""
Convert to the specified file size unit
:param unit: Unit to convert to
:param decimals: Number of decimal points to round to
"""
unit = FileSizeUnit.parse(unit)
if unit == FileSizeUnit.BYTE:
return f'{self} B'
size = round(self / unit.multiplier, decimals)
return f'{size} {unit}'
class SslContext(ssl.SSLContext): class SslContext(ssl.SSLContext):
client: AsyncClient client: AsyncClient
"Client object the context is associated with" "Client object the context is associated with"
@ -221,7 +86,7 @@ class SslContext(ssl.SSLContext):
self.server = app # type: ignore self.server = app # type: ignore
def __new__(cls: type[Self], app: BaseApp, *_) -> Self: def __new__(cls: type[Self], app: BaseApp, *_: Any) -> Self:
if app.apptype == AppType.CLIENT: if app.apptype == AppType.CLIENT:
protocol = ssl.PROTOCOL_TLS_CLIENT protocol = ssl.PROTOCOL_TLS_CLIENT

View file

@ -41,7 +41,7 @@ def get_config_dir() -> Path:
return user return user
def main(): def main() -> None:
args = parser.parse_args() args = parser.parse_args()
if args.version: if args.version:

View file

@ -58,7 +58,7 @@ class Router:
Router.set(self) Router.set(self)
def __repr__(self): def __repr__(self) -> str:
return f"Router('{self.name}', trailing_slash={self.trailing_slash})" return f"Router('{self.name}', trailing_slash={self.trailing_slash})"
@ -190,11 +190,11 @@ class BaseRoute:
path: str path: str
"Path the route will handle" "Path the route will handle"
regex: re.Pattern regex: re.Pattern[str]
"Regex pattern to use when matching paths" "Regex pattern to use when matching paths"
def __repr__(self): def __repr__(self) -> str:
return f"{type(self).__name__}('{self.path}')" return f"{type(self).__name__}('{self.path}')"

View file

@ -9,6 +9,7 @@ import traceback
import typing import typing
from asyncio.exceptions import CancelledError from asyncio.exceptions import CancelledError
from blib import AsyncTransport
from pathlib import Path from pathlib import Path
from .router import Router from .router import Router
@ -18,11 +19,11 @@ from ..enums import AppType
from ..error import GeminiError from ..error import GeminiError
from ..message import Request, Response from ..message import Request, Response
from ..misc import BaseApp from ..misc import BaseApp
from ..transport import AsyncTransport
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from asyncio import StreamReader, StreamWriter from asyncio import StreamReader, StreamWriter
from collections.abc import Callable from collections.abc import Callable
from typing import Any
from .router import RouteHandler from .router import RouteHandler
@ -36,7 +37,7 @@ SIGNALS: list[str] = [
] ]
class AsyncServer(BaseApp, dict): class AsyncServer(BaseApp, dict[str, Any]):
"Server for the Gemini protocol" "Server for the Gemini protocol"
apptype: AppType = AppType.SERVER apptype: AppType = AppType.SERVER
@ -148,7 +149,7 @@ class AsyncServer(BaseApp, dict):
asyncio.run(self.start()) asyncio.run(self.start())
def set_signal_handler(self, handler: Callable | None) -> None: def set_signal_handler(self, handler: Callable[..., Any] | None) -> None:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
for sig in SIGNALS: for sig in SIGNALS:
@ -204,7 +205,7 @@ class AsyncServer(BaseApp, dict):
self._server = None self._server = None
def stop(self, *_) -> None: def stop(self, *_: Any) -> None:
"Tell the server to stop" "Tell the server to stop"
if self._server is None: if self._server is None:

View file

@ -1,149 +0,0 @@
from __future__ import annotations
import asyncio
import typing
from contextlib import contextmanager
from .misc import convert_to_bytes
if typing.TYPE_CHECKING:
from typing import Any
class AsyncTransport:
"Transport class for ``StreamReader`` and ``StreamWriter`` objects"
def __init__(self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
timeout: int = 60,
encoding: str = 'utf-8',):
"""
Create a new async transport
:param reader: Reader object
:param writer: Writer object
:param timeout: Time to wait for read methods before giving up
:param encoding: Text encoding to use when decoding raw data into text
"""
self.reader: asyncio.StreamReader = reader
"Reader object"
self.writer: asyncio.StreamWriter = writer
"Writer object"
self.encoding: str = encoding
"Text encoding to use when converting text into bytes"
self.timeout: int = timeout
"Time to wait for read methods before giving up"
@property
def eof(self) -> bool:
"Checks if the reader has reached the end of the stream"
return self.reader.at_eof()
@property
def local_address(self) -> str:
"Get the address of the local socket"
return self.writer.get_extra_info('sockname')[0]
@property
def local_port(self) -> str:
"Get the port of the local socket"
return self.writer.get_extra_info('sockname')[1]
@property
def remote_address(self) -> str:
"Get the address of the remote socket"
return self.writer.get_extra_info('peername')[0]
@property
def remote_port(self) -> str:
"Get the port of the remote socket"
return self.writer.get_extra_info('peername')[1]
@property
def client_port(self) -> int:
"Get the port of the lcient"
return self.writer.get_extra_info('peername')[1]
async def close(self) -> None:
"Close the writer stream"
if self.writer.can_write_eof():
self.writer.write_eof()
self.writer.close()
await self.writer.wait_closed()
async def read(self, length: int = -1) -> bytes:
"""
Read a chunk of data
:param length: Amount of data in bytes to read
"""
return await asyncio.wait_for(self.reader.read(length), self.timeout)
async def readline(self, limit: int = 65536) -> bytes:
"Read until a line ending ('\\\\r' or '\\\\n') is encountered"
with self._set_limit(limit):
return await asyncio.wait_for(self.reader.readline(), self.timeout)
async def readuntil(self, separator: bytes | str, limit = 65536) -> bytes:
"""
Read upto the separator
:param separator: Text or bytes to stop at
"""
if isinstance(separator, str):
separator = separator.encode(self.encoding)
with self._set_limit(limit):
return await asyncio.wait_for(self.reader.readuntil(separator), self.timeout)
async def write(self, data: Any) -> None:
"""
Send data
:param data: Data to be sent
"""
data = convert_to_bytes(data, self.encoding)
self.writer.write(data)
await self.writer.drain()
@contextmanager
def _set_limit(self, limit: int = 65536):
orig_limit = self.reader._limit
self.reader._limit = limit
try:
yield
finally:
self.reader._limit = orig_limit

View file

@ -4,8 +4,9 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "gemi-python" name = "gemi-python"
dynamic = ["version", "readme"] version = "0.1.2"
authors = [{name = "Izalia Mae", email = "admin@barkshark.xyz"}] dynamic = ["readme"]
authors = [{name = "Zoey Mae", email = "admin@barkshark.xyz"}]
description = "Utilities for the Gemini protocol" description = "Utilities for the Gemini protocol"
license = {text = "CNPL 7+"} license = {text = "CNPL 7+"}
keywords = ["gemini", "gemtext"] keywords = ["gemini", "gemtext"]
@ -27,6 +28,7 @@ classifiers = [
] ]
requires-python = ">= 3.9" requires-python = ">= 3.9"
dependencies = [ dependencies = [
"barkshark-lib >= 0.1.2",
"platformdirs == 4.2.0", "platformdirs == 4.2.0",
"pyopenssl == 24.1.0" "pyopenssl == 24.1.0"
] ]
@ -46,7 +48,8 @@ doc = [
dev = [ dev = [
"flake8 == 6.1.0", "flake8 == 6.1.0",
"mypy == 1.9.0", "mypy == 1.9.0",
"pyinstaller == 6.5.0" "pyinstaller == 6.5.0",
"types-pyOpenSSL == 24.0.0.20240417"
] ]
[project.scripts] [project.scripts]
@ -82,5 +85,5 @@ disallow_untyped_decorators = true
warn_redundant_casts = true warn_redundant_casts = true
warn_unreachable = true warn_unreachable = true
warn_unused_ignores = true warn_unused_ignores = true
ignore_missing_imports = true strict = true
follow_imports = "silent" implicit_reexport = true

View file

@ -1,5 +1,5 @@
[flake8] [flake8]
extend-ignore = ANN101,ANN204,E128,E251,E261,E266,E301,E303,W191 extend-ignore = ANN101,ANN204,E128,E251,E261,E266,E301,E303,E402,W191
extend-exclude = docs, test*.py extend-exclude = docs, test*.py
per-file-ignores = __init__.py: F401 per-file-ignores = __init__.py: F401
max-line-length = 100 max-line-length = 100