create socket-based http client

This commit is contained in:
Izalia Mae 2022-03-12 06:24:25 -05:00
parent 391bdc8054
commit 95c0d1ff45
20 changed files with 1413 additions and 1330 deletions

View file

@ -15,20 +15,14 @@ from . import logging
izzylog = logging.logger['IzzyLib']
izzylog.set_config('level', os.environ.get('IZZYLOG_LEVEL', 'INFO'))
from .dotdict import (
DotDict,
DefaultDotDict,
LowerDotDict,
MultiDotDict,
JsonEncoder
)
from .path import *
from .datestring import *
from .dotdict import *
from .misc import *
from .cache import *
from .config import *
from .connection import *
from .url import *
from .http_client import (
HttpClient,
HttpClientRequest,
@ -48,6 +42,7 @@ def register_global(obj, name=None):
def add_builtins(*classes, **kwargs):
new_builtins = [
BaseConfig,
DateString,
DotDict,
HttpClient,
JsonConfig,

47
izzylib/enums.py Normal file
View file

@ -0,0 +1,47 @@
from enum import Enum, IntEnum
__all__ = [
'DatetimeFormat',
'LogLevel',
'PathType',
'Protocol'
]
class DatetimeFormat(Enum):
HTTP = '%a, %d %b %Y %H:%M:%S GMT'
ACTIVITYPUB = '%Y-%m-%dT%H:%M:%SZ'
MASTODON = '%Y-%m-%d'
class LogLevel(IntEnum):
CRITICAL = 60,
ERROR = 50
WARNING = 40
INFO = 30
VERBOSE = 20
DEBUG = 10
MERP = 0
class PathType(Enum):
DIR = 'dir'
FILE = 'file'
LINK = 'link'
UNKNOWN = 'unknown'
class Protocol(IntEnum):
FTP = 21
SSH = 22
SMTP = 25
DNS = 53
TFTP = 69
HTTP = 80
WS = 80
HTTPS = 443
WSS = 443
FTPS = 990
MYSQL = 3306
PSQL = 5432

View file

@ -3,6 +3,10 @@ __all__ = [
]
class ReadOnlyError(Exception):
'Raise when a read-only property is attempted to be set'
class DBusClientError(Exception):
pass

View file

@ -1,28 +1,3 @@
content_types = {
'json': 'application/json',
'activity': 'application/activity+json',
'css': 'text/css',
'html': 'text/html',
'js': 'application/javascript',
'png': 'image/png',
'jpeg': 'image/jpeg',
'gif': 'image/gif'
}
http_methods = {
'CONNECT',
'DELETE',
'GET',
'HEAD',
'OPTIONS',
'PATCH',
'POST',
'PUT',
'TRACE'
}
from .client import HttpClient
from .request import HttpClientRequest
from .response import HttpClientResponse

View file

@ -1,210 +1,74 @@
import functools, json, sys
import socket
import ssl
from base64 import b64decode, b64encode
from datetime import datetime
from functools import cached_property, partial
from io import BytesIO
from ssl import SSLCertVerificationError
from urllib.error import HTTPError
from urllib.request import urlopen
from . import http_methods
from .config import Config
from .request import HttpClientRequest
from .response import HttpClientResponse
from .. import izzylog, __version__
from ..dotdict import DefaultDotDict, DotDict
from ..exceptions import HttpFileDownloadedError
from ..misc import Url
from ..path import Path
from .. import __version__
from ..dotdict import DotDict
from ..http_utils import Headers
from ..misc import class_name
from ..object_base import ObjectBase
try:
from PIL import Image
except ImportError:
izzylog.verbose('Pillow module not found. Image downloading is disabled')
Image = False
from typing import *
class HttpClient(ObjectBase):
def __init__(self, appagent=None, headers={}, request_class=None, response_class=None):
super().__init__(
headers = ClientHeaders(headers),
timeout = 60,
request_class = request_class or HttpClientRequest,
response_class = response_class or HttpClientResponse,
readonly = ['headers']
)
if appagent:
self.set_appagent(appagent)
__pdoc__ = {f'Client.{method.lower()}': f'Send a {method.upper()} request' for method in http_methods}
def build_request(self, url, method='GET', body=b'', headers={}, cookies=[]):
return self.request_class(url,
method = method,
body = body,
headers = {**headers, **self.headers},
cookies = cookies
)
class HttpClient:
'Basic HTTP client based on `urllib.request.urlopen`'
def request(self, url, method='GET', body=b'', headers={}, cookies=[]):
req = self.build_request(url, method, body, headers, cookies)
return self.send_request(req)
def __init__(self, **kwargs):
self._cfg = Config(**kwargs)
for method in http_methods:
self.__set_method(method)
def send_request(self, request):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.settimeout(self.timeout)
sock.connect((request.url.ipaddress, request.url.port))
if request.url.proto.lower() in ['https', 'wss']:
context = ssl.create_default_context()
sock = context.wrap_socket(sock, server_hostname=request.url.domain)
@property
def cfg(self):
'The config options for the client as a dict'
return self._cfg
sock.sendall(request.compile())
return self.response_class(self, sock, request.url)
def get(self, *args, **kwargs):
return self.request(*args, method='GET', **kwargs)
def set_appagent(self, appagent):
self.headers['User-Agent'] = f'IzzyLib/{__version__} ({appagent})'
def post(self, *args, **kwargs):
return self.request(*args, method='POST', **kwargs)
class ClientHeaders(DotDict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def head(self, *args, **kwargs):
return self.request(*args, method='HEAD', **kwargs)
def __getitem__(self, key):
return super().__getitem__(key.replace('_', '-').title())
def connect(self, *args, **kwargs):
return self.request(*args, method='CONNECT', **kwargs)
def __setitem__(self, key, value):
super().__setitem__(key.replace('_', '-').title(), value)
def delete(self, *args, **kwargs):
return self.request(*args, method='DELETE', **kwargs)
def put(self, *args, **kwargs):
return self.request(*args, method='PUT', **kwargs)
def patch(self, *args, **kwargs):
return self.request(*args, method='PATCH', **kwargs)
def trace(self, *args, **kwargs):
return self.request(*args, method='TRACE', **kwargs)
def options(self, *args, **kwargs):
return self.request(*args, method='OPTIONS', **kwargs)
def __set_method(self, method):
setattr(self, method.lower(), partial(self.request, method=method.upper()))
def build_request(self, *args, **kwargs) -> HttpClientRequest:
'Creates a new HttpClientRequest object. It can be used with `Client.send_request` See `izzylib.http_client.request.HttpClientRequest` for available arguments..'
request = self.cfg.request_class(*args, **kwargs)
request._set_params(self.cfg)
return request
def send_request(self, request: HttpClientRequest) -> HttpClientResponse:
'Sends a request'
if not isinstance(request, HttpClientRequest):
raise TypeError(f'Must be a izzylib.http_client.request.HttpClientRequest object (or subclassed), not {type(request).__name__}')
if not request._params_set:
request._set_params(self.cfg)
try:
response = urlopen(request)
except HTTPError as e:
response = e.fp
return self.cfg.response_class(response)
def request(self, *args, **kwargs) -> HttpClientResponse:
'Create and send a request'
request = self.build_request(*args, **kwargs)
return self.send_request(request)
def json_request(self, *args, headers:dict={}, activity:bool=False, **kwargs):
'Create and send a request with the Accept header set for JSON. If `activity` is `True`, the header value will be `aplication/activity+json`, otherwise `application/json`'
for key in list(headers.keys()):
if key.lower() == 'accept':
del headers[key]
json_type = 'activity+json' if activity else 'json'
headers['Accept'] = f'application/{json_type}'
return self.request(*args, headers=headers, **kwargs)
def download_file(self, url:str, filepath:Path, *args, overwrite:bool=False, create_dirs:bool=True, chunk_size:int=2048, **kwargs) -> Path:
'Make a request and save it to the specified path'
filepath = Path(filepath)
tmppath = filepath.parent.joinpath(filepath.name + '.dltemp')
if not overwrite and filepath.exists:
raise FileExistsError(f'File already exists: {filepath}')
if not filepath.parent.exists:
if not create_dirs:
raise FileNotFoundError(f'Path does not exist: {filepath.parent}')
filepath.parent.mkdir()
if tmpath.exists:
kwargs['headers']['range'] = f'bytes={tmppath.size}'
resp = self.request(url, *args, stream=True, **kwargs)
if not resp.headers.get('content-length'):
try: tmppath.delete()
except: pass
raise FileExistsError('File already downloaded fully')
if resp.status != 200:
raise HttpError(resp)
with tmppath.open('ab') as fd:
for chunk in resp.chunks(chunk_size):
fd.write(chunk)
shutil.move(tmppath, filepath)
return filepath
def download_image(self, url:str, filepath:Path, *args, output_format:str='png', dimensions:List[int]=None, create_dirs:bool=True, **kwargs) -> BytesIO:
'''
Download an image and save it in the specified format. Optionally resize the image with `dimensions` up to the original size.
example: `dimensions = [50, 50]`
'''
if not Image:
raise ValueError('Pillow module is not installed')
filepath = Path(filepath)
if not filepath.parent.exists and not create_dirs:
raise FileNotFoundError(f'Path does not exist: {filepath.parent}')
filepath.parent.mkdir()
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
raise HttpError(resp)
byte = BytesIO()
image = Image.open(BytesIO(resp.body))
if dimensions:
image.thumbnail(dimensions)
image.save(byte, format=output_format.upper())
with filepath.open('wb') as fd:
fd.write(byte.getvalue())
return byte
def __delitem__(self, key):
super().__delitem__(key.replace('_', '-').title())

View file

@ -1,51 +1,28 @@
import json
from datetime import datetime
from urllib.parse import urlencode
from urllib.request import Request as PyRequest
from . import http_methods, content_types
from ..dotdict import CapitalDotDict
from ..misc import Url, convert_to_boolean, convert_to_bytes, protocol_ports
try: import magic
except ImportError: magic = None
from ..datestring import DateString
from ..enums import DatetimeFormat
from ..http_utils import CookieItem, Headers, create_request_message, methods
from ..object_base import ObjectBase
from ..url import Url
class HttpClientRequest(PyRequest):
def __init__(self, url:str, body:bytes=None, headers:dict={}, cookies:dict={}, method:str='GET'):
'An HTTP request. Headers can be accessed, set, or deleted as dict items.'
class HttpClientRequest(ObjectBase):
def __init__(self, url, method='GET', body=b'', headers={}, cookies={}):
assert method.upper() in methods
super().__init__(url)
self._url = None
self._headers = CapitalDotDict(headers)
self.body = body
self.url = url
self.method = method
if self.url.proto.upper() not in protocol_ports.keys():
raise ValueError(f'Invalid protocol in url: {self.url.proto}')
self._params_set = False
super().__init__(
url = url,
method = method,
body = body,
headers = headers,
readonly_props = ['headers']
)
def __getattr__(self, key):
if key == 'origin_req_host':
return object.__getattribute__(self, '_host')
elif key == 'full_url':
return object.__getattribute__(self, '_url')
return object.__getattribute__(self, key)
self.headers['Host'] = self.url.domain
def __setattr__(self, key, value):
if key in ['headers', 'host']:
return
object.__setattr__(self, key, value)
def __bytes__(self):
return self.compile()
def __getitem__(self, key):
@ -53,154 +30,71 @@ class HttpClientRequest(PyRequest):
def __setitem__(self, key, value):
self.headers[key] = str(value)
self.headers[key] = value
def __delitem__(self, key):
del self.headers[key]
def _set_params(self, config):
self.headers.update(config.headers)
def _parse_property(self, key, value):
if key == 'url':
if not isinstance(value, Url):
value = Url(value)
if config.proxy_host:
self.set_proxy(f'{config.proxy_host}:{config.proxy_port}', config.proxy_type)
value = value.replace_property('anchor', None)
self._params_set = True
elif key == 'body':
if not isinstance(value, bytes):
value = convert_to_bytes(value)
elif key == 'headers':
value = Headers(value)
return value
@property
def body(self):
'Data to be sent along with the request'
return self._data
def cookies(self):
for cookie in self.headers.get('cookie', []):
yield cookie
@body.setter
def body(self, data):
self._data = convert_to_bytes(data)
#self.set_header('Content-Length', len(self._data))
def compile(self):
if 'Content-Length' not in self.headers and self.body:
self.headers['Content-Length'] = len(self.body)
if 'Host' not in self.headers:
self.headers['Host'] = DateString.now(DatetimeFormat.HTTP)
return create_request_message(self.method, self.url.path, body=self.body, headers=self.headers)
@property
def data(self):
'Alias for `HttpClientRequest.body`'
return self._data
def get_cookie(self, key):
return self.headers['Cookies']
@data.setter
def data(self, data):
self.body = data
def get_header(self, key, default=None):
try:
return self.headers[key]
except KeyError:
self.headers[key] = default
@property
def headers(self):
'Headers to send with this request.'
return self._headers
@headers.setter
def headers(self, data:dict):
self._headers.update(dict)
@property
def host(self):
'Domain the request will be sent to.'
return self.url.host
@property
def method(self):
'Method used for this request.'
return self._method
@method.setter
def method(self, value):
if value.upper() not in http_methods:
raise ValueError(f'Invalid HTTP method: {method}')
self._method = value.upper()
@property
def port(self):
'Port the request will use when contacting the server.'
return self.url.port
@property
def secure(self):
'Whether or not the request will use SSL.'
return self.url.proto in ['https', 'wss', 'ftps']
@property
def url(self) -> Url:
'The URL of the request'
return Url(self.full_url)
@url.setter
def url(self, url):
self.full_url = Url(url)
def set_chunked(self, value:True):
'Set the `Encoding` header to `chunked` if `value` is `True`'
if convert_to_boolean(value):
self.set_header('Encoding', 'chunked')
self.unset_header('Content-Length')
else:
self.unset_header('Encoding')
self.set_header('Content-Length', len(self.body))
def get_header(self, key):
'Get a header if it exists'
return self.headers.get(key)
return self.headers[key]
def set_header(self, key, value):
'Set a header to the specified value'
self.headers[key] = value
def unset_header(self, key):
'Remove a header'
self.headers.pop(key, None)
def set_cookie(self, key, value, **kwargs):
for cookie in self.cookies:
if key == cookie.key:
cookie.value = value
cookie.update(kwargs)
return cookie
def update_headers(self, data={}, **kwargs):
'Update headers'
kwargs.update(data)
self.headers.update(kwargs)
def set_type(self, content_type):
'Set the `Content-Type` header. Either a mimetype or a `content_types` key can be used.'
self.set_header('Content-Type', content_types.get(content_type, content_type))
def set_type_from_body(self) -> str:
'Set the `Content-Type` header based on `HttpClientRequest.body` and return the mimetype.'
if not self.body:
return
mimetype = magic.from_buffer(self.body, mime=True)
self.set_type(mimetype)
return mimetype
def sign_headers(self, privkey:str, keyid:str):
'[Not implemented yet] Create an ActivityPub signature of the headers via a private RSA key. Will create `Host` and `Date` headers if they do not exist.'
raise ImportError(f'Not implemented yet')
if not sign_request:
raise ImportError(f'Could not import HTTP signatures. Header signing disabled')
return sign_request(self, privkey, keyid)
item = CookieItem(key, value, **kwargs)
self.cookies.append(item)
return item

View file

@ -1,100 +1,178 @@
from io import BytesIO
import traceback
from ..dotdict import DotDict
from ..http_utils import Cookies, Headers
from ..misc import Url
from ..http_utils import Headers, parse_headers
from ..misc import class_name
from ..object_base import ObjectBase
from ..path import Path
class HttpClientResponse:
headers = None
cookies = None
class HttpClientResponse(ObjectBase):
_body = b''
def __init__(self, response):
self.__response = response
self.__body = b''
self.__url = Url(response.url)
def __init__(self, client, sock, url):
super().__init__(
client = client,
sock = sock,
url = url,
headers = Headers(request=False),
version = None,
status = None,
message = None,
raw_headers = b'',
readonly_props = ['client', 'socket']
)
headers = []
cookies = []
self._read_headers()
for key, value in response.getheaders():
if key.lower in ['set-cookie']:
cookies.append(value)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
if exc_traceback:
traceback.print_tb(exc_traceback)
try:
self.sock.close()
except Exception as e:
print(f'{class_name(e)}: {e}')
pass
def _read_headers(self):
#for line in self.read_lines():
#self.raw_headers += line
data = b''
while True:
data += self.read(2048)
if not data or b'\r\n\r\n' in data:
break
try:
self.raw_headers, self._body = data.split(b'\r\n\r\n', 1)
except ValueError:
self.raw_headers = data
parsed = parse_headers(self.raw_headers, request=False)
for key, value in parsed.items():
if key == 'headers':
for k, v in value:
self.headers.append(k, v)
else:
headers.append((key, value))
self.set_property(key, value)
self.headers = Headers(headers, readonly=True)
self.cookies = Cookies(cookies, readonly=True)
self.set_readonly(key)
self.headers._readonly = True
def __getitem__(self, key):
return self.get_header(key.capitalize())
def _read_body(self):
if self.length <= len(self._body):
return
self._body += self.read(self.length - len(self._body))
return self._body
@property
def length(self):
try:
return self.headers['Content-Length'].one()
except:
return 0
@property
def mimetype(self):
try:
return self.headers['Content-Type'].one()
except:
return
@property
def body(self):
if not self.__body:
self.read()
if self.length:
self._read_body()
return self.__body
return self._body
#@property
#def body(self):
#if not self._body:
#self._read_body_lines()
# todo: fetch encoding from headers if possible
@property
def encoding(self):
return 'utf-8'
@property
def status(self):
return self.__response.status
@property
def url(self):
return self.__url
@property
def version(self):
vers = self.__response.version
if vers == 10:
return 1.0
elif vers == 11:
return 1.1
elif vers == 20:
return 2.0
print('Warning! Invalid HTTP version:', type(vers).__name__, vers)
return vers
@property
def bytes(self):
return self.__body
#return self._body
@property
def text(self):
return self.body.decode(self.encoding)
return self.body.decode('utf-8')
@property
def form(self):
return DotDict.new_from_query_string(self.text)
@property
def json(self):
return DotDict(self.body)
return DotDict(self.text)
def get_header(self, name):
return self.headers.get(name.lower())
def cookies(self):
return self.headers.cookies()
def read(self, amount=None):
data = self.__response.read(amount)
self.__body += data
def get_cookie(self, key):
return self.headers.get_cookie(key)
return data
def query(self):
return self.url.query
def read(self, length=8192):
return self.sock.recv(length)
#def read_lines(self):
#while True:
#line = self.fd.readline()
#if not line or line == b'\r\n':
#break
#yield line
def save_to_file(self, path=None, filename=None, chunk=8192):
path = Path(path) if path else Path.cwd
data_left = self.length
if path.is_dir:
path = path.join(filename or self.url.path.name)
with path.open('wb') as fd:
fd.write(self._body)
while True:
if chunk > data_left:
fd.write(self.read(data_left))
break
fd.write(self.read(chunk))
data_left -= chunk
return path

View file

@ -1,12 +1,36 @@
from . import error
from .client import HttpUrllibClient, set_default_client
from .request import HttpUrllibRequest
from .response import HttpUrllibResponse
import mimetypes
__all__ = [
'HttpUrllibClient',
'HttpUrllibRequest',
'HttpUrllibResponse',
'set_default_client',
'error'
]
mimetypes.add_type('application/activity+json', '.activity')
content_types = {
'json': 'application/json',
'activity': 'application/activity+json',
'css': 'text/css',
'html': 'text/html',
'js': 'application/javascript',
'png': 'image/png',
'jpeg': 'image/jpeg',
'gif': 'image/gif'
}
http_methods = {
'CONNECT',
'DELETE',
'GET',
'HEAD',
'OPTIONS',
'PATCH',
'POST',
'PUT',
'TRACE'
}
valid_protocols = ['http', 'https', 'ws', 'wss', 'ftp', 'ftps']
from .client import HttpUrllibClient
from .request import HttpUrllibClientRequest
from .response import HttpUrllibClientResponse

View file

@ -1,218 +1,210 @@
import json, sys, urllib3
import functools, json, sys
from PIL import Image
from base64 import b64encode
from base64 import b64decode, b64encode
from datetime import datetime
from functools import cached_property, lru_cache
from functools import cached_property, partial
from io import BytesIO
from ssl import SSLCertVerificationError
from urllib.error import HTTPError
from urllib.request import urlopen
from .request import HttpUrllibRequest
from .response import HttpUrllibResponse
from . import http_methods
from .config import Config
from .request import HttpUrllibClientRequest
from .response import HttpUrllibClientResponse
from .. import __version__
from ..dotdict import DefaultDotDict, DotDict, LowerDotDict
from .. import izzylog, __version__
from ..dotdict import DefaultDotDict, DotDict
from ..exceptions import HttpFileDownloadedError
from ..misc import Url
from ..path import Path
from ..url import Url
try:
from PIL import Image
except ImportError:
izzylog.verbose('Pillow module not found. Image downloading is disabled')
Image = False
from typing import *
Client = None
proxy_ports = {
'http': 80,
'https': 443
}
__pdoc__ = {f'Client.{method.lower()}': f'Send a {method.upper()} request' for method in http_methods}
class HttpUrllibClient:
def __init__(self, headers={}, useragent=None, appagent=None, proxy_type='https', proxy_host=None, proxy_port=None, num_pools=20):
if not useragent:
useragent = f'IzzyLib/{__version__}'
'Basic HTTP client based on `urllib.request.urlopen`'
self.headers = {k:v.lower() for k,v in headers.items()}
self.agent = f'{useragent} ({appagent})' if appagent else useragent
if proxy_type not in ['http', 'https']:
raise ValueError(f'Not a valid proxy type: {proxy_type}')
def __init__(self, **kwargs):
self._cfg = Config(**kwargs)
if proxy_host:
proxy = f'{proxy_type}://{proxy_host}:{proxy_ports[proxy_type] if not proxy_port else proxy_port}'
self.pool = urllib3.ProxyManager(proxy, num_pools=num_pools)
else:
self.pool = urllib3.PoolManager(num_pools=num_pools)
for method in http_methods:
self.__set_method(method)
@property
def agent(self):
return self.headers['user-agent']
def cfg(self):
'The config options for the client as a dict'
return self._cfg
@agent.setter
def agent(self, value):
self.headers['user-agent'] = value
def get(self, *args, **kwargs):
return self.request(*args, method='GET', **kwargs)
def set_global(self):
set_default_client(self)
def post(self, *args, **kwargs):
return self.request(*args, method='POST', **kwargs)
def build_request(self, *args, **kwargs):
return HttpUrllibRequest(*args, **kwargs)
def head(self, *args, **kwargs):
return self.request(*args, method='HEAD', **kwargs)
def handle_request(self, request):
request.headers.update(self.headers)
response = self.pool.urlopen(*request._args, **request._kwargs)
return HttpUrllibResponse(response)
def connect(self, *args, **kwargs):
return self.request(*args, method='CONNECT', **kwargs)
def request(self, *args, **kwargs):
return self.handle_request(self.build_request(*args, **kwargs))
def delete(self, *args, **kwargs):
return self.request(*args, method='DELETE', **kwargs)
def signed_request(self, privkey, keyid, *args, **kwargs):
return self.request(*args, privkey=privkey, keyid=keyid, **kwargs)
def put(self, *args, **kwargs):
return self.request(*args, method='PUT', **kwargs)
def download(self, url, filepath, *args, filename=None, **kwargs):
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
raise HttpFileDownloadedError(f'Failed to download {url}: Status: {resp.status}, Body: {resp.body}')
return resp.save(filepath)
def patch(self, *args, **kwargs):
return self.request(*args, method='PATCH', **kwargs)
def image(self, url, filepath, *args, filename=None, ext='png', dimensions=(50, 50), **kwargs):
if not Image:
izzylog.error('Pillow module is not installed')
return
resp = self.request(url, *args, **kwargs)
if resp.status != 200:
izzylog.error(f'Failed to download {url}:', resp.status, resp.body)
return False
if not filename:
filename = Path(url).stem()
path = Path(filepath)
if not path.exists:
izzylog.error('Path does not exist:', path)
return False
byte = BytesIO()
image = Image.open(BytesIO(resp.body))
image.thumbnail(dimensions)
image.save(byte, format=ext.upper())
with path.join(filename).open('wb') as fd:
fd.write(byte.getvalue())
def trace(self, *args, **kwargs):
return self.request(*args, method='TRACE', **kwargs)
def json(self, *args, headers={}, activity=False, **kwargs):
def options(self, *args, **kwargs):
return self.request(*args, method='OPTIONS', **kwargs)
def __set_method(self, method):
setattr(self, method.lower(), partial(self.request, method=method.upper()))
def build_request(self, *args, **kwargs) -> HttpUrllibClientRequest:
'Creates a new `HttpUrllibClientRequest` object. It can be used with `Client.send_request` See `HttpUrllibClientRequest` for available arguments..'
request = self.cfg.request_class(*args, **kwargs)
request._set_params(self.cfg)
return request
def send_request(self, request: HttpUrllibClientRequest) -> HttpUrllibClientResponse:
'Sends a request'
if not isinstance(request, HttpUrllibClientRequest):
raise TypeError(f'Must be a HttpUrllibClientRequest object (or subclassed), not {type(request).__name__}')
if not request._params_set:
request._set_params(self.cfg)
try:
response = urlopen(request)
except HTTPError as e:
response = e.fp
return self.cfg.response_class(response)
def request(self, *args, **kwargs) -> HttpUrllibClientResponse:
'Create and send a request'
request = self.build_request(*args, **kwargs)
return self.send_request(request)
def json_request(self, *args, headers:dict={}, activity:bool=False, **kwargs):
'Create and send a request with the Accept header set for JSON. If `activity` is `True`, the header value will be `aplication/activity+json`, otherwise `application/json`'
for key in list(headers.keys()):
if key.lower() == 'accept':
del headers[key]
json_type = 'activity+json' if activity else 'json'
headers.update({
'accept': f'application/{json_type}'
})
headers['Accept'] = f'application/{json_type}'
return self.request(*args, headers=headers, **kwargs)
def set_default_client(client=None):
global Client
Client = client or HttpClient()
def download_file(self, url:str, filepath:Path, *args, overwrite:bool=False, create_dirs:bool=True, chunk_size:int=2048, **kwargs) -> Path:
'Make a request and save it to the specified path'
filepath = Path(filepath)
tmppath = filepath.parent.joinpath(filepath.name + '.dltemp')
if not overwrite and filepath.exists:
raise FileExistsError(f'File already exists: {filepath}')
if not filepath.parent.exists:
if not create_dirs:
raise FileNotFoundError(f'Path does not exist: {filepath.parent}')
filepath.parent.mkdir()
if tmpath.exists:
kwargs['headers']['range'] = f'bytes={tmppath.size}'
resp = self.request(url, *args, stream=True, **kwargs)
if not resp.headers.get('content-length'):
try: tmppath.delete()
except: pass
raise FileExistsError('File already downloaded fully')
if resp.status != 200:
raise HttpError(resp)
with tmppath.open('ab') as fd:
for chunk in resp.chunks(chunk_size):
fd.write(chunk)
shutil.move(tmppath, filepath)
return filepath
@lru_cache(maxsize=512)
def fetch_actor(url):
if not Client:
raise ValueError('Please set global client with "HttpUrllibClient.set_global()"')
def download_image(self, url:str, filepath:Path, *args, output_format:str='png', dimensions:List[int]=None, create_dirs:bool=True, **kwargs) -> BytesIO:
'''
Download an image and save it in the specified format. Optionally resize the image with `dimensions` up to the original size.
url = url.split('#')[0]
headers = {'Accept': 'application/activity+json'}
resp = Client.request(url, headers=headers)
example: `dimensions = [50, 50]`
'''
try:
actor = resp.json
if not Image:
raise ValueError('Pillow module is not installed')
except json.decoder.JSONDecodeError:
return
filepath = Path(filepath)
except Exception as e:
izzylog.debug(f'HTTP {resp.status}: {resp.body}')
raise e from None
if not filepath.parent.exists and not create_dirs:
raise FileNotFoundError(f'Path does not exist: {filepath.parent}')
actor.web_domain = Url(url).host
actor.shared_inbox = actor.inbox
actor.pubkey = None
actor.handle = actor.preferredUsername
filepath.parent.mkdir()
if actor.get('endpoints'):
actor.shared_inbox = actor.endpoints.get('sharedInbox', actor.inbox)
resp = self.request(url, *args, **kwargs)
if actor.get('publicKey'):
actor.pubkey = actor.publicKey.get('publicKeyPem')
if resp.status != 200:
raise HttpError(resp)
return actor
byte = BytesIO()
image = Image.open(BytesIO(resp.body))
if dimensions:
image.thumbnail(dimensions)
@lru_cache(maxsize=512)
def fetch_instance(domain):
if not Client:
raise ValueError('Please set global client with "HttpUrllibClient.set_global()"')
image.save(byte, format=output_format.upper())
headers = {'Accept': 'application/json'}
resp = Client.request(f'https://{domain}/api/v1/instance', headers=headers)
with filepath.open('wb') as fd:
fd.write(byte.getvalue())
try:
return resp.json
except json.decoder.JSONDecodeError:
return
except Exception as e:
izzylog.debug(f'HTTP {resp.status}: {resp.body}')
raise e from None
@lru_cache(maxsize=512)
def fetch_nodeinfo(domain):
if not Client:
raise ValueError('Please set global client with HttpUrllibClient.set_global()')
webfinger = Client.request(f'https://{domain}/.well-known/nodeinfo')
webfinger_data = DotDict(webfinger.body)
for link in webfinger.json.links:
if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0':
nodeinfo_url = link['href']
break
nodeinfo = Client.request(nodeinfo_url)
return nodeinfo.json
@lru_cache(maxsize=512)
def fetch_webfinger_account(handle, domain):
if not Client:
raise ValueError('Please set global client with HttpUrllibClient.set_global()')
data = DefaultDotDict()
webfinger = Client.request(f'https://{domain}/.well-known/webfinger?resource=acct:{handle}@{domain}')
if not webfinger.body:
raise ValueError('Webfinger body empty')
data.handle, data.domain = webfinger.json.subject.replace('acct:', '').split('@')
for link in webfinger.json.links:
if link['rel'] == 'self' and link['type'] == 'application/activity+json':
data.actor = link['href']
return data
return byte

View file

@ -1,5 +1,5 @@
from .request import HttpClientRequest
from .response import HttpClientResponse
from .request import HttpUrllibClientRequest
from .response import HttpUrllibClientResponse
from .. import __version__
from ..config import BaseConfig
@ -44,8 +44,8 @@ class Config(BaseConfig):
headers = CapitalDotDict({'User-Agent': f'IzzyLib/{__version__}'}),
appagent = None,
timeout = 60,
request_class = HttpClientRequest,
response_class = HttpClientResponse,
request_class = HttpUrllibClientRequest,
response_class = HttpUrllibClientResponse,
proxy_type = 'https',
proxy_host = None,
proxy_port = None

View file

@ -1,36 +0,0 @@
from urllib3.exceptions import (
HTTPError,
PoolError,
RequestError,
SSLError,
ProxyError,
DecodeError,
ProtocolError,
MaxRetryError,
HostChangedError,
TimeoutStateError,
TimeoutError,
ReadTimeoutError,
NewConnectionError,
EmptyPoolError,
ClosedPoolError,
LocationValueError,
LocationParseError,
URLSchemeUnknown,
ResponseError,
SecurityWarning,
InsecureRequestWarning,
SystemTimeWarning,
InsecurePlatformWarning,
SNIMissingWarning,
DependencyWarning,
ResponseNotChunked,
BodyNotHttplibCompatible,
IncompleteRead,
InvalidChunkLength,
InvalidHeader,
ProxySchemeUnknown,
ProxySchemeUnsupported,
HeaderParsingError,
UnrewindableBodyError
)

View file

@ -1,92 +1,209 @@
import json
from datetime import datetime
from mimetypes import types_map as content_types
from urllib.parse import urlencode
from urllib.request import Request as PyRequest
from ..dotdict import DotDict, LowerDotDict
from ..misc import Url, boolean
from . import http_methods, valid_protocols
from ..dotdict import CapitalDotDict
from ..enums import Protocol
from ..misc import convert_to_boolean, convert_to_bytes
from ..url import Url
try:
from ..http_signatures import sign_request
except ModuleNotFoundError:
sign_request = None
try: import magic
except ImportError: magic = None
methods = ['delete', 'get', 'head', 'options', 'patch', 'post', 'put']
class HttpUrllibClientRequest(PyRequest):
def __init__(self, url:str, body:bytes=None, headers:dict={}, cookies:dict={}, method:str='GET'):
'An HTTP request. Headers can be accessed, set, or deleted as dict items.'
super().__init__(url)
class HttpUrllibRequest:
def __init__(self, url, **kwargs):
self._body = b''
self._url = None
self._headers = CapitalDotDict(headers)
method = kwargs.get('method', 'get').lower()
if method not in methods:
raise ValueError(f'Invalid method: {method}')
self.url = Url(url)
self.body = kwargs.get('body')
self.body = body
self.url = url
self.method = method
self.headers = LowerDotDict(kwargs.get('headers', {}))
self.redirect = boolean(kwargs.get('redirect', True))
self.retries = int(kwargs.get('retries', 10))
self.timeout = int(kwargs.get('timeout', 5))
privkey = kwargs.get('privkey')
keyid = kwargs.get('keyid')
if self.url.proto.lower() not in valid_protocols:
raise ValueError(f'Invalid protocol in url: {self.url.proto}')
if privkey and keyid:
self.sign(privkey, keyid)
self._params_set = False
@property
def _args(self):
return [self.method.upper(), self.url]
def __getattr__(self, key):
if key == 'origin_req_host':
return object.__getattribute__(self, '_host')
elif key == 'full_url':
return object.__getattribute__(self, '_url')
return object.__getattribute__(self, key)
@property
def _kwargs(self):
return {
'body': self.body,
'headers': self.headers,
'redirect': self.redirect,
'retries': self.retries,
'timeout': self.timeout
}
def __setattr__(self, key, value):
if key in ['headers', 'host']:
return
object.__setattr__(self, key, value)
def __getitem__(self, key):
return self.headers[key]
def __setitem__(self, key, value):
self.headers[key] = str(value)
def __delitem__(self, key):
del self.headers[key]
def _set_params(self, config):
self.headers.update(config.headers)
if config.proxy_host:
self.set_proxy(f'{config.proxy_host}:{config.proxy_port}', config.proxy_type)
self._params_set = True
@property
def body(self):
return self._body
'Data to be sent along with the request'
return self._data
@body.setter
def body(self, data):
if isinstance(data, dict):
data = DotDict(data).to_json()
self._data = convert_to_bytes(data)
#self.set_header('Content-Length', len(self._data))
elif any(map(isinstance, [data], [list, tuple])):
data = json.dumps(data)
if data == None:
data = b''
@property
def data(self):
'Alias for `HttpClientRequest.body`'
return self._data
elif not isinstance(data, bytes):
data = bytes(data, 'utf-8')
self._body = data
@data.setter
def data(self, data):
self.body = data
@property
def headers(self):
'Headers to send with this request.'
return self._headers
@headers.setter
def headers(self, data:dict):
self._headers.update(dict)
@property
def host(self):
'Domain the request will be sent to.'
return self.url.host
@property
def method(self):
'Method used for this request.'
return self._method
@method.setter
def method(self, value):
if value.upper() not in http_methods:
raise ValueError(f'Invalid HTTP method: {method}')
self._method = value.upper()
@property
def port(self):
'Port the request will use when contacting the server.'
return self.url.port
@property
def secure(self):
'Whether or not the request will use SSL.'
return self.url.proto in ['https', 'wss', 'ftps']
@property
def url(self) -> Url:
'The URL of the request'
return Url(self.full_url)
@url.setter
def url(self, url):
self.full_url = Url(url)
def set_chunked(self, value:True):
'Set the `Encoding` header to `chunked` if `value` is `True`'
if convert_to_boolean(value):
self.set_header('Encoding', 'chunked')
self.unset_header('Content-Length')
else:
self.unset_header('Encoding')
self.set_header('Content-Length', len(self.body))
def get_header(self, key):
'Get a header if it exists'
return self.headers.get(key)
def set_header(self, key, value):
'Set a header to the specified value'
self.headers[key] = value
def unset_header(self, key):
'Remove a header'
self.headers.pop(key, None)
def sign(self, privkey, keyid):
if not sign_request:
raise AttributeError('PyCryptodome not installed. Request signing is disabled.')
def update_headers(self, data={}, **kwargs):
'Update headers'
kwargs.update(data)
self.headers.update(kwargs)
sign_request(self, privkey, keyid)
def set_type(self, content_type):
'Set the `Content-Type` header. Either a mimetype or file extension can be used.'
self.set_header('Content-Type', content_types.get(f'.{content_type}', content_type))
def set_type_from_body(self) -> str:
'Set the `Content-Type` header based on `HttpClientRequest.body` and return the mimetype.'
if not self.body:
return
mimetype = magic.from_buffer(self.body, mime=True)
self.set_type(mimetype)
return mimetype
def sign_headers(self, privkey:str, keyid:str):
'[Not implemented yet] Create an ActivityPub signature of the headers via a private RSA key. Will create `Host` and `Date` headers if they do not exist.'
raise ImportError(f'Not implemented yet')
if not sign_request:
raise ImportError(f'Could not import HTTP signatures. Header signing disabled')
return sign_request(self, privkey, keyid)

View file

@ -1,62 +1,82 @@
import json
from io import BytesIO
from izzylib import DefaultDotDict, DotDict, Path, Url
from ..dotdict import DotDict
from ..http_utils import Headers
from ..url import Url
class HttpUrllibClientResponse:
headers = None
cookies = None
class HttpUrllibResponse:
def __init__(self, response):
self.response = response
self.__response = response
self.__body = b''
self.__url = Url(response.url)
self._dict = None
headers = []
cookies = []
for key, value in response.getheaders():
if key.lower in ['set-cookie']:
cookies.append(value)
else:
headers.append((key, value))
self.headers = Headers(headers, readonly=True)
#self.cookies = Cookies(cookies, readonly=True)
def __getitem__(self, key):
return self.dict[key]
def __setitem__(self, key, value):
self.dict[key] = value
@property
def encoding(self):
for line in self.headers.get('content-type', '').split(';'):
try:
k,v = line.split('=')
if k.lower == 'charset':
return v.lower()
except:
pass
return 'utf-8'
@property
def headers(self):
return self.response.headers
@property
def status(self):
return self.response.status
@property
def url(self):
return Url(self.response.geturl())
return self.get_header(key.capitalize())
@property
def body(self):
data = self.response.read(cache_content=True)
if not self.__body:
self.read()
if not data:
data = self.response.data
return self.__body
return data
# todo: fetch encoding from headers if possible
@property
def encoding(self):
return 'utf-8'
@property
def status(self):
return self.__response.status
@property
def url(self):
return self.__url
@property
def version(self):
vers = self.__response.version
if vers == 10:
return 1.0
elif vers == 11:
return 1.1
elif vers == 20:
return 2.0
print('Warning! Invalid HTTP version:', type(vers).__name__, vers)
return vers
@property
def bytes(self):
return self.__body
@property
@ -65,33 +85,16 @@ class HttpUrllibResponse:
@property
def dict(self):
if not self._dict:
self._dict = DotDict(self.text)
return self._dict
def json(self):
return DotDict(self.body)
def json_pretty(self, indent=4):
return self.dict.to_json(indent)
def get_header(self, name):
return self.headers.get(name.lower())
def chunks(self, size=1024):
return self.response.stream(amt=size)
def read(self, amount=None):
data = self.__response.read(amount)
self.__body += data
def save(self, path, overwrite=True, create_parents=True):
path = Path(path)
if not path.parent.exists:
if not create_parents:
raise ValueError(f'Path does not exist: {path.parent}')
path.parent.mkdir()
if overwrite and path.exists:
path.delete()
with path.open('wb') as fd:
for chunk in self.chunks():
fd.write(chunk)
return data

View file

@ -1,168 +1,168 @@
import asyncio, json
from datetime import datetime, timezone
from io import BytesIO
from .datestring import DateString
from .dotdict import DotDict
from .misc import DateString
from .misc import class_name
from .url import Url
from typing import Dict
cookie_fields = {
'expires': 'Expires',
'max_age': 'Max-Age',
'domain': 'Domain',
'path': 'Path',
'secure': 'Secure',
'httponly': 'HttpOnly',
'samesite': 'SameSite'
}
samesite_values = {
'String',
'Lax',
'None'
}
def parse_header_key_name(key):
return key.replace('_', '-').title()
def parse_cookie_key_name(key):
return cookie_fields.get(key.replace('_', '-').lower(), key)
if key not in cookie_fields.values():
raise KeyError(f'Invalid cookie key: {key}')
return key
methods = [
'CONNECT',
'DELETE',
'GET',
'HEAD',
'OPTIONS',
'PATCH',
'POST',
'PUT',
'TRACE'
]
class Headers(DotDict):
__readonly = False
_readonly = False
_cookie_header = None
def __init__(self, data=(), readonly=False):
super().__init__()
for key, value in data:
self[key] = value
self.__readonly = readonly
def __init__(self, *args, request=True, readonly=False, **kwargs):
super().__init__(*args, **kwargs)
self._cookie_header = 'Cookie' if request else 'Set-Cookie'
self._readonly = readonly
def __getitem__(self, key):
return super().__getitem__(parse_header_key_name(key))
return super().__getitem__(self._format_key(key))
def __setitem__(self, key, value):
if key.startswith('_'):
super().__setattr__(key, value)
if self._readonly:
raise ReadOnlyError('Headers are readonly')
key = self._format_key(key)
if type(value) == HeaderItem:
super().__setitem__(key, value)
return
if self.readonly:
raise AssertionError('Headers are read-only')
if key == self._cookie_header:
if type(value) == str:
value = CookieItem.from_string(value)
key = parse_header_key_name(key)
elif type(value) != CookieItem:
raise TypeError(f'{key} header must be a `str` or `CookieItem`, not a `{class_name(value)}`')
if key in ['Cookie', 'Set-Cookie']:
izzylog.warning('Do not set the "Cookie" or "Set-Cookie" headers')
return
try:
for cookie in self[key]:
if cookie.key == value.key:
new_data = {k:v for k,v in value.as_dict().items() if k != key}
cookie.value = value.value
cookie.update()
return
self[key].append(value)
return
except KeyError:
pass
elif key == 'Date':
value = DateString(value, 'http')
try:
super().__getitem__(key).update(value)
elif key == 'Content-Length':
value = int(value)
except KeyError:
super().__setitem__(key, HeaderItem(key, value))
super().__setitem__(key, HeaderItem(key, value))
#try:
#self[key].append(value)
#except KeyError:
#super().__setitem__(key, HeaderItem(key, value))
def __delitem__(self, key):
if self.readonly:
raise AssertionError('Headers are read-only')
super().__delitem__(parse_header_key_name(key))
super().__delitem__(self._format_key(key))
@property
def readonly(self):
return self.__readonly
def _format_key(self, key):
return key.replace('_', '-').title()
def get_one(self, key):
return self[key].one()
def append(self, key, value):
try:
self[key].append(value)
except KeyError:
self[key] = value
def set(self, key, value):
def cookies(self):
for cookie in self.get(self._cookie_header, []):
yield cookie
def items(self):
for key, item in super().items():
for value in item:
yield (key, value)
def get(self, key, default=None):
try:
self.__getitem__(key)
except KeyError:
return default
def get_cookie(self, key):
for cookie in self.cookies():
if cookie.key == key:
return cookie
raise KeyError(key)
def get_one(self, key, default=None):
try:
return self[key].one()
except (KeyError, IndexError):
return default
def new_cookie(self, key, value, **kwargs):
return self.set_cookie(CookieItem(key, value, **kwargs))
def new_cookie_from_string(self, value):
return self.set_cookie(CookieItem.from_string(value))
def set_all(self, key, value):
try:
self[key].set(value)
except:
except KeyError:
self[key] = value
def update(self, data={}, **kwargs):
kwargs.update(data)
for key, value in kwargs.items():
self[key] = value
class Cookies(DotDict):
__readonly = False
def __init__(self, cookies=[], readonly=False):
super().__init__()
for cookie in cookies:
self.new_from_string(cookie)
self.__readonly = readonly
def __setattr__(self, key, value):
if key.startswith('_'):
super().__setattr__(key, value)
return
if self.readonly:
raise AssertionError('Cookies are read-only')
if isinstance(value, str):
value = CookieItem.from_string(value)
elif not isinstance(value, CookieItem):
raise TypeError(f'Cookie must be a str or CookieItem, not a {type(value).__name__}')
super().__setattr__(key, value)
def __delattr__(self, key):
if self.readonly:
raise AssertionError('Cookies are read-only')
super().__delattr__(key)
@property
def readonly(self):
return self.__readonly
def new_from_string(self, string):
cookie = CookieItem.new_from_string(string)
self[cookie.name] = cookie
def set_cookie(self, cookie):
self[self._cookie_header] = cookie
return cookie
def update(self, data={}, **kwargs):
kwargs.update(data)
for key, value in kwargs.items():
self[key] = value
class HeaderItem(list):
def __init__(self, key, *values):
super().__init__(values)
def __init__(self, key, values):
super().__init__()
self.update(values)
self.key = key
@ -170,6 +170,10 @@ class HeaderItem(list):
return ','.join(str(v) for v in self)
def __repr__(self):
return f'HeaderItem({self.key})'
def set(self, *values):
self.clear()
@ -188,121 +192,44 @@ class HeaderItem(list):
class CookieItem(DotDict):
def __init__(self, key, value, **kwargs):
super().__init__(kwargs)
self.key = key
self.value = value
self.args = DotDict()
try:
parse_cookie_key_name(key)
raise ValueError(f'The key name for a cookie cannot be {key}')
for k,v in kwargs.items():
if k not in cookie_params.values():
raise AttributeError(f'Not a valid cookie parameter: {key}')
except KeyError:
pass
self.__key = key
self.__value = value
def __getitem__(self, key):
return parse_cookie_key_name(key)
def __setitem__(self, key, value):
key = parse_cookie_key_name(key)
if key == 'Expires':
value = self._parse_expires(value)
elif key == 'Max-Age':
value = self._parse_max_age(value)
elif key == 'SameSite':
value = self._parse_samesite(value)
elif key in ['Secure', 'HttpOnly']:
value = boolean(value)
super().__setitem__(key, value)
def __delitem__(self, key):
super().__delitem(parse_cookie_key_name(key))
setattr(self, k, v)
def __str__(self):
text = f'{self.key}={self.value}'
try: text += f'; Expires={self.expires}'
except KeyError: pass
if self.expires:
text += f'; Expires={self.expires.strftime("%a, %d %b %Y %H:%M:%S GMT")}'
try: text += f'; Max-Age={self.maxage}'
except KeyError: pass
if self.maxage != None:
text += f'; Max-Age={self.maxage}'
try: text += f'; Domain={self.domain}'
except KeyError: pass
if self.domain:
text += f'; Domain={self.domain}'
try: text += f'; Path={self.path}'
except KeyError: pass
if self.path:
text += f'; Path={self.path}'
try: text += f'; SameSite={self.samesite}'
except KeyError: pass
if self.samesite:
text += f'; SameSite={self.samesite}'
if self.get('secure'):
if self.secure:
text += f'; Secure'
if self.get('httponly'):
if self.httponly:
text += f'; HttpOnly'
return text
def _parse_expires(self, data):
if type(data) == str:
data = DateString.new_http(data)
elif type(data) in [int, float]:
data = DateString.from_timestamp(data, 'http')
elif type(data) == datetime:
data = DateString.from_datetime(data, 'http')
elif type(data) == timedelta:
data = DateSTring.from_datetime(datetime.now(timezone.utc) + data, 'http')
elif type(data) != DateString:
raise TypeError(f'Expires must be a http date string, timestamp, datetime, or DateString object, not {type(data).__name__}')
return data
def _parse_max_age(self, data):
if isinstance(data, float):
data = int(data)
elif isinstance(date, timedelta):
data = data.seconds
elif isinstance(date, datetime):
data = (datetime.now() - date).seconds
elif not isinstance(data, int):
raise TypeError(f'Max-Age must be an integer, timedelta object, or datetime object, not {data.__class__.__name__}')
return data
def _parse_samesite(self, data):
if isinstance(data, bool):
data = 'Strict' if data else 'None'
elif isinstance(data, str):
if data.title() not in samesite_values:
raise ValueError(f'Valid SameSite values are Strict, Lax, or None, not {data}')
else:
raise TypeError(f'SameSite must be a boolean or string, not {data.__class__.__name__}')
return data.title()
@classmethod
def from_string(cls, data):
kwargs = {}
@ -333,8 +260,125 @@ class CookieItem(DotDict):
return cls(key, value, **kwargs)
@property
def expires(self):
return self.args.get('Expires')
@expires.setter
def expires(self, data):
if isinstance(data, str):
data = DateString(data, 'http')
elif isinstance(data, int) or isinstance(data, float):
data = datetime.fromtimestamp(data).replace(tzinfo=timezone.utc)
elif isinstance(data, datetime):
if not data.tzinfo:
data = data.replace(tzinfo=timezone.utc)
elif isinstance(data, timedelta):
data = datetime.now(timezone.utc) + data
else:
raise TypeError(f'Expires must be a http date string, timestamp, or datetime object, not {data.__class__.__name__}')
self.args['Expires'] = data
@property
def maxage(self):
return self.args.get('Max-Age')
@maxage.setter
def maxage(self, data):
if isinstance(data, int):
pass
elif isinstance(date, timedelta):
data = data.seconds
elif isinstance(date, datetime):
data = (datetime.now() - date).seconds
else:
raise TypeError(f'Max-Age must be an integer, timedelta object, or datetime object, not {data.__class__.__name__}')
self.args['Max-Age'] = data
@property
def domain(self):
return self.args.get('Domain')
@domain.setter
def domain(self, data):
if not isinstance(data, str):
raise ValueError(f'Domain must be a string, not {data.__class__.__name__}')
self.args['Domain'] = data
@property
def path(self):
return self.args.get('Path')
@path.setter
def path(self, data):
if not isinstance(data, str):
raise ValueError(f'Path must be a string or izzylib.Path object, not {data.__class__.__name__}')
self.args['Path'] = Path(data)
@property
def secure(self):
return self.args.get('Secure')
@secure.setter
def secure(self, data):
self.args['Secure'] = boolean(data)
@property
def httponly(self):
return self.args.get('HttpOnly')
@httponly.setter
def httponly(self, data):
self.args['HttpOnly'] = boolean(data)
@property
def samesite(self):
return self.args.get('SameSite')
@samesite.setter
def samesite(self, data):
if isinstance(data, bool):
data = 'Strict' if data else 'None'
elif isinstance(data, str) and data.title() in ['Strict', 'Lax', 'None']:
data = data.title()
else:
raise TypeError(f'SameSite must be a boolean or one of Strict, Lax, or None, not {data.__class__.__name__}')
self.args['SameSite'] = data
self.args['Secure'] = True
def as_dict(self):
return DotDict({self.key: self.value}, **self)
data = DotDict({self.key: self.value})
data.update(self.args)
return data
def set_defaults(self):
@ -347,3 +391,70 @@ class CookieItem(DotDict):
self.maxage = 0
return self
def parse_headers(raw_headers, request=True):
data = DotDict(
headers = []
)
for idx, line in enumerate(raw_headers.decode('utf-8').splitlines()):
if idx == 0:
if request:
method, path, version = line.split()
data.update({
'method': method,
'path': Path(path)
})
else:
version, status, *message = line.split()
data.update({
'status': int(status),
'message': ' '.join(message)
})
else:
try: key, value = line.split(': ', 1)
except: continue
data.headers.append((key, value))
data.version = float(version.split('/')[1])
return data
def create_message(first_line: str, headers: Dict[str,str], body: bytes=None):
data = first_line
for k,v in headers.items():
data += f'\r\n{k}: {v}'
data += '\r\n\r\n'
data = data.encode('utf-8')
if body:
if not body.endswith(b'\r\n\r\n'):
body += b'\r\n\r\n'
return data + body
return data
def create_request_message(method, path, body=None, headers=None, version='1.1'):
assert method.upper() in methods
return create_message(f'{method.upper()} {path} HTTP/{version}', headers, body)
def create_response_message(status, message=None, body=None, headers=None, version='1.1'):
msg = f'HTTP/{version} {status}'
if message:
msg += f' {message}'
return create_message(msg, headers, body)

View file

@ -1,18 +1,9 @@
import os, queue, sys, threading, time
from datetime import datetime
from enum import IntEnum
from pathlib import Path
class Levels(IntEnum):
CRITICAL = 60,
ERROR = 50
WARNING = 40
INFO = 30
VERBOSE = 20
DEBUG = 10
MERP = 0
from .enums import LogLevel
class Log:
@ -24,14 +15,14 @@ class Log:
def __init__(self, name, **config):
self.name = name
self.level = Levels.INFO
self.level = LogLevel.INFO
self.date = True
self.format = '%Y-%m-%d %H:%M:%S'
self.logfile = None
self.update_config(**config)
for level in Levels:
for level in LogLevel:
self._set_log_function(level)
@ -57,8 +48,8 @@ class Log:
def parse_level(self, level):
try: return Levels(int(level))
except ValueError: return Levels[level.upper()]
try: return LogLevel(int(level))
except ValueError: return LogLevel[level.upper()]
def set_level_from_env(self, env_name):
@ -96,7 +87,7 @@ class Log:
def log(self, level, *msg):
if isinstance(level, str):
Levels[level.upper()]
LogLevel[level.upper()]
if level < self.level:
return

View file

@ -2,10 +2,11 @@ import json
from . import __version__
from .config import BaseConfig
from .datestring import DateString
from .dotdict import DotDict
from .exceptions import HttpError
from .http_client import HttpClient
from .misc import DateString, Url
from .url import Url
class MastodonApi:

View file

@ -13,6 +13,7 @@ import time
import timeit
from datetime import datetime, timedelta, timezone
from functools import cached_property
from getpass import getpass, getuser
from importlib import util
from subprocess import Popen, PIPE
@ -21,11 +22,12 @@ from urllib.request import urlopen
from . import izzylog
from .dotdict import DotDict
from .enums import DatetimeFormat
from .path import Path
# typing modules
from collections.abc import Callable
from typing import *
from typing import Any, Dict, Union
__all__ = [
@ -54,42 +56,9 @@ __all__ = [
'timestamp',
'var_name',
'ArgParser',
'Argument',
'DateString',
'Url'
'Argument'
]
__pdoc__ = {
'DateString.dt': '`datetime.datetime` object used to store the date',
'DateString.format': 'The format used to create the str',
'DateString.tz_local': 'The local timezone as a `datetime.timezone` object',
'DateString.tz_utc': 'UTC timezone as a `datetime.timezone` object',
'Url.anchor': 'Text after the "#" at the end of the url',
'Url.host': 'Hostname or IP address portion of the url',
'Url.password': 'Password portion of the url',
'Url.path': 'Path portion of a url',
'Url.port': 'Port number for the url. It one is not specified, it will be guessed based on the protocol',
'Url.proto': 'Protocol portion of the url',
'Url.query': 'Query key/value pairs as a dict',
'Url.query_string': 'Query key/value pairs as a string',
'Url.username': 'Username portion of the url'
}
datetime_formats = {
'http': '%a, %d %b %Y %H:%M:%S GMT',
'activitypub': '%Y-%m-%dT%H:%M:%SZ',
'activitypub-date': '%Y-%m-%d'
}
protocol_ports = DotDict(
HTTP = 80,
HTTPS = 443,
WS = 80,
WSS = 443,
FTP = 21,
FTPS = 990
)
def app_data_dirs(author:str, name:str) -> Dict[str,Path]:
'Returns the cache and config paths for software'
@ -621,266 +590,5 @@ class Argument(DotDict):
self.callback = lambda *cliargs: callback(*cliargs, *args, **kwargs)
class DateString(str):
'Create a `str` based on a datetime object'
tz_utc = timezone.utc
tz_local = datetime.now(tz_utc).astimezone().tzinfo
dt = None
format = None
def __init__(self, string, format):
assert format in datetime_formats
self.dt = datetime.strptime(string, datetime_formats[format]).replace(tzinfo=self.tz_utc)
self.format = format
def __new__(cls, string, format):
date = str.__new__(cls, string)
return date
def __getattr__(self, key):
return getattr(self.dt, key)
def __repr__(self):
return f'DateString({self}, format={self.format})'
@classmethod
def new_activitypub(cls, date):
'Create a new `DateString` for use in ActivityPub or Mastodon API messages'
return cls(date, 'activitypub')
@classmethod
def new_http(cls, date):
'Create a new `DateString` for use in HTTP messages'
return cls(date, 'http')
@classmethod
def from_datetime(cls, date, format):
'Create a new `DateString` from an existing `datetime.datetime` object'
assert format in datetime_formats
return cls(date.astimezone(cls.tz_utc).strftime(datetime_formats[format]), format)
@classmethod
def from_timestamp(cls, timestamp, format):
'Create a new `DateString` from a unix timestamp'
return cls.from_datetime(datetime.fromtimestamp(timestamp), format)
@classmethod
def now(cls, format):
'Create a new `DateString` from the current time'
return cls.from_datetime(datetime.now(cls.tz_utc), format)
@property
def http(self):
'Return a new `DateString` in HTTP format'
return DateString(self.dump_to_string('http'), 'http')
@property
def activitypub(self):
'Return a new `DateString` in ActivityPub format'
return DateString(self.dump_to_string('activitypub'), 'activitypub')
@property
def utc(self):
'Return a new `DateString` in UTC time'
return DateString.from_datetime(self.dt.astimezone(self.tz_utc), self.format)
@property
def local(self):
'Return a new `DateString` in local time'
return DateString.from_datetime(self.dt.astimezone(self.tz_local), self.format)
def dump_to_string(self, format=None):
'Return the date as a normal string in the specified format'
if not format: format = self.format
assert format in datetime_formats
return self.dt.strftime(datetime_formats[format])
class Url(str):
'`str` representation of a url parsed with urlparse'
proto:str = None
host:str = None
port:int = None
path:Path = None
query_string:str = None
query:DotDict = None
username:str = None
password:str = None
anchor:str = None
def __init__(self, url):
self._tldcache = None
self._tldcache_path = Path.cache.join('icann_public_suffix_list.txt')
if isinstance(url, str):
parsed = urlparse(url)
else:
parsed = url
#if not all([parsed.scheme, parsed.netloc]):
#raise TypeError('Not a valid url')
self._parsed = parsed
self.proto = parsed.scheme
self.port = protocol_ports.get(self.proto.upper()) if not parsed.port else None
self.path = parsed.path
self.query_string = parsed.query
self.query = DotDict.new_from_query_string(parsed.query)
self.username = parsed.username
self.password = parsed.password
self.anchor = parsed.fragment
try:
self.domain = parsed.netloc.split('@')[1]
except:
self.domain = parsed.netloc
@property
def top(self) -> str:
'Returns the domain without sub-domains'
if not self._tldcache:
if not self._tldcache_path.exists() or self._tldcache_path.mtime + timedelta(hours=24) < datetime.now():
resp = urlopen('https://publicsuffix.org/list/public_suffix_list.dat')
with self._tldcache_path.open('w') as fd:
for line in resp.read().decode('utf-8').splitlines():
if 'end icann domains' in line.lower():
break
if not line or line.startswith('//'):
continue
if line.startswith('*'):
line = line[2:]
fd.write(line + '\n')
with self._tldcache_path.open() as fd:
self._tldcache = set(fd.readlines())
@property
def path_full(self) -> str:
'Return the path with the query pairs and anchor'
string = self.path
if self.query_string:
string += f'?{query_string}'
if self.anchor:
string += f'#{self.anchor}'
return string
@property
def host(self) -> str:
'The domain and, if set, port as a string'
if self.port:
return f'{self.domain}:{self.port}'
return self.domain
@property
def without_query(self) -> str:
'Return the url without the query or anchor on the end'
return Url(self.split('?')[0])
@property
def dict(self) -> DotDict:
'Return the parsed url as a dict'
return DotDict(
proto = self.proto,
domain = self.domain,
port = self.port,
path = self.path,
query = self.query,
username = self.username,
password = self.password,
anchor = self.anchor
)
@classmethod
def new(cls, domain:str, path:Union[Path,str]='/', proto:str='https', port:int=None, query:dict=None, username:str=None, password:str=None, anchor:str=None):
'Create a new `Url` based on the url parts'
if port == protocol_ports.get(proto):
port = None
url = f'{proto}://'
if username and password:
url += f'{username}:{password}@'
elif username:
url += f'{username}@'
url += domain
if port:
url += f':{port}'
url += '/' + path if not path.startswith('/') else path
if query:
url += '?' + '&'.join(f'{quote(key)}={quote(value)}' for key,value in query.items())
if anchor:
url += f'#{anchor}'
return cls(url)
def join(self, new_path):
'Add to the path portion of the url'
data = self.dict
domain = data.pop('domain')
data['path'] = data['path'].join(new_path)
return self.new(domain, **data)
def replace_property(self, key, value):
data = self.dict
if key not in data.keys():
raise KeyError('Invalid Url property')
data[key] = value
return self.new(*data)
# compat
boolean = convert_to_boolean

70
izzylib/object_base.py Normal file
View file

@ -0,0 +1,70 @@
from .exceptions import ReadOnlyError
from .misc import class_name
class ObjectBase:
__props = {}
def __init__(self, readonly_props=[], **kwargs):
self.__props = dict()
self.__readonly = tuple(readonly_props)
for key, value in kwargs.items():
self.set_property(key, value)
def __repr__(self):
attrs = ', '.join(f'{key}={repr(value)}' for key, value in self.__properties)
return f'{class_name(self)}({attrs})'
def __getattr__(self, key):
if key not in self.__props:
return object.__getattribute__(self, key)
return self.__props[key]
def __setattr__(self, key, value):
if key not in self.__props:
return object.__setattr__(self, key, value)
if key in self.__readonly:
raise ReadOnlyError(key)
self.__props[key] = self._parse_property(key, value)
def __delattr__(self, key):
if key not in self.__props:
return object.__delattr__(self, key, value)
del self.__props[key]
#def __getitem__(self, key):
#return self.__getattr__(key)
#def __setitem__(self, key, value):
#self.__setattr__(key, value)
#def __delitem__(self, key):
#self.__delattr__(key)
def _parse_property(self, key, value):
return value
def get_property(self, key):
return self.__props[key]
def set_property(self, key, value):
self.__props[key] = self._parse_property(key, value)
def set_readonly(self, *props):
self.__readonly = tuple([*props, *self.__readonly])

View file

@ -1,9 +1,11 @@
import enum, json, os, shutil, sys
import json, os, shutil, sys
from datetime import datetime
from functools import cached_property
from pathlib import Path as PyPath
from .enums import PathType
linux_prefix = dict(
bin = '.local/bin',
@ -19,17 +21,17 @@ class TinyDotDict(dict):
__delattr__ = dict.__delitem__
class PathType(enum.Enum):
DIR = 'dir'
FILE = 'file'
LINK = 'link'
UNKNOWN = 'unknown'
class PathMeta(type):
@property
def home(cls):
return cls('~').expanduser()
def cache(cls):
try:
return cls(os.environ['XDG_CACHE_HOME'])
except KeyError:
return cls.home.join('.cache')
@property
@ -37,17 +39,17 @@ class PathMeta(type):
return cls(os.getcwd()).resolve()
@property
def home(cls):
return cls('~').expanduser()
@property
def script(cls):
try: path = sys.modules['__main__'].__file__
except: path = sys.argv[0]
return Path(path).parent
@property
def cache(cls):
return cls.home.join('.cache')
return cls(path).parent
def module(cls, module):

243
izzylib/url.py Normal file
View file

@ -0,0 +1,243 @@
import socket
from datetime import datetime, timedelta
from functools import cached_property
from urllib.parse import urlparse, quote
from urllib.request import urlopen
from . import izzylog
from .dotdict import DotDict
from .enums import Protocol
from .path import Path
# typing modules
from typing import Union
__all__ = ['Url']
__pdoc__ = {
'Url.anchor': 'Text after the "#" at the end of the url',
'Url.host': 'Hostname or IP address portion of the url',
'Url.password': 'Password portion of the url',
'Url.path': 'Path portion of a url',
'Url.port': 'Port number for the url. It one is not specified, it will be guessed based on the protocol',
'Url.proto': 'Protocol portion of the url',
'Url.query': 'Query key/value pairs as a dict',
'Url.query_string': 'Query key/value pairs as a string',
'Url.username': 'Username portion of the url'
}
class Url(str):
'`str` representation of a url parsed with urlparse'
proto:str = None
host:str = None
port:int = None
path:Path = None
query_string:str = None
query:DotDict = None
username:str = None
password:str = None
anchor:str = None
def __init__(self, url):
self._tldcache = None
self._tldcache_path = Path.cache.join('icann_public_suffix_list.txt')
if isinstance(url, str):
parsed = urlparse(url)
else:
parsed = url
#if not all([parsed.scheme, parsed.netloc]):
#raise TypeError('Not a valid url')
self._parsed = parsed
self.proto = parsed.scheme
if not parsed.port:
try:
self.port = Protocol[self.proto.upper()].value
except KeyError:
self.port = None
else:
self.port = None
self.path = parsed.path
self.query_string = parsed.query
self.query = DotDict.new_from_query_string(parsed.query)
self.username = parsed.username
self.password = parsed.password
self.anchor = parsed.fragment
try:
self.domain = parsed.netloc.split('@')[1]
except:
self.domain = parsed.netloc
@property
def dict(self) -> DotDict:
'Return the parsed url as a dict'
return DotDict(
proto = self.proto,
domain = self.domain,
port = self.port,
path = self.path,
query = self.query,
username = self.username,
password = self.password,
anchor = self.anchor
)
@property
def host(self) -> str:
'The domain and, if set, port as a string'
if self.port:
return f'{self.domain}:{self.port}'
return self.domain
@cached_property
def ipaddress(self) -> str:
'Resolve the domain to an IP address'
return socket.gethostbyname(self.domain)
@property
def path_full(self) -> str:
'Return the path with the query pairs and anchor'
string = self.path
if self.query_string:
string += f'?{query_string}'
if self.anchor:
string += f'#{self.anchor}'
return string
@property
def top(self) -> str:
'Returns the domain without sub-domains'
if not self.domain and not self.path:
raise ValueError('No domain')
domain = self.domain or self.path
if not self._tldcache:
if not self._tldcache_path.exists() or self._tldcache_path.mtime + timedelta(days=7) < datetime.now():
resp = urlopen('https://publicsuffix.org/list/public_suffix_list.dat')
with self._tldcache_path.open('w') as fd:
for line in resp.read().decode('utf-8').splitlines():
if 'end icann domains' in line.lower():
break
if not line or line.startswith('//'):
continue
if line.startswith('*'):
line = line[2:]
fd.write(line + '\n')
with self._tldcache_path.open() as fd:
self._tldcache = set(fd.read().splitlines())
domain_split = domain.split('.')
try:
if '.'.join(domain_split[-2:]) in self._tldcache:
return '.'.join(domain_split[-3:])
except IndexError:
pass
if '.'.join(domain_split[-1:]) in self._tldcache:
return '.'.join(domain_split[-2:])
raise ValueError('Cannot find TLD')
@property
def without_query(self) -> str:
'Return the url without the query or anchor on the end'
return Url(self.split('?')[0])
@classmethod
def new(cls, domain:str, path:Union[Path,str]='/', proto:Union[str,Protocol]='https', port:int=None, query:dict=None, username:str=None, password:str=None, anchor:str=None):
'Create a new `Url` based on the url parts'
if type(proto) == str:
try:
protocol = Protocol[proto.upper()]
except KeyError:
protocol = None
if protocol and port == protocol.value:
port = None
url = f'{proto}://'
if username and password:
url += f'{username}:{password}@'
elif username:
url += f'{username}@'
url += domain
if port:
url += f':{port}'
url += '/' + path if not path.startswith('/') else path
if query:
url += '?' + '&'.join(f'{quote(key)}={quote(value)}' for key,value in query.items())
if anchor:
url += f'#{anchor}'
return cls(url)
def copy(self):
return Url.new(**self.dict)
def join(self, new_path):
'Add to the path portion of the url'
data = self.dict
domain = data.pop('domain')
data['path'] = data['path'].join(new_path)
return self.new(domain, **data)
def replace_property(self, key, value):
data = self.dict
if key not in data.keys():
raise KeyError('Invalid Url property')
data[key] = value
return self.new(**data)