add barkshark-lib to replace JsonBase, HttpDate, and Enum classes

This commit is contained in:
Izalia Mae 2024-04-22 21:48:01 -04:00
parent 9fb4b6d39d
commit 743fc04341
16 changed files with 76 additions and 360 deletions

View file

@ -1,16 +1,13 @@
__version__ = "0.2.3"
__version__ = "0.3.0-rc1"
from .algorithms import Algorithm, HS2019, RsaSha256
from .algorithms import get as get_algorithm, register as register_algorithm
from .errors import InvalidKeyError, SignatureFailureError
from .message import Attachment, Message, Property
from .misc import Digest, HttpDate, JsonBase, MessageDate, Signature
from .misc import Digest, MessageDate, Signature
from .objects import HostMeta, HostMetaJson, Nodeinfo, Webfinger, WellKnownNodeinfo
from .request_classes import register_signer, register_validator
from .enums import (
Enum,
IntEnum,
StrEnum,
AlgorithmType,
KeyType,
NodeinfoProtocol,

View file

@ -4,10 +4,10 @@ import argparse
import json
import socket
from blib import JsonBase
from functools import cached_property, lru_cache
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.error import HTTPError
from urllib.request import Request, urlopen
@ -116,8 +116,8 @@ ACTOR_BIO = """<p>Signature validation actor</p>
at <a href="{url}/">{url}</a>"""
def get_actor() -> aputils.JsonBase:
return aputils.JsonBase({
def get_actor() -> JsonBase:
return JsonBase({
"@context": [
"https://www.w3.org/ns/activitystreams"
],
@ -154,27 +154,6 @@ class ClientError(Exception):
...
class Response(aputils.JsonBase):
def __init__(self,
status: int,
message: str,
method: str,
path: str,
address: str,
valid: bool,
headers: dict[str, Any]) -> None:
aputils.JsonBase.__init__(self,
status = status, # type: ignore
message = message, # type: ignore
method = method, # type: ignore
path = path.split("?", 1)[0], # type: ignore
address = address, # type: ignore
valid = valid, # type: ignore
headers = headers # type: ignore
)
class RequestHandler(BaseHTTPRequestHandler):
default_request_version = "HTTP/1.1"
signature: aputils.Signature
@ -234,7 +213,7 @@ class RequestHandler(BaseHTTPRequestHandler):
def send(self,
status: int,
message: aputils.JsonBase | str,
message: JsonBase | str,
headers: dict[str, str] | None = None) -> None:
if isinstance(message, str):
@ -265,15 +244,16 @@ class RequestHandler(BaseHTTPRequestHandler):
def send_error(self, status: int, message: str) -> None: # type: ignore
response = Response(
status,
message,
self.method,
self.path,
self.remote,
False,
self.parsed_headers
)
response = JsonBase({
"status": status,
"message": message,
"method": self.method,
"path": self.path.split("?", 1)[0],
"query": self.path.split("?", 1)[1],
"address": self.remote,
"valid": False,
"headers": self.parsed_headers
})
self.send(status, response, {"Content-Type": "application/json"})
@ -290,7 +270,7 @@ class RequestHandler(BaseHTTPRequestHandler):
self.send_error(404, "Invalid user")
return
self.send(200, aputils.JsonBase({
self.send(200, JsonBase({
"subject": f"acct:valtest@{args.hostname}",
"aliases": [
f"{URL}/actor"
@ -399,15 +379,16 @@ class RequestHandler(BaseHTTPRequestHandler):
self.send_error(401, str(error))
return False
response = Response(
200,
"HTTP signature is valid :3",
self.method,
self.path,
self.remote,
True,
self.parsed_headers
)
response = JsonBase({
"status": 200,
"message": "HTTP signature is valid :3",
"method": self.method,
"path": self.path.split("?", 1)[0],
"query": self.path.split("?", 1)[1],
"address": self.remote,
"valid": True,
"headers": self.parsed_headers
})
self.send(200, response)

View file

@ -8,12 +8,13 @@ from Crypto import Hash
from Crypto.PublicKey.RSA import RsaKey
from Crypto.Signature import PKCS1_v1_5
from abc import ABC, abstractmethod
from blib import HttpDate, JsonBase
from datetime import datetime, timedelta
from urllib.parse import urlparse
from .enums import AlgorithmType
from .errors import SignatureFailureError
from .misc import Digest, HttpDate, Signature
from .misc import Digest, Signature
if typing.TYPE_CHECKING:
from collections.abc import Sequence
@ -287,12 +288,10 @@ class HS2019(Algorithm):
host: str | None,
path: str,
headers: dict[str, str],
body: Message | dict[str, Any] | bytes | str | None = None) -> dict[str, str]:
from .message import Message
body: JsonBase | dict[str, Any] | bytes | str | None = None) -> dict[str, str]:
if body is not None:
if isinstance(body, Message):
if isinstance(body, JsonBase):
body = body.to_json()
elif isinstance(body, dict):
@ -312,7 +311,7 @@ class HS2019(Algorithm):
date = HttpDate.parse(raw_date)
elif isinstance(raw_date, datetime) and not isinstance(raw_date, HttpDate):
date = HttpDate.new_from_datetime(raw_date)
date = HttpDate.parse(raw_date)
else:
date = raw_date

View file

@ -1,10 +1,10 @@
from __future__ import annotations
import enum
import typing
from blib import StrEnum
if typing.TYPE_CHECKING:
from collections.abc import Sequence
from typing import Any
try:
@ -16,54 +16,9 @@ if typing.TYPE_CHECKING:
class classproperty(property):
def __get__(self, owner_self: Any, owner_cls: type | None = None) -> Any:
return self.fget(owner_cls) # type: ignore
return self.fget(owner_cls) # type: ignore[misc]
class Enum(enum.Enum):
"Base class for all enums"
@classmethod
def parse(cls: type[Self], key: Any) -> Self:
"""
Try to turn an object into an enum. If an enum cannot be found, a KeyError will be raised
:param typing.Any key: Value to attempt to convert to an Enum
:raises KeyError: When an Enum value cannot be found
"""
if isinstance(key, cls):
return key
try:
return cls[key]
except KeyError:
pass
try:
return cls(key)
except ValueError:
pass
if isinstance(key, str):
for item in cls:
if key.lower() == item.name.lower():
return item
if issubclass(cls, StrEnum) and key.lower() == item.value.lower():
return item
raise KeyError(f"Invalid enum property for {cls.__name__}: {key}")
class IntEnum(int, Enum):
"Enum that acts like an ``int``"
class StrEnum(str, Enum):
"Enum that acts like a ``str``"
class AlgorithmType(StrEnum):
"Algorithm type"
@ -210,9 +165,10 @@ class ObjectType(StrEnum):
@classproperty
def Activity(cls: type[ObjectType]) -> Sequence[ObjectType]: # type: ignore
def Activity(cls: type[Self]) -> tuple[Self, ...]: # type: ignore[misc]
"List of activity types"
return (
return ( # type: ignore[return-value]
cls.ACCEPT,
cls.ADD,
cls.ANNOUNCE,
@ -245,9 +201,10 @@ class ObjectType(StrEnum):
@classproperty
def Actor(cls: type[ObjectType]) -> Sequence[ObjectType]: # type: ignore
def Actor(cls: type[Self]) -> tuple[Self, ...]: # type: ignore[misc]
"List of actor types"
return (
return ( # type: ignore[return-value]
cls.APPLICATION,
cls.GROUP,
cls.ORGANIZATION,
@ -257,9 +214,10 @@ class ObjectType(StrEnum):
@classproperty
def Collection(cls: type[ObjectType]) -> Sequence[ObjectType]: # type: ignore
def Collection(cls: type[Self]) -> tuple[Self, ...]: # type: ignore[misc]
"List of collection types"
return (
return ( # type: ignore[return-value]
cls.COLLECTION,
cls.COLLECTION_PAGE,
cls.ORDERED_COLLECTION,
@ -268,9 +226,10 @@ class ObjectType(StrEnum):
@classproperty
def Media(cls: type[ObjectType]) -> Sequence[ObjectType]: # type: ignore
def Media(cls: type[Self]) -> tuple[Self, ...]: # type: ignore[misc]
"List of media types"
return (
return ( # type: ignore[return-value]
cls.AUDIO,
cls.EMOJI,
cls.IMAGE,
@ -279,9 +238,10 @@ class ObjectType(StrEnum):
@classproperty
def Object(cls: type[ObjectType]) -> Sequence[ObjectType]: # type: ignore
def Object(cls: type[Self]) -> tuple[Self, ...]: # type: ignore[misc]
"List of object types"
return (
return ( # type: ignore[return-value]
cls.APPLICATION,
cls.ARTICLE,
cls.AUDIO,
@ -293,7 +253,7 @@ class ObjectType(StrEnum):
cls.GROUP,
cls.IMAGE,
cls.NOTE,
cls.Object,
cls.OBJECT,
cls.ORGANIZATION,
cls.ORDERED_COLLECTION,
cls.ORDERED_COLLECTION_PAGE,

View file

@ -4,6 +4,7 @@ import json
import re
import typing
from blib import HttpDate, JsonBase
from datetime import datetime
from functools import cached_property
from mimetypes import guess_type
@ -11,7 +12,7 @@ from urllib.parse import urlparse
from .enums import ObjectType
from .errors import InvalidKeyError
from .misc import HttpDate, MessageDate, JsonBase
from .misc import MessageDate
from .signer import Signer
if typing.TYPE_CHECKING:
@ -55,8 +56,7 @@ class Property(typing.Generic[T]):
"""
Create a new dict property
:param key: Name of the key to be handled by this ``Property``
:param value_type: Name of the value type to be used for (de)serialization
:param type_name: Name of the value type to be used for (de)serialization
"""
self.key: str = ""

View file

@ -5,15 +5,14 @@ import json
import typing
from Crypto.Hash import SHA256, SHA512
from datetime import datetime, timezone
from functools import wraps
from blib import HttpDate
from typing import Any
from .enums import AlgorithmType
if typing.TYPE_CHECKING:
from collections.abc import Callable, Sequence
from datetime import tzinfo
from collections.abc import Sequence
from datetime import datetime
try:
from typing import Self
@ -28,37 +27,6 @@ HASHES = {
}
def deprecated(new_method: str, version: str, remove: str | None = None) -> Callable[..., Any]:
"""
Decorator to mark a function as deprecated and display a warning on first use.
:param new_method: Name of the function to replace the wrapped function
:param version: Version of the module in which the wrapped function was considered
deprecated
:param remove: Version the wrapped function will get removed
"""
called = False
def wrapper(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
def inner(*args: Any, **kwargs: Any) -> Any:
if not called:
name = func.__qualname__ if hasattr(func, "__qualname__") else func.__name__
if not remove:
print(f"WARN: {name} was deprecated in {version}. Use {new_method} instead.")
else:
msg = f"WARN: {name} was deprecated in {version} and will be removed in "
msg += f"{remove}. Use {new_method} instead."
print(msg)
return func(*args, **kwargs)
return inner
return wrapper
class Digest:
"Represents a body digest"
@ -135,18 +103,6 @@ class Digest:
return cls(digest, alg)
@classmethod
@deprecated("Digest.parse", "0.1.9", "0.3.0")
def new_from_digest(cls: type[Self], digest: str | None) -> Self | None:
"""
Create a new digest from a digest header
:param digest: Digest header
"""
return cls.parse(digest)
@property
def hashalg(self) -> str:
"Hash function used when creating the signature as a string"
@ -171,146 +127,6 @@ class Digest:
return self == Digest.new(body, hash_size)
class HttpDate(datetime):
"Datetime object with convenience methods for parsing and creating HTTP date strings"
FORMAT: str = "%a, %d %b %Y %H:%M:%S GMT"
"Format to pass to datetime when (de)serializing a raw HTTP date"
def __new__(cls: type[Self],
year: int,
month: int,
day: int,
hour: int = 0,
minute: int = 0,
second: int = 0,
microsecond: int = 0,
tzinfo: tzinfo = timezone.utc) -> Self:
return datetime.__new__(
cls, year, month, day, hour, minute, second, microsecond, tzinfo
)
def __str__(self) -> str:
return self.to_string()
@classmethod
def parse(cls: type[Self], date: datetime | str | int | float) -> Self:
"""
Parse a unix timestamp or HTTP date in string format
:param date: Unix timestamp or string from an HTTP Date header
"""
if isinstance(date, cls):
return date
elif isinstance(date, datetime):
return cls.fromisoformat(date.isoformat())
elif isinstance(date, (int | float)):
data = cls.fromtimestamp(float(date) if type(date) is int else date)
else:
data = cls.strptime(date, cls.FORMAT)
return data.replace(tzinfo=timezone.utc)
@classmethod
@deprecated("HttpDate.parse", "0.2.3", "0.3.0")
def new_from_datetime(cls: type[Self], date: datetime) -> Self:
"""
Create a new ``HttpDate`` object from a ``datetime`` object
:param date: ``datetime`` object to convert
"""
return cls.fromisoformat(date.isoformat())
@classmethod
def new_utc(cls: type[Self]) -> Self:
"Create a new ``HttpDate`` object from the current UTC time"
return cls.now(timezone.utc)
def timestamp(self) -> int:
"Return the date as a unix timestamp without microseconds"
return int(datetime.timestamp(self))
def to_string(self) -> str:
"Create an HTTP Date header string from the datetime object"
return self.strftime(self.FORMAT)
class JsonBase(dict[str, Any]):
"A ``dict`` with methods to convert to JSON and back"
@classmethod
@deprecated("JsonBase.parse", "0.1.5", "0.2.0")
def new_from_json(cls: type[Self], data: str | bytes | dict | Self) -> Self:
"""
Parse a JSON object
.. deprecated:: 0.1.5
Use :meth:`JsonBase.parse` instead
:param data: JSON object to parse
:raises TypeError: When an invalid object type is provided
"""
return cls.parse(data)
@classmethod
def parse(cls: type[Self], data: str | bytes | dict | Self) -> Self:
"""
Parse a JSON object
:param data: JSON object to parse
:raises TypeError: When an invalid object type is provided
"""
if isinstance(data, (str, bytes)):
data = json.loads(data)
if isinstance(data, cls):
return data
if not isinstance(data, dict):
raise TypeError(f"Cannot parse objects of type \"{type(data).__name__}\"")
return cls(data)
def to_json(self, indent: int | str | None = None, **kwargs: Any) -> str:
"""
Return the message as a JSON string
:param indent: Number of spaces or the string to use for indention
:param kwargs: Keyword arguments to pass to :func:`json.dumps`
"""
return json.dumps(self, indent = indent, default = self.handle_value_dump, **kwargs)
def handle_value_dump(self, value: Any) -> Any:
"""
Gets called when a value is of the wrong type and needs to be converted for dumping to
json. If the type is unknown, it will be forcibly converted to a ``str``.
:param value: Value to be converted
"""
if not isinstance(value, (str, int, float, bool, dict, list, tuple, type(None))):
print(f"Warning: Cannot properly convert value of type '{type(value).__name__}'")
return str(value)
return value
class MessageDate(HttpDate):
"""
Datetime object with convenience methods for parsing and creating ActivityPub message date
@ -424,24 +240,12 @@ class Signature:
"""
Parse a Signature in string format
:param str string: Signature string
:param str data: Signature string
"""
return cls.new_from_headers({"signature": data})
@classmethod
@deprecated("Signature.parse", "0.1.9", "0.3.0")
def new_from_signature(cls: type[Self], string: str) -> Self:
"""
Parse a Signature header
:param str string: Signature header string
"""
return cls.new_from_headers({"signature": string})
@property
def algs(self) -> tuple[str, str]:
"Return the algorithms used for signing [0] and hashing [1]"
@ -460,14 +264,6 @@ class Signature:
return self.algs[0]
@property
@deprecated("Signature.algorithm", "0.1.9", "0.3.0")
def algorithm_type(self) -> AlgorithmType:
"Type of algorithm used for this signature"
return self.algorithm
@property
def created_date(self) -> HttpDate:
if not self.created:

View file

@ -3,7 +3,8 @@ from __future__ import annotations
import re
import typing
from .misc import JsonBase
from blib import JsonBase
from .enums import (
NodeinfoProtocol,
NodeinfoServiceInbound,

View file

@ -5,6 +5,7 @@ import time
import typing
from Crypto.PublicKey import RSA, ECC
from blib import deprecated
from functools import wraps
from inspect import iscoroutinefunction
from pathlib import Path
@ -12,7 +13,7 @@ from pathlib import Path
from . import algorithms
from .enums import AlgorithmType, KeyType
from .errors import SignatureFailureError
from .misc import Signature, deprecated
from .misc import Signature
from .request_classes import SIGNERS, VALIDATORS
if typing.TYPE_CHECKING:

View file

@ -35,8 +35,6 @@ Misc
* :meth:`aputils.register_signer`
* :meth:`aputils.register_validator`
* :class:`aputils.Digest`
* :class:`aputils.HttpDate`
* :class:`aputils.JsonBase`
* :class:`aputils.MessageDate`
* :class:`aputils.Signature`

View file

@ -36,16 +36,3 @@ Enums
:show-inheritance:
:undoc-members:
:exclude-members: __new__
.. autoclass:: aputils.Enum
:members:
:show-inheritance:
:exclude-members: __new__, __init__
.. autoclass:: aputils.IntEnum
:show-inheritance:
:exclude-members: __new__, __init__
.. autoclass:: aputils.StrEnum
:show-inheritance:
:exclude-members: __new__, __init__

View file

@ -7,15 +7,6 @@ Misc
.. autoclass:: aputils.Digest
:members:
.. autoclass:: aputils.HttpDate
:members:
:show-inheritance:
:exclude-members: __new__
.. autoclass:: aputils.JsonBase
:members:
:show-inheritance:
.. autoclass:: aputils.MessageDate
:members:
:show-inheritance:

View file

@ -49,8 +49,9 @@ external_toc_exclude_missing = True
templates_path = ['_templates']
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
intersphinx_mapping = {
'Crypto': ('https://www.pycryptodome.org/', None),
"blib": ("https://docs.barkshark.xyz/blib", None),
'python': (f'https://docs.python.org/{pyversion}', None),
'crypto': ('https://www.pycryptodome.org/', None)
}
# -- Options for HTML output -------------------------------------------------

View file

@ -120,7 +120,7 @@ typing information, but also has methods to aid in fetching info from the messag
Parsing
~~~~~~~
Since ``Message`` sub-classes :class:`aputils.JsonBase`, you just have to pass a message in any
Since ``Message`` sub-classes :class:`blib.JsonBase`, you just have to pass a message in any
form to :meth:`aputils.Message.parse`.
.. code-block:: python
@ -154,7 +154,7 @@ You can also create new messages.
note["summary"] = "open for a fun fact"
# Most keys also have setters though and is the recommended way to set values
note.created = aputils.HttpDate.new_utc()
note.created = aputils.MessageDate.new_utc()
print(note.to_json(4))

View file

@ -22,7 +22,10 @@ classifiers = [
"Typing :: Typed"
]
requires-python = ">= 3.8"
dependencies = ["pycryptodome == 3.19.0"]
dependencies = [
"barkshark-lib >= 0.1.2",
"pycryptodome == 3.19.0"
]
dynamic = ["version"]
[project.readme]

View file

@ -10,15 +10,15 @@ BODY_DIGEST = "SHA-256=NaQWaYfGtWY/aniiFBuMuASqbmGEes5vsPTFaRKuTGI="
class MiscTest(unittest.TestCase):
def test_httpdate(self):
output = aputils.HttpDate(2022, 11, 25, 6, 9, 42, tzinfo=timezone.utc)
data = "Fri, 25 Nov 2022 06:09:42 GMT"
output = aputils.MessageDate(2022, 11, 25, 6, 9, 42, tzinfo=timezone.utc)
data = "2022-11-25T06:09:42Z"
self.assertEqual(output.to_string(), data)
def test_httpdate_parse(self):
output = aputils.HttpDate.parse("Fri, 25 Nov 2022 06:09:42 GMT")
data = aputils.HttpDate(2022, 11, 25, 6, 9, 42, tzinfo=timezone.utc)
output = aputils.MessageDate.parse("2022-11-25T06:09:42Z")
data = aputils.MessageDate(2022, 11, 25, 6, 9, 42, tzinfo=timezone.utc)
self.assertEqual(output, data)

View file

@ -2,6 +2,7 @@ import aputils
import unittest
import json
from blib import HttpDate
from datetime import timezone
from . import scriptpath
@ -9,7 +10,7 @@ from . import scriptpath
signer = aputils.Signer(scriptpath.joinpath('rsa_privkey.pem'), 'https://social.example.com/users/merpinator#main-key')
url = 'https://relay.example.com/actor'
date = aputils.HttpDate(2022, 11, 25, 6, 9, 42, tzinfo = timezone.utc)
date = HttpDate(2022, 11, 25, 6, 9, 42, tzinfo = timezone.utc)
class SignerTest(unittest.TestCase):