diff --git a/izzylib/__init__.py b/izzylib/__init__.py index 81a8c50..f7159de 100644 --- a/izzylib/__init__.py +++ b/izzylib/__init__.py @@ -15,20 +15,14 @@ from . import logging izzylog = logging.logger['IzzyLib'] izzylog.set_config('level', os.environ.get('IZZYLOG_LEVEL', 'INFO')) -from .dotdict import ( - DotDict, - DefaultDotDict, - LowerDotDict, - MultiDotDict, - JsonEncoder -) - from .path import * +from .datestring import * from .dotdict import * from .misc import * from .cache import * from .config import * from .connection import * +from .url import * from .http_client import ( HttpClient, HttpClientRequest, @@ -48,6 +42,7 @@ def register_global(obj, name=None): def add_builtins(*classes, **kwargs): new_builtins = [ BaseConfig, + DateString, DotDict, HttpClient, JsonConfig, diff --git a/izzylib/enums.py b/izzylib/enums.py new file mode 100644 index 0000000..9c9da65 --- /dev/null +++ b/izzylib/enums.py @@ -0,0 +1,47 @@ +from enum import Enum, IntEnum + + +__all__ = [ + 'DatetimeFormat', + 'LogLevel', + 'PathType', + 'Protocol' +] + + +class DatetimeFormat(Enum): + HTTP = '%a, %d %b %Y %H:%M:%S GMT' + ACTIVITYPUB = '%Y-%m-%dT%H:%M:%SZ' + MASTODON = '%Y-%m-%d' + + +class LogLevel(IntEnum): + CRITICAL = 60, + ERROR = 50 + WARNING = 40 + INFO = 30 + VERBOSE = 20 + DEBUG = 10 + MERP = 0 + + +class PathType(Enum): + DIR = 'dir' + FILE = 'file' + LINK = 'link' + UNKNOWN = 'unknown' + + +class Protocol(IntEnum): + FTP = 21 + SSH = 22 + SMTP = 25 + DNS = 53 + TFTP = 69 + HTTP = 80 + WS = 80 + HTTPS = 443 + WSS = 443 + FTPS = 990 + MYSQL = 3306 + PSQL = 5432 diff --git a/izzylib/exceptions.py b/izzylib/exceptions.py index decd3df..a01c09c 100644 --- a/izzylib/exceptions.py +++ b/izzylib/exceptions.py @@ -3,6 +3,10 @@ __all__ = [ ] +class ReadOnlyError(Exception): + 'Raise when a read-only property is attempted to be set' + + class DBusClientError(Exception): pass diff --git a/izzylib/http_client/__init__.py b/izzylib/http_client/__init__.py index 1fcc7cc..3632876 100644 --- a/izzylib/http_client/__init__.py +++ b/izzylib/http_client/__init__.py @@ -1,28 +1,3 @@ -content_types = { - 'json': 'application/json', - 'activity': 'application/activity+json', - 'css': 'text/css', - 'html': 'text/html', - 'js': 'application/javascript', - 'png': 'image/png', - 'jpeg': 'image/jpeg', - 'gif': 'image/gif' -} - - -http_methods = { - 'CONNECT', - 'DELETE', - 'GET', - 'HEAD', - 'OPTIONS', - 'PATCH', - 'POST', - 'PUT', - 'TRACE' -} - - from .client import HttpClient from .request import HttpClientRequest from .response import HttpClientResponse diff --git a/izzylib/http_client/client.py b/izzylib/http_client/client.py index 49be56a..0e125bb 100644 --- a/izzylib/http_client/client.py +++ b/izzylib/http_client/client.py @@ -1,210 +1,74 @@ -import functools, json, sys +import socket +import ssl - -from base64 import b64decode, b64encode -from datetime import datetime -from functools import cached_property, partial -from io import BytesIO -from ssl import SSLCertVerificationError -from urllib.error import HTTPError -from urllib.request import urlopen - -from . import http_methods -from .config import Config from .request import HttpClientRequest from .response import HttpClientResponse -from .. import izzylog, __version__ -from ..dotdict import DefaultDotDict, DotDict -from ..exceptions import HttpFileDownloadedError -from ..misc import Url -from ..path import Path +from .. import __version__ +from ..dotdict import DotDict +from ..http_utils import Headers +from ..misc import class_name +from ..object_base import ObjectBase -try: - from PIL import Image -except ImportError: - izzylog.verbose('Pillow module not found. Image downloading is disabled') - Image = False -from typing import * +class HttpClient(ObjectBase): + def __init__(self, appagent=None, headers={}, request_class=None, response_class=None): + super().__init__( + headers = ClientHeaders(headers), + timeout = 60, + request_class = request_class or HttpClientRequest, + response_class = response_class or HttpClientResponse, + readonly = ['headers'] + ) + if appagent: + self.set_appagent(appagent) -__pdoc__ = {f'Client.{method.lower()}': f'Send a {method.upper()} request' for method in http_methods} + def build_request(self, url, method='GET', body=b'', headers={}, cookies=[]): + return self.request_class(url, + method = method, + body = body, + headers = {**headers, **self.headers}, + cookies = cookies + ) -class HttpClient: - 'Basic HTTP client based on `urllib.request.urlopen`' + def request(self, url, method='GET', body=b'', headers={}, cookies=[]): + req = self.build_request(url, method, body, headers, cookies) + return self.send_request(req) - def __init__(self, **kwargs): - self._cfg = Config(**kwargs) - for method in http_methods: - self.__set_method(method) + def send_request(self, request): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + sock.settimeout(self.timeout) + sock.connect((request.url.ipaddress, request.url.port)) + if request.url.proto.lower() in ['https', 'wss']: + context = ssl.create_default_context() + sock = context.wrap_socket(sock, server_hostname=request.url.domain) - @property - def cfg(self): - 'The config options for the client as a dict' - return self._cfg + sock.sendall(request.compile()) + return self.response_class(self, sock, request.url) - def get(self, *args, **kwargs): - return self.request(*args, method='GET', **kwargs) + def set_appagent(self, appagent): + self.headers['User-Agent'] = f'IzzyLib/{__version__} ({appagent})' - def post(self, *args, **kwargs): - return self.request(*args, method='POST', **kwargs) +class ClientHeaders(DotDict): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) - def head(self, *args, **kwargs): - return self.request(*args, method='HEAD', **kwargs) + def __getitem__(self, key): + return super().__getitem__(key.replace('_', '-').title()) - def connect(self, *args, **kwargs): - return self.request(*args, method='CONNECT', **kwargs) + def __setitem__(self, key, value): + super().__setitem__(key.replace('_', '-').title(), value) - def delete(self, *args, **kwargs): - return self.request(*args, method='DELETE', **kwargs) - - def put(self, *args, **kwargs): - return self.request(*args, method='PUT', **kwargs) - - - def patch(self, *args, **kwargs): - return self.request(*args, method='PATCH', **kwargs) - - - def trace(self, *args, **kwargs): - return self.request(*args, method='TRACE', **kwargs) - - - def options(self, *args, **kwargs): - return self.request(*args, method='OPTIONS', **kwargs) - - - def __set_method(self, method): - setattr(self, method.lower(), partial(self.request, method=method.upper())) - - - def build_request(self, *args, **kwargs) -> HttpClientRequest: - 'Creates a new HttpClientRequest object. It can be used with `Client.send_request` See `izzylib.http_client.request.HttpClientRequest` for available arguments..' - - request = self.cfg.request_class(*args, **kwargs) - request._set_params(self.cfg) - - return request - - - def send_request(self, request: HttpClientRequest) -> HttpClientResponse: - 'Sends a request' - - if not isinstance(request, HttpClientRequest): - raise TypeError(f'Must be a izzylib.http_client.request.HttpClientRequest object (or subclassed), not {type(request).__name__}') - - if not request._params_set: - request._set_params(self.cfg) - - try: - response = urlopen(request) - - except HTTPError as e: - response = e.fp - - return self.cfg.response_class(response) - - - def request(self, *args, **kwargs) -> HttpClientResponse: - 'Create and send a request' - - request = self.build_request(*args, **kwargs) - return self.send_request(request) - - - def json_request(self, *args, headers:dict={}, activity:bool=False, **kwargs): - 'Create and send a request with the Accept header set for JSON. If `activity` is `True`, the header value will be `aplication/activity+json`, otherwise `application/json`' - - for key in list(headers.keys()): - if key.lower() == 'accept': - del headers[key] - - json_type = 'activity+json' if activity else 'json' - headers['Accept'] = f'application/{json_type}' - - return self.request(*args, headers=headers, **kwargs) - - - def download_file(self, url:str, filepath:Path, *args, overwrite:bool=False, create_dirs:bool=True, chunk_size:int=2048, **kwargs) -> Path: - 'Make a request and save it to the specified path' - - filepath = Path(filepath) - tmppath = filepath.parent.joinpath(filepath.name + '.dltemp') - - if not overwrite and filepath.exists: - raise FileExistsError(f'File already exists: {filepath}') - - if not filepath.parent.exists: - if not create_dirs: - raise FileNotFoundError(f'Path does not exist: {filepath.parent}') - - filepath.parent.mkdir() - - if tmpath.exists: - kwargs['headers']['range'] = f'bytes={tmppath.size}' - - resp = self.request(url, *args, stream=True, **kwargs) - - if not resp.headers.get('content-length'): - try: tmppath.delete() - except: pass - - raise FileExistsError('File already downloaded fully') - - if resp.status != 200: - raise HttpError(resp) - - with tmppath.open('ab') as fd: - for chunk in resp.chunks(chunk_size): - fd.write(chunk) - - shutil.move(tmppath, filepath) - - return filepath - - - def download_image(self, url:str, filepath:Path, *args, output_format:str='png', dimensions:List[int]=None, create_dirs:bool=True, **kwargs) -> BytesIO: - ''' - Download an image and save it in the specified format. Optionally resize the image with `dimensions` up to the original size. - - example: `dimensions = [50, 50]` - ''' - - if not Image: - raise ValueError('Pillow module is not installed') - - filepath = Path(filepath) - - if not filepath.parent.exists and not create_dirs: - raise FileNotFoundError(f'Path does not exist: {filepath.parent}') - - filepath.parent.mkdir() - - resp = self.request(url, *args, **kwargs) - - if resp.status != 200: - raise HttpError(resp) - - byte = BytesIO() - image = Image.open(BytesIO(resp.body)) - - if dimensions: - image.thumbnail(dimensions) - - image.save(byte, format=output_format.upper()) - - with filepath.open('wb') as fd: - fd.write(byte.getvalue()) - - return byte + def __delitem__(self, key): + super().__delitem__(key.replace('_', '-').title()) diff --git a/izzylib/http_client/request.py b/izzylib/http_client/request.py index d206860..299023d 100644 --- a/izzylib/http_client/request.py +++ b/izzylib/http_client/request.py @@ -1,51 +1,28 @@ -import json - -from datetime import datetime -from urllib.parse import urlencode -from urllib.request import Request as PyRequest - -from . import http_methods, content_types -from ..dotdict import CapitalDotDict -from ..misc import Url, convert_to_boolean, convert_to_bytes, protocol_ports - -try: import magic -except ImportError: magic = None +from ..datestring import DateString +from ..enums import DatetimeFormat +from ..http_utils import CookieItem, Headers, create_request_message, methods +from ..object_base import ObjectBase +from ..url import Url -class HttpClientRequest(PyRequest): - def __init__(self, url:str, body:bytes=None, headers:dict={}, cookies:dict={}, method:str='GET'): - 'An HTTP request. Headers can be accessed, set, or deleted as dict items.' +class HttpClientRequest(ObjectBase): + def __init__(self, url, method='GET', body=b'', headers={}, cookies={}): + assert method.upper() in methods - super().__init__(url) - - self._url = None - self._headers = CapitalDotDict(headers) - - self.body = body - self.url = url - self.method = method - - if self.url.proto.upper() not in protocol_ports.keys(): - raise ValueError(f'Invalid protocol in url: {self.url.proto}') - - self._params_set = False + super().__init__( + url = url, + method = method, + body = body, + headers = headers, + readonly_props = ['headers'] + ) - def __getattr__(self, key): - if key == 'origin_req_host': - return object.__getattribute__(self, '_host') - - elif key == 'full_url': - return object.__getattribute__(self, '_url') - - return object.__getattribute__(self, key) + self.headers['Host'] = self.url.domain - def __setattr__(self, key, value): - if key in ['headers', 'host']: - return - - object.__setattr__(self, key, value) + def __bytes__(self): + return self.compile() def __getitem__(self, key): @@ -53,154 +30,71 @@ class HttpClientRequest(PyRequest): def __setitem__(self, key, value): - self.headers[key] = str(value) + self.headers[key] = value def __delitem__(self, key): del self.headers[key] - def _set_params(self, config): - self.headers.update(config.headers) + def _parse_property(self, key, value): + if key == 'url': + if not isinstance(value, Url): + value = Url(value) - if config.proxy_host: - self.set_proxy(f'{config.proxy_host}:{config.proxy_port}', config.proxy_type) + value = value.replace_property('anchor', None) - self._params_set = True + elif key == 'body': + if not isinstance(value, bytes): + value = convert_to_bytes(value) + + elif key == 'headers': + value = Headers(value) + + return value @property - def body(self): - 'Data to be sent along with the request' - return self._data + def cookies(self): + for cookie in self.headers.get('cookie', []): + yield cookie - @body.setter - def body(self, data): - self._data = convert_to_bytes(data) - #self.set_header('Content-Length', len(self._data)) + def compile(self): + if 'Content-Length' not in self.headers and self.body: + self.headers['Content-Length'] = len(self.body) + + if 'Host' not in self.headers: + self.headers['Host'] = DateString.now(DatetimeFormat.HTTP) + + return create_request_message(self.method, self.url.path, body=self.body, headers=self.headers) - @property - def data(self): - 'Alias for `HttpClientRequest.body`' - return self._data + def get_cookie(self, key): + return self.headers['Cookies'] - @data.setter - def data(self, data): - self.body = data + def get_header(self, key, default=None): + try: + return self.headers[key] + except KeyError: + self.headers[key] = default - @property - def headers(self): - 'Headers to send with this request.' - return self._headers - - - @headers.setter - def headers(self, data:dict): - self._headers.update(dict) - - - @property - def host(self): - 'Domain the request will be sent to.' - return self.url.host - - - @property - def method(self): - 'Method used for this request.' - return self._method - - - @method.setter - def method(self, value): - if value.upper() not in http_methods: - raise ValueError(f'Invalid HTTP method: {method}') - - self._method = value.upper() - - - @property - def port(self): - 'Port the request will use when contacting the server.' - return self.url.port - - - @property - def secure(self): - 'Whether or not the request will use SSL.' - return self.url.proto in ['https', 'wss', 'ftps'] - - - @property - def url(self) -> Url: - 'The URL of the request' - return Url(self.full_url) - - - @url.setter - def url(self, url): - self.full_url = Url(url) - - - def set_chunked(self, value:True): - 'Set the `Encoding` header to `chunked` if `value` is `True`' - - if convert_to_boolean(value): - self.set_header('Encoding', 'chunked') - self.unset_header('Content-Length') - - else: - self.unset_header('Encoding') - self.set_header('Content-Length', len(self.body)) - - - def get_header(self, key): - 'Get a header if it exists' - return self.headers.get(key) + return self.headers[key] def set_header(self, key, value): - 'Set a header to the specified value' self.headers[key] = value - def unset_header(self, key): - 'Remove a header' - self.headers.pop(key, None) + def set_cookie(self, key, value, **kwargs): + for cookie in self.cookies: + if key == cookie.key: + cookie.value = value + cookie.update(kwargs) + return cookie - - def update_headers(self, data={}, **kwargs): - 'Update headers' - kwargs.update(data) - self.headers.update(kwargs) - - - def set_type(self, content_type): - 'Set the `Content-Type` header. Either a mimetype or a `content_types` key can be used.' - self.set_header('Content-Type', content_types.get(content_type, content_type)) - - - def set_type_from_body(self) -> str: - 'Set the `Content-Type` header based on `HttpClientRequest.body` and return the mimetype.' - - if not self.body: - return - - mimetype = magic.from_buffer(self.body, mime=True) - self.set_type(mimetype) - return mimetype - - - def sign_headers(self, privkey:str, keyid:str): - '[Not implemented yet] Create an ActivityPub signature of the headers via a private RSA key. Will create `Host` and `Date` headers if they do not exist.' - - raise ImportError(f'Not implemented yet') - - if not sign_request: - raise ImportError(f'Could not import HTTP signatures. Header signing disabled') - - return sign_request(self, privkey, keyid) + item = CookieItem(key, value, **kwargs) + self.cookies.append(item) + return item diff --git a/izzylib/http_client/response.py b/izzylib/http_client/response.py index a892f54..20bf2a4 100644 --- a/izzylib/http_client/response.py +++ b/izzylib/http_client/response.py @@ -1,100 +1,178 @@ -from io import BytesIO +import traceback from ..dotdict import DotDict -from ..http_utils import Cookies, Headers -from ..misc import Url +from ..http_utils import Headers, parse_headers +from ..misc import class_name +from ..object_base import ObjectBase +from ..path import Path -class HttpClientResponse: - headers = None - cookies = None +class HttpClientResponse(ObjectBase): + _body = b'' - def __init__(self, response): - self.__response = response - self.__body = b'' - self.__url = Url(response.url) + def __init__(self, client, sock, url): + super().__init__( + client = client, + sock = sock, + url = url, + headers = Headers(request=False), + version = None, + status = None, + message = None, + raw_headers = b'', + readonly_props = ['client', 'socket'] + ) - headers = [] - cookies = [] + self._read_headers() - for key, value in response.getheaders(): - if key.lower in ['set-cookie']: - cookies.append(value) + + def __enter__(self): + return self + + + def __exit__(self, exc_type, exc_value, exc_traceback): + if exc_traceback: + traceback.print_tb(exc_traceback) + + try: + self.sock.close() + + except Exception as e: + print(f'{class_name(e)}: {e}') + pass + + + def _read_headers(self): + #for line in self.read_lines(): + #self.raw_headers += line + + data = b'' + + while True: + data += self.read(2048) + + if not data or b'\r\n\r\n' in data: + break + + try: + self.raw_headers, self._body = data.split(b'\r\n\r\n', 1) + except ValueError: + self.raw_headers = data + + parsed = parse_headers(self.raw_headers, request=False) + + for key, value in parsed.items(): + if key == 'headers': + for k, v in value: + self.headers.append(k, v) else: - headers.append((key, value)) + self.set_property(key, value) - self.headers = Headers(headers, readonly=True) - self.cookies = Cookies(cookies, readonly=True) + self.set_readonly(key) + + self.headers._readonly = True - def __getitem__(self, key): - return self.get_header(key.capitalize()) + def _read_body(self): + if self.length <= len(self._body): + return + + self._body += self.read(self.length - len(self._body)) + return self._body + + + @property + def length(self): + try: + return self.headers['Content-Length'].one() + + except: + return 0 + + + @property + def mimetype(self): + try: + return self.headers['Content-Type'].one() + + except: + return @property def body(self): - if not self.__body: - self.read() + if self.length: + self._read_body() - return self.__body + return self._body + #@property + #def body(self): + #if not self._body: + #self._read_body_lines() - # todo: fetch encoding from headers if possible - @property - def encoding(self): - return 'utf-8' - - - @property - def status(self): - return self.__response.status - - - @property - def url(self): - return self.__url - - - @property - def version(self): - vers = self.__response.version - - if vers == 10: - return 1.0 - - elif vers == 11: - return 1.1 - - elif vers == 20: - return 2.0 - - print('Warning! Invalid HTTP version:', type(vers).__name__, vers) - return vers - - - @property - def bytes(self): - return self.__body + #return self._body @property def text(self): - return self.body.decode(self.encoding) + return self.body.decode('utf-8') + + + @property + def form(self): + return DotDict.new_from_query_string(self.text) @property def json(self): - return DotDict(self.body) + return DotDict(self.text) - def get_header(self, name): - return self.headers.get(name.lower()) + def cookies(self): + return self.headers.cookies() - def read(self, amount=None): - data = self.__response.read(amount) - self.__body += data + def get_cookie(self, key): + return self.headers.get_cookie(key) - return data + + def query(self): + return self.url.query + + + def read(self, length=8192): + return self.sock.recv(length) + + + #def read_lines(self): + #while True: + #line = self.fd.readline() + + #if not line or line == b'\r\n': + #break + + #yield line + + + def save_to_file(self, path=None, filename=None, chunk=8192): + path = Path(path) if path else Path.cwd + data_left = self.length + + if path.is_dir: + path = path.join(filename or self.url.path.name) + + with path.open('wb') as fd: + fd.write(self._body) + + while True: + if chunk > data_left: + fd.write(self.read(data_left)) + break + + fd.write(self.read(chunk)) + data_left -= chunk + + return path diff --git a/izzylib/http_urllib_client/__init__.py b/izzylib/http_urllib_client/__init__.py index 07fee70..6a759c9 100644 --- a/izzylib/http_urllib_client/__init__.py +++ b/izzylib/http_urllib_client/__init__.py @@ -1,12 +1,36 @@ -from . import error -from .client import HttpUrllibClient, set_default_client -from .request import HttpUrllibRequest -from .response import HttpUrllibResponse +import mimetypes -__all__ = [ - 'HttpUrllibClient', - 'HttpUrllibRequest', - 'HttpUrllibResponse', - 'set_default_client', - 'error' -] +mimetypes.add_type('application/activity+json', '.activity') + + +content_types = { + 'json': 'application/json', + 'activity': 'application/activity+json', + 'css': 'text/css', + 'html': 'text/html', + 'js': 'application/javascript', + 'png': 'image/png', + 'jpeg': 'image/jpeg', + 'gif': 'image/gif' +} + + +http_methods = { + 'CONNECT', + 'DELETE', + 'GET', + 'HEAD', + 'OPTIONS', + 'PATCH', + 'POST', + 'PUT', + 'TRACE' +} + + +valid_protocols = ['http', 'https', 'ws', 'wss', 'ftp', 'ftps'] + + +from .client import HttpUrllibClient +from .request import HttpUrllibClientRequest +from .response import HttpUrllibClientResponse diff --git a/izzylib/http_urllib_client/client.py b/izzylib/http_urllib_client/client.py index d1d4193..5628c13 100644 --- a/izzylib/http_urllib_client/client.py +++ b/izzylib/http_urllib_client/client.py @@ -1,218 +1,210 @@ -import json, sys, urllib3 +import functools, json, sys -from PIL import Image -from base64 import b64encode +from base64 import b64decode, b64encode from datetime import datetime -from functools import cached_property, lru_cache +from functools import cached_property, partial from io import BytesIO from ssl import SSLCertVerificationError +from urllib.error import HTTPError +from urllib.request import urlopen -from .request import HttpUrllibRequest -from .response import HttpUrllibResponse +from . import http_methods +from .config import Config +from .request import HttpUrllibClientRequest +from .response import HttpUrllibClientResponse -from .. import __version__ -from ..dotdict import DefaultDotDict, DotDict, LowerDotDict +from .. import izzylog, __version__ +from ..dotdict import DefaultDotDict, DotDict from ..exceptions import HttpFileDownloadedError -from ..misc import Url from ..path import Path +from ..url import Url + +try: + from PIL import Image +except ImportError: + izzylog.verbose('Pillow module not found. Image downloading is disabled') + Image = False + +from typing import * -Client = None - -proxy_ports = { - 'http': 80, - 'https': 443 -} +__pdoc__ = {f'Client.{method.lower()}': f'Send a {method.upper()} request' for method in http_methods} class HttpUrllibClient: - def __init__(self, headers={}, useragent=None, appagent=None, proxy_type='https', proxy_host=None, proxy_port=None, num_pools=20): - if not useragent: - useragent = f'IzzyLib/{__version__}' + 'Basic HTTP client based on `urllib.request.urlopen`' - self.headers = {k:v.lower() for k,v in headers.items()} - self.agent = f'{useragent} ({appagent})' if appagent else useragent - if proxy_type not in ['http', 'https']: - raise ValueError(f'Not a valid proxy type: {proxy_type}') + def __init__(self, **kwargs): + self._cfg = Config(**kwargs) - if proxy_host: - proxy = f'{proxy_type}://{proxy_host}:{proxy_ports[proxy_type] if not proxy_port else proxy_port}' - self.pool = urllib3.ProxyManager(proxy, num_pools=num_pools) - - else: - self.pool = urllib3.PoolManager(num_pools=num_pools) + for method in http_methods: + self.__set_method(method) @property - def agent(self): - return self.headers['user-agent'] + def cfg(self): + 'The config options for the client as a dict' + return self._cfg - @agent.setter - def agent(self, value): - self.headers['user-agent'] = value + def get(self, *args, **kwargs): + return self.request(*args, method='GET', **kwargs) - def set_global(self): - set_default_client(self) + def post(self, *args, **kwargs): + return self.request(*args, method='POST', **kwargs) - def build_request(self, *args, **kwargs): - return HttpUrllibRequest(*args, **kwargs) + def head(self, *args, **kwargs): + return self.request(*args, method='HEAD', **kwargs) - def handle_request(self, request): - request.headers.update(self.headers) - response = self.pool.urlopen(*request._args, **request._kwargs) - return HttpUrllibResponse(response) + def connect(self, *args, **kwargs): + return self.request(*args, method='CONNECT', **kwargs) - def request(self, *args, **kwargs): - return self.handle_request(self.build_request(*args, **kwargs)) + def delete(self, *args, **kwargs): + return self.request(*args, method='DELETE', **kwargs) - def signed_request(self, privkey, keyid, *args, **kwargs): - return self.request(*args, privkey=privkey, keyid=keyid, **kwargs) + def put(self, *args, **kwargs): + return self.request(*args, method='PUT', **kwargs) - def download(self, url, filepath, *args, filename=None, **kwargs): - resp = self.request(url, *args, **kwargs) - - if resp.status != 200: - raise HttpFileDownloadedError(f'Failed to download {url}: Status: {resp.status}, Body: {resp.body}') - - return resp.save(filepath) + def patch(self, *args, **kwargs): + return self.request(*args, method='PATCH', **kwargs) - def image(self, url, filepath, *args, filename=None, ext='png', dimensions=(50, 50), **kwargs): - if not Image: - izzylog.error('Pillow module is not installed') - return - - resp = self.request(url, *args, **kwargs) - - if resp.status != 200: - izzylog.error(f'Failed to download {url}:', resp.status, resp.body) - return False - - if not filename: - filename = Path(url).stem() - - path = Path(filepath) - - if not path.exists: - izzylog.error('Path does not exist:', path) - return False - - byte = BytesIO() - image = Image.open(BytesIO(resp.body)) - image.thumbnail(dimensions) - image.save(byte, format=ext.upper()) - - with path.join(filename).open('wb') as fd: - fd.write(byte.getvalue()) + def trace(self, *args, **kwargs): + return self.request(*args, method='TRACE', **kwargs) - def json(self, *args, headers={}, activity=False, **kwargs): + def options(self, *args, **kwargs): + return self.request(*args, method='OPTIONS', **kwargs) + + + def __set_method(self, method): + setattr(self, method.lower(), partial(self.request, method=method.upper())) + + + def build_request(self, *args, **kwargs) -> HttpUrllibClientRequest: + 'Creates a new `HttpUrllibClientRequest` object. It can be used with `Client.send_request` See `HttpUrllibClientRequest` for available arguments..' + + request = self.cfg.request_class(*args, **kwargs) + request._set_params(self.cfg) + + return request + + + def send_request(self, request: HttpUrllibClientRequest) -> HttpUrllibClientResponse: + 'Sends a request' + + if not isinstance(request, HttpUrllibClientRequest): + raise TypeError(f'Must be a HttpUrllibClientRequest object (or subclassed), not {type(request).__name__}') + + if not request._params_set: + request._set_params(self.cfg) + + try: + response = urlopen(request) + + except HTTPError as e: + response = e.fp + + return self.cfg.response_class(response) + + + def request(self, *args, **kwargs) -> HttpUrllibClientResponse: + 'Create and send a request' + + request = self.build_request(*args, **kwargs) + return self.send_request(request) + + + def json_request(self, *args, headers:dict={}, activity:bool=False, **kwargs): + 'Create and send a request with the Accept header set for JSON. If `activity` is `True`, the header value will be `aplication/activity+json`, otherwise `application/json`' + + for key in list(headers.keys()): + if key.lower() == 'accept': + del headers[key] + json_type = 'activity+json' if activity else 'json' - headers.update({ - 'accept': f'application/{json_type}' - }) + headers['Accept'] = f'application/{json_type}' return self.request(*args, headers=headers, **kwargs) -def set_default_client(client=None): - global Client - Client = client or HttpClient() + def download_file(self, url:str, filepath:Path, *args, overwrite:bool=False, create_dirs:bool=True, chunk_size:int=2048, **kwargs) -> Path: + 'Make a request and save it to the specified path' + + filepath = Path(filepath) + tmppath = filepath.parent.joinpath(filepath.name + '.dltemp') + + if not overwrite and filepath.exists: + raise FileExistsError(f'File already exists: {filepath}') + + if not filepath.parent.exists: + if not create_dirs: + raise FileNotFoundError(f'Path does not exist: {filepath.parent}') + + filepath.parent.mkdir() + + if tmpath.exists: + kwargs['headers']['range'] = f'bytes={tmppath.size}' + + resp = self.request(url, *args, stream=True, **kwargs) + + if not resp.headers.get('content-length'): + try: tmppath.delete() + except: pass + + raise FileExistsError('File already downloaded fully') + + if resp.status != 200: + raise HttpError(resp) + + with tmppath.open('ab') as fd: + for chunk in resp.chunks(chunk_size): + fd.write(chunk) + + shutil.move(tmppath, filepath) + + return filepath -@lru_cache(maxsize=512) -def fetch_actor(url): - if not Client: - raise ValueError('Please set global client with "HttpUrllibClient.set_global()"') + def download_image(self, url:str, filepath:Path, *args, output_format:str='png', dimensions:List[int]=None, create_dirs:bool=True, **kwargs) -> BytesIO: + ''' + Download an image and save it in the specified format. Optionally resize the image with `dimensions` up to the original size. - url = url.split('#')[0] - headers = {'Accept': 'application/activity+json'} - resp = Client.request(url, headers=headers) + example: `dimensions = [50, 50]` + ''' - try: - actor = resp.json + if not Image: + raise ValueError('Pillow module is not installed') - except json.decoder.JSONDecodeError: - return + filepath = Path(filepath) - except Exception as e: - izzylog.debug(f'HTTP {resp.status}: {resp.body}') - raise e from None + if not filepath.parent.exists and not create_dirs: + raise FileNotFoundError(f'Path does not exist: {filepath.parent}') - actor.web_domain = Url(url).host - actor.shared_inbox = actor.inbox - actor.pubkey = None - actor.handle = actor.preferredUsername + filepath.parent.mkdir() - if actor.get('endpoints'): - actor.shared_inbox = actor.endpoints.get('sharedInbox', actor.inbox) + resp = self.request(url, *args, **kwargs) - if actor.get('publicKey'): - actor.pubkey = actor.publicKey.get('publicKeyPem') + if resp.status != 200: + raise HttpError(resp) - return actor + byte = BytesIO() + image = Image.open(BytesIO(resp.body)) + if dimensions: + image.thumbnail(dimensions) -@lru_cache(maxsize=512) -def fetch_instance(domain): - if not Client: - raise ValueError('Please set global client with "HttpUrllibClient.set_global()"') + image.save(byte, format=output_format.upper()) - headers = {'Accept': 'application/json'} - resp = Client.request(f'https://{domain}/api/v1/instance', headers=headers) + with filepath.open('wb') as fd: + fd.write(byte.getvalue()) - try: - return resp.json - - except json.decoder.JSONDecodeError: - return - - except Exception as e: - izzylog.debug(f'HTTP {resp.status}: {resp.body}') - raise e from None - - -@lru_cache(maxsize=512) -def fetch_nodeinfo(domain): - if not Client: - raise ValueError('Please set global client with HttpUrllibClient.set_global()') - - webfinger = Client.request(f'https://{domain}/.well-known/nodeinfo') - webfinger_data = DotDict(webfinger.body) - - for link in webfinger.json.links: - if link['rel'] == 'http://nodeinfo.diaspora.software/ns/schema/2.0': - nodeinfo_url = link['href'] - break - - nodeinfo = Client.request(nodeinfo_url) - return nodeinfo.json - - -@lru_cache(maxsize=512) -def fetch_webfinger_account(handle, domain): - if not Client: - raise ValueError('Please set global client with HttpUrllibClient.set_global()') - - data = DefaultDotDict() - webfinger = Client.request(f'https://{domain}/.well-known/webfinger?resource=acct:{handle}@{domain}') - - if not webfinger.body: - raise ValueError('Webfinger body empty') - - data.handle, data.domain = webfinger.json.subject.replace('acct:', '').split('@') - - for link in webfinger.json.links: - if link['rel'] == 'self' and link['type'] == 'application/activity+json': - data.actor = link['href'] - - return data + return byte diff --git a/izzylib/http_client/config.py b/izzylib/http_urllib_client/config.py similarity index 93% rename from izzylib/http_client/config.py rename to izzylib/http_urllib_client/config.py index 7ae555e..e4be0e0 100644 --- a/izzylib/http_client/config.py +++ b/izzylib/http_urllib_client/config.py @@ -1,5 +1,5 @@ -from .request import HttpClientRequest -from .response import HttpClientResponse +from .request import HttpUrllibClientRequest +from .response import HttpUrllibClientResponse from .. import __version__ from ..config import BaseConfig @@ -44,8 +44,8 @@ class Config(BaseConfig): headers = CapitalDotDict({'User-Agent': f'IzzyLib/{__version__}'}), appagent = None, timeout = 60, - request_class = HttpClientRequest, - response_class = HttpClientResponse, + request_class = HttpUrllibClientRequest, + response_class = HttpUrllibClientResponse, proxy_type = 'https', proxy_host = None, proxy_port = None diff --git a/izzylib/http_urllib_client/error.py b/izzylib/http_urllib_client/error.py deleted file mode 100644 index 958074d..0000000 --- a/izzylib/http_urllib_client/error.py +++ /dev/null @@ -1,36 +0,0 @@ -from urllib3.exceptions import ( - HTTPError, - PoolError, - RequestError, - SSLError, - ProxyError, - DecodeError, - ProtocolError, - MaxRetryError, - HostChangedError, - TimeoutStateError, - TimeoutError, - ReadTimeoutError, - NewConnectionError, - EmptyPoolError, - ClosedPoolError, - LocationValueError, - LocationParseError, - URLSchemeUnknown, - ResponseError, - SecurityWarning, - InsecureRequestWarning, - SystemTimeWarning, - InsecurePlatformWarning, - SNIMissingWarning, - DependencyWarning, - ResponseNotChunked, - BodyNotHttplibCompatible, - IncompleteRead, - InvalidChunkLength, - InvalidHeader, - ProxySchemeUnknown, - ProxySchemeUnsupported, - HeaderParsingError, - UnrewindableBodyError -) diff --git a/izzylib/http_urllib_client/request.py b/izzylib/http_urllib_client/request.py index 9f7688e..8939a92 100644 --- a/izzylib/http_urllib_client/request.py +++ b/izzylib/http_urllib_client/request.py @@ -1,92 +1,209 @@ import json from datetime import datetime +from mimetypes import types_map as content_types +from urllib.parse import urlencode +from urllib.request import Request as PyRequest -from ..dotdict import DotDict, LowerDotDict -from ..misc import Url, boolean +from . import http_methods, valid_protocols +from ..dotdict import CapitalDotDict +from ..enums import Protocol +from ..misc import convert_to_boolean, convert_to_bytes +from ..url import Url -try: - from ..http_signatures import sign_request - -except ModuleNotFoundError: - sign_request = None +try: import magic +except ImportError: magic = None -methods = ['delete', 'get', 'head', 'options', 'patch', 'post', 'put'] +class HttpUrllibClientRequest(PyRequest): + def __init__(self, url:str, body:bytes=None, headers:dict={}, cookies:dict={}, method:str='GET'): + 'An HTTP request. Headers can be accessed, set, or deleted as dict items.' + super().__init__(url) -class HttpUrllibRequest: - def __init__(self, url, **kwargs): - self._body = b'' + self._url = None + self._headers = CapitalDotDict(headers) - method = kwargs.get('method', 'get').lower() - - if method not in methods: - raise ValueError(f'Invalid method: {method}') - - self.url = Url(url) - self.body = kwargs.get('body') + self.body = body + self.url = url self.method = method - self.headers = LowerDotDict(kwargs.get('headers', {})) - self.redirect = boolean(kwargs.get('redirect', True)) - self.retries = int(kwargs.get('retries', 10)) - self.timeout = int(kwargs.get('timeout', 5)) - privkey = kwargs.get('privkey') - keyid = kwargs.get('keyid') + if self.url.proto.lower() not in valid_protocols: + raise ValueError(f'Invalid protocol in url: {self.url.proto}') - if privkey and keyid: - self.sign(privkey, keyid) + self._params_set = False - @property - def _args(self): - return [self.method.upper(), self.url] + def __getattr__(self, key): + if key == 'origin_req_host': + return object.__getattribute__(self, '_host') + + elif key == 'full_url': + return object.__getattribute__(self, '_url') + + return object.__getattribute__(self, key) - @property - def _kwargs(self): - return { - 'body': self.body, - 'headers': self.headers, - 'redirect': self.redirect, - 'retries': self.retries, - 'timeout': self.timeout - } + def __setattr__(self, key, value): + if key in ['headers', 'host']: + return + + object.__setattr__(self, key, value) + + + def __getitem__(self, key): + return self.headers[key] + + + def __setitem__(self, key, value): + self.headers[key] = str(value) + + + def __delitem__(self, key): + del self.headers[key] + + + def _set_params(self, config): + self.headers.update(config.headers) + + if config.proxy_host: + self.set_proxy(f'{config.proxy_host}:{config.proxy_port}', config.proxy_type) + + self._params_set = True @property def body(self): - return self._body + 'Data to be sent along with the request' + return self._data @body.setter def body(self, data): - if isinstance(data, dict): - data = DotDict(data).to_json() + self._data = convert_to_bytes(data) + #self.set_header('Content-Length', len(self._data)) - elif any(map(isinstance, [data], [list, tuple])): - data = json.dumps(data) - if data == None: - data = b'' + @property + def data(self): + 'Alias for `HttpClientRequest.body`' + return self._data - elif not isinstance(data, bytes): - data = bytes(data, 'utf-8') - self._body = data + @data.setter + def data(self, data): + self.body = data + + + @property + def headers(self): + 'Headers to send with this request.' + return self._headers + + + @headers.setter + def headers(self, data:dict): + self._headers.update(dict) + + + @property + def host(self): + 'Domain the request will be sent to.' + return self.url.host + + + @property + def method(self): + 'Method used for this request.' + return self._method + + + @method.setter + def method(self, value): + if value.upper() not in http_methods: + raise ValueError(f'Invalid HTTP method: {method}') + + self._method = value.upper() + + + @property + def port(self): + 'Port the request will use when contacting the server.' + return self.url.port + + + @property + def secure(self): + 'Whether or not the request will use SSL.' + return self.url.proto in ['https', 'wss', 'ftps'] + + + @property + def url(self) -> Url: + 'The URL of the request' + return Url(self.full_url) + + + @url.setter + def url(self, url): + self.full_url = Url(url) + + + def set_chunked(self, value:True): + 'Set the `Encoding` header to `chunked` if `value` is `True`' + + if convert_to_boolean(value): + self.set_header('Encoding', 'chunked') + self.unset_header('Content-Length') + + else: + self.unset_header('Encoding') + self.set_header('Content-Length', len(self.body)) + + + def get_header(self, key): + 'Get a header if it exists' + return self.headers.get(key) def set_header(self, key, value): + 'Set a header to the specified value' self.headers[key] = value def unset_header(self, key): + 'Remove a header' self.headers.pop(key, None) - def sign(self, privkey, keyid): - if not sign_request: - raise AttributeError('PyCryptodome not installed. Request signing is disabled.') + def update_headers(self, data={}, **kwargs): + 'Update headers' + kwargs.update(data) + self.headers.update(kwargs) - sign_request(self, privkey, keyid) + + def set_type(self, content_type): + 'Set the `Content-Type` header. Either a mimetype or file extension can be used.' + self.set_header('Content-Type', content_types.get(f'.{content_type}', content_type)) + + + def set_type_from_body(self) -> str: + 'Set the `Content-Type` header based on `HttpClientRequest.body` and return the mimetype.' + + if not self.body: + return + + mimetype = magic.from_buffer(self.body, mime=True) + self.set_type(mimetype) + return mimetype + + + def sign_headers(self, privkey:str, keyid:str): + '[Not implemented yet] Create an ActivityPub signature of the headers via a private RSA key. Will create `Host` and `Date` headers if they do not exist.' + + raise ImportError(f'Not implemented yet') + + if not sign_request: + raise ImportError(f'Could not import HTTP signatures. Header signing disabled') + + return sign_request(self, privkey, keyid) diff --git a/izzylib/http_urllib_client/response.py b/izzylib/http_urllib_client/response.py index a05ab14..3c8f0a3 100644 --- a/izzylib/http_urllib_client/response.py +++ b/izzylib/http_urllib_client/response.py @@ -1,62 +1,82 @@ -import json - from io import BytesIO -from izzylib import DefaultDotDict, DotDict, Path, Url + +from ..dotdict import DotDict +from ..http_utils import Headers +from ..url import Url + + +class HttpUrllibClientResponse: + headers = None + cookies = None -class HttpUrllibResponse: def __init__(self, response): - self.response = response + self.__response = response + self.__body = b'' + self.__url = Url(response.url) - self._dict = None + headers = [] + cookies = [] + + for key, value in response.getheaders(): + if key.lower in ['set-cookie']: + cookies.append(value) + + else: + headers.append((key, value)) + + self.headers = Headers(headers, readonly=True) + #self.cookies = Cookies(cookies, readonly=True) def __getitem__(self, key): - return self.dict[key] - - - def __setitem__(self, key, value): - self.dict[key] = value - - - @property - def encoding(self): - for line in self.headers.get('content-type', '').split(';'): - try: - k,v = line.split('=') - - if k.lower == 'charset': - return v.lower() - - except: - pass - - return 'utf-8' - - - @property - def headers(self): - return self.response.headers - - - @property - def status(self): - return self.response.status - - - @property - def url(self): - return Url(self.response.geturl()) + return self.get_header(key.capitalize()) @property def body(self): - data = self.response.read(cache_content=True) + if not self.__body: + self.read() - if not data: - data = self.response.data + return self.__body - return data + + # todo: fetch encoding from headers if possible + @property + def encoding(self): + return 'utf-8' + + + @property + def status(self): + return self.__response.status + + + @property + def url(self): + return self.__url + + + @property + def version(self): + vers = self.__response.version + + if vers == 10: + return 1.0 + + elif vers == 11: + return 1.1 + + elif vers == 20: + return 2.0 + + print('Warning! Invalid HTTP version:', type(vers).__name__, vers) + return vers + + + @property + def bytes(self): + return self.__body @property @@ -65,33 +85,16 @@ class HttpUrllibResponse: @property - def dict(self): - if not self._dict: - self._dict = DotDict(self.text) - - return self._dict + def json(self): + return DotDict(self.body) - def json_pretty(self, indent=4): - return self.dict.to_json(indent) + def get_header(self, name): + return self.headers.get(name.lower()) - def chunks(self, size=1024): - return self.response.stream(amt=size) + def read(self, amount=None): + data = self.__response.read(amount) + self.__body += data - - def save(self, path, overwrite=True, create_parents=True): - path = Path(path) - - if not path.parent.exists: - if not create_parents: - raise ValueError(f'Path does not exist: {path.parent}') - - path.parent.mkdir() - - if overwrite and path.exists: - path.delete() - - with path.open('wb') as fd: - for chunk in self.chunks(): - fd.write(chunk) + return data diff --git a/izzylib/http_utils.py b/izzylib/http_utils.py index aa64ab1..9f2a27d 100644 --- a/izzylib/http_utils.py +++ b/izzylib/http_utils.py @@ -1,168 +1,168 @@ +import asyncio, json + +from datetime import datetime, timezone +from io import BytesIO + +from .datestring import DateString from .dotdict import DotDict -from .misc import DateString +from .misc import class_name +from .url import Url + +from typing import Dict -cookie_fields = { - 'expires': 'Expires', - 'max_age': 'Max-Age', - 'domain': 'Domain', - 'path': 'Path', - 'secure': 'Secure', - 'httponly': 'HttpOnly', - 'samesite': 'SameSite' -} - -samesite_values = { - 'String', - 'Lax', - 'None' -} - - -def parse_header_key_name(key): - return key.replace('_', '-').title() - - -def parse_cookie_key_name(key): - return cookie_fields.get(key.replace('_', '-').lower(), key) - - if key not in cookie_fields.values(): - raise KeyError(f'Invalid cookie key: {key}') - - return key +methods = [ + 'CONNECT', + 'DELETE', + 'GET', + 'HEAD', + 'OPTIONS', + 'PATCH', + 'POST', + 'PUT', + 'TRACE' +] class Headers(DotDict): - __readonly = False + _readonly = False + _cookie_header = None - def __init__(self, data=(), readonly=False): - super().__init__() - - for key, value in data: - self[key] = value - - self.__readonly = readonly + def __init__(self, *args, request=True, readonly=False, **kwargs): + super().__init__(*args, **kwargs) + self._cookie_header = 'Cookie' if request else 'Set-Cookie' + self._readonly = readonly def __getitem__(self, key): - return super().__getitem__(parse_header_key_name(key)) + return super().__getitem__(self._format_key(key)) def __setitem__(self, key, value): - if key.startswith('_'): - super().__setattr__(key, value) + if self._readonly: + raise ReadOnlyError('Headers are readonly') + + key = self._format_key(key) + + if type(value) == HeaderItem: + super().__setitem__(key, value) return - if self.readonly: - raise AssertionError('Headers are read-only') + if key == self._cookie_header: + if type(value) == str: + value = CookieItem.from_string(value) - key = parse_header_key_name(key) + elif type(value) != CookieItem: + raise TypeError(f'{key} header must be a `str` or `CookieItem`, not a `{class_name(value)}`') - if key in ['Cookie', 'Set-Cookie']: - izzylog.warning('Do not set the "Cookie" or "Set-Cookie" headers') - return + try: + for cookie in self[key]: + if cookie.key == value.key: + new_data = {k:v for k,v in value.as_dict().items() if k != key} + + cookie.value = value.value + cookie.update() + return + + self[key].append(value) + return + + except KeyError: + pass elif key == 'Date': value = DateString(value, 'http') - try: - super().__getitem__(key).update(value) + elif key == 'Content-Length': + value = int(value) - except KeyError: - super().__setitem__(key, HeaderItem(key, value)) + super().__setitem__(key, HeaderItem(key, value)) + + #try: + #self[key].append(value) + + #except KeyError: + #super().__setitem__(key, HeaderItem(key, value)) def __delitem__(self, key): - if self.readonly: - raise AssertionError('Headers are read-only') - - super().__delitem__(parse_header_key_name(key)) + super().__delitem__(self._format_key(key)) - @property - def readonly(self): - return self.__readonly + def _format_key(self, key): + return key.replace('_', '-').title() - def get_one(self, key): - return self[key].one() + def append(self, key, value): + try: + self[key].append(value) + + except KeyError: + self[key] = value - def set(self, key, value): + def cookies(self): + for cookie in self.get(self._cookie_header, []): + yield cookie + + + def items(self): + for key, item in super().items(): + for value in item: + yield (key, value) + + + def get(self, key, default=None): + try: + self.__getitem__(key) + + except KeyError: + return default + + + def get_cookie(self, key): + for cookie in self.cookies(): + if cookie.key == key: + return cookie + + raise KeyError(key) + + + def get_one(self, key, default=None): + try: + return self[key].one() + + except (KeyError, IndexError): + return default + + + def new_cookie(self, key, value, **kwargs): + return self.set_cookie(CookieItem(key, value, **kwargs)) + + + def new_cookie_from_string(self, value): + return self.set_cookie(CookieItem.from_string(value)) + + + def set_all(self, key, value): try: self[key].set(value) - except: + except KeyError: self[key] = value - def update(self, data={}, **kwargs): - kwargs.update(data) - - for key, value in kwargs.items(): - self[key] = value - - -class Cookies(DotDict): - __readonly = False - - - def __init__(self, cookies=[], readonly=False): - super().__init__() - - for cookie in cookies: - self.new_from_string(cookie) - - self.__readonly = readonly - - - def __setattr__(self, key, value): - if key.startswith('_'): - super().__setattr__(key, value) - return - - if self.readonly: - raise AssertionError('Cookies are read-only') - - if isinstance(value, str): - value = CookieItem.from_string(value) - - elif not isinstance(value, CookieItem): - raise TypeError(f'Cookie must be a str or CookieItem, not a {type(value).__name__}') - - super().__setattr__(key, value) - - - def __delattr__(self, key): - if self.readonly: - raise AssertionError('Cookies are read-only') - - super().__delattr__(key) - - - @property - def readonly(self): - return self.__readonly - - - def new_from_string(self, string): - cookie = CookieItem.new_from_string(string) - self[cookie.name] = cookie - + def set_cookie(self, cookie): + self[self._cookie_header] = cookie return cookie - def update(self, data={}, **kwargs): - kwargs.update(data) - - for key, value in kwargs.items(): - self[key] = value - - class HeaderItem(list): - def __init__(self, key, *values): - super().__init__(values) + def __init__(self, key, values): + super().__init__() + self.update(values) + self.key = key @@ -170,6 +170,10 @@ class HeaderItem(list): return ','.join(str(v) for v in self) + def __repr__(self): + return f'HeaderItem({self.key})' + + def set(self, *values): self.clear() @@ -188,121 +192,44 @@ class HeaderItem(list): class CookieItem(DotDict): def __init__(self, key, value, **kwargs): - super().__init__(kwargs) + self.key = key + self.value = value + self.args = DotDict() - try: - parse_cookie_key_name(key) - raise ValueError(f'The key name for a cookie cannot be {key}') + for k,v in kwargs.items(): + if k not in cookie_params.values(): + raise AttributeError(f'Not a valid cookie parameter: {key}') - except KeyError: - pass - - self.__key = key - self.__value = value - - - def __getitem__(self, key): - return parse_cookie_key_name(key) - - - def __setitem__(self, key, value): - key = parse_cookie_key_name(key) - - if key == 'Expires': - value = self._parse_expires(value) - - elif key == 'Max-Age': - value = self._parse_max_age(value) - - elif key == 'SameSite': - value = self._parse_samesite(value) - - elif key in ['Secure', 'HttpOnly']: - value = boolean(value) - - super().__setitem__(key, value) - - - def __delitem__(self, key): - super().__delitem(parse_cookie_key_name(key)) + setattr(self, k, v) def __str__(self): text = f'{self.key}={self.value}' - try: text += f'; Expires={self.expires}' - except KeyError: pass + if self.expires: + text += f'; Expires={self.expires.strftime("%a, %d %b %Y %H:%M:%S GMT")}' - try: text += f'; Max-Age={self.maxage}' - except KeyError: pass + if self.maxage != None: + text += f'; Max-Age={self.maxage}' - try: text += f'; Domain={self.domain}' - except KeyError: pass + if self.domain: + text += f'; Domain={self.domain}' - try: text += f'; Path={self.path}' - except KeyError: pass + if self.path: + text += f'; Path={self.path}' - try: text += f'; SameSite={self.samesite}' - except KeyError: pass + if self.samesite: + text += f'; SameSite={self.samesite}' - if self.get('secure'): + if self.secure: text += f'; Secure' - if self.get('httponly'): + if self.httponly: text += f'; HttpOnly' return text - def _parse_expires(self, data): - if type(data) == str: - data = DateString.new_http(data) - - elif type(data) in [int, float]: - data = DateString.from_timestamp(data, 'http') - - elif type(data) == datetime: - data = DateString.from_datetime(data, 'http') - - elif type(data) == timedelta: - data = DateSTring.from_datetime(datetime.now(timezone.utc) + data, 'http') - - elif type(data) != DateString: - raise TypeError(f'Expires must be a http date string, timestamp, datetime, or DateString object, not {type(data).__name__}') - - return data - - - def _parse_max_age(self, data): - if isinstance(data, float): - data = int(data) - - elif isinstance(date, timedelta): - data = data.seconds - - elif isinstance(date, datetime): - data = (datetime.now() - date).seconds - - elif not isinstance(data, int): - raise TypeError(f'Max-Age must be an integer, timedelta object, or datetime object, not {data.__class__.__name__}') - - return data - - - def _parse_samesite(self, data): - if isinstance(data, bool): - data = 'Strict' if data else 'None' - - elif isinstance(data, str): - if data.title() not in samesite_values: - raise ValueError(f'Valid SameSite values are Strict, Lax, or None, not {data}') - - else: - raise TypeError(f'SameSite must be a boolean or string, not {data.__class__.__name__}') - - return data.title() - - @classmethod def from_string(cls, data): kwargs = {} @@ -333,8 +260,125 @@ class CookieItem(DotDict): return cls(key, value, **kwargs) + @property + def expires(self): + return self.args.get('Expires') + + + @expires.setter + def expires(self, data): + if isinstance(data, str): + data = DateString(data, 'http') + + elif isinstance(data, int) or isinstance(data, float): + data = datetime.fromtimestamp(data).replace(tzinfo=timezone.utc) + + elif isinstance(data, datetime): + if not data.tzinfo: + data = data.replace(tzinfo=timezone.utc) + + elif isinstance(data, timedelta): + data = datetime.now(timezone.utc) + data + + else: + raise TypeError(f'Expires must be a http date string, timestamp, or datetime object, not {data.__class__.__name__}') + + self.args['Expires'] = data + + + @property + def maxage(self): + return self.args.get('Max-Age') + + + @maxage.setter + def maxage(self, data): + if isinstance(data, int): + pass + + elif isinstance(date, timedelta): + data = data.seconds + + elif isinstance(date, datetime): + data = (datetime.now() - date).seconds + + else: + raise TypeError(f'Max-Age must be an integer, timedelta object, or datetime object, not {data.__class__.__name__}') + + self.args['Max-Age'] = data + + + @property + def domain(self): + return self.args.get('Domain') + + + @domain.setter + def domain(self, data): + if not isinstance(data, str): + raise ValueError(f'Domain must be a string, not {data.__class__.__name__}') + + self.args['Domain'] = data + + + @property + def path(self): + return self.args.get('Path') + + + @path.setter + def path(self, data): + if not isinstance(data, str): + raise ValueError(f'Path must be a string or izzylib.Path object, not {data.__class__.__name__}') + + self.args['Path'] = Path(data) + + + @property + def secure(self): + return self.args.get('Secure') + + + @secure.setter + def secure(self, data): + self.args['Secure'] = boolean(data) + + + @property + def httponly(self): + return self.args.get('HttpOnly') + + + @httponly.setter + def httponly(self, data): + self.args['HttpOnly'] = boolean(data) + + + @property + def samesite(self): + return self.args.get('SameSite') + + + @samesite.setter + def samesite(self, data): + if isinstance(data, bool): + data = 'Strict' if data else 'None' + + elif isinstance(data, str) and data.title() in ['Strict', 'Lax', 'None']: + data = data.title() + + else: + raise TypeError(f'SameSite must be a boolean or one of Strict, Lax, or None, not {data.__class__.__name__}') + + self.args['SameSite'] = data + self.args['Secure'] = True + + def as_dict(self): - return DotDict({self.key: self.value}, **self) + data = DotDict({self.key: self.value}) + data.update(self.args) + + return data def set_defaults(self): @@ -347,3 +391,70 @@ class CookieItem(DotDict): self.maxage = 0 return self + + + +def parse_headers(raw_headers, request=True): + data = DotDict( + headers = [] + ) + + for idx, line in enumerate(raw_headers.decode('utf-8').splitlines()): + if idx == 0: + if request: + method, path, version = line.split() + + data.update({ + 'method': method, + 'path': Path(path) + }) + + else: + version, status, *message = line.split() + + data.update({ + 'status': int(status), + 'message': ' '.join(message) + }) + + else: + try: key, value = line.split(': ', 1) + except: continue + + data.headers.append((key, value)) + + data.version = float(version.split('/')[1]) + + return data + + +def create_message(first_line: str, headers: Dict[str,str], body: bytes=None): + data = first_line + + for k,v in headers.items(): + data += f'\r\n{k}: {v}' + + data += '\r\n\r\n' + data = data.encode('utf-8') + + if body: + if not body.endswith(b'\r\n\r\n'): + body += b'\r\n\r\n' + + return data + body + + return data + + +def create_request_message(method, path, body=None, headers=None, version='1.1'): + assert method.upper() in methods + return create_message(f'{method.upper()} {path} HTTP/{version}', headers, body) + + +def create_response_message(status, message=None, body=None, headers=None, version='1.1'): + msg = f'HTTP/{version} {status}' + + if message: + msg += f' {message}' + + return create_message(msg, headers, body) diff --git a/izzylib/logging.py b/izzylib/logging.py index aa35c23..533be61 100644 --- a/izzylib/logging.py +++ b/izzylib/logging.py @@ -1,18 +1,9 @@ import os, queue, sys, threading, time from datetime import datetime -from enum import IntEnum from pathlib import Path - -class Levels(IntEnum): - CRITICAL = 60, - ERROR = 50 - WARNING = 40 - INFO = 30 - VERBOSE = 20 - DEBUG = 10 - MERP = 0 +from .enums import LogLevel class Log: @@ -24,14 +15,14 @@ class Log: def __init__(self, name, **config): self.name = name - self.level = Levels.INFO + self.level = LogLevel.INFO self.date = True self.format = '%Y-%m-%d %H:%M:%S' self.logfile = None self.update_config(**config) - for level in Levels: + for level in LogLevel: self._set_log_function(level) @@ -57,8 +48,8 @@ class Log: def parse_level(self, level): - try: return Levels(int(level)) - except ValueError: return Levels[level.upper()] + try: return LogLevel(int(level)) + except ValueError: return LogLevel[level.upper()] def set_level_from_env(self, env_name): @@ -96,7 +87,7 @@ class Log: def log(self, level, *msg): if isinstance(level, str): - Levels[level.upper()] + LogLevel[level.upper()] if level < self.level: return diff --git a/izzylib/mastodon.py b/izzylib/mastodon.py index d53e2e9..3c4a788 100644 --- a/izzylib/mastodon.py +++ b/izzylib/mastodon.py @@ -2,10 +2,11 @@ import json from . import __version__ from .config import BaseConfig +from .datestring import DateString from .dotdict import DotDict from .exceptions import HttpError from .http_client import HttpClient -from .misc import DateString, Url +from .url import Url class MastodonApi: diff --git a/izzylib/misc.py b/izzylib/misc.py index 82ad35d..024ad84 100644 --- a/izzylib/misc.py +++ b/izzylib/misc.py @@ -13,6 +13,7 @@ import time import timeit from datetime import datetime, timedelta, timezone +from functools import cached_property from getpass import getpass, getuser from importlib import util from subprocess import Popen, PIPE @@ -21,11 +22,12 @@ from urllib.request import urlopen from . import izzylog from .dotdict import DotDict +from .enums import DatetimeFormat from .path import Path # typing modules from collections.abc import Callable -from typing import * +from typing import Any, Dict, Union __all__ = [ @@ -54,42 +56,9 @@ __all__ = [ 'timestamp', 'var_name', 'ArgParser', - 'Argument', - 'DateString', - 'Url' + 'Argument' ] -__pdoc__ = { - 'DateString.dt': '`datetime.datetime` object used to store the date', - 'DateString.format': 'The format used to create the str', - 'DateString.tz_local': 'The local timezone as a `datetime.timezone` object', - 'DateString.tz_utc': 'UTC timezone as a `datetime.timezone` object', - 'Url.anchor': 'Text after the "#" at the end of the url', - 'Url.host': 'Hostname or IP address portion of the url', - 'Url.password': 'Password portion of the url', - 'Url.path': 'Path portion of a url', - 'Url.port': 'Port number for the url. It one is not specified, it will be guessed based on the protocol', - 'Url.proto': 'Protocol portion of the url', - 'Url.query': 'Query key/value pairs as a dict', - 'Url.query_string': 'Query key/value pairs as a string', - 'Url.username': 'Username portion of the url' -} - -datetime_formats = { - 'http': '%a, %d %b %Y %H:%M:%S GMT', - 'activitypub': '%Y-%m-%dT%H:%M:%SZ', - 'activitypub-date': '%Y-%m-%d' -} - -protocol_ports = DotDict( - HTTP = 80, - HTTPS = 443, - WS = 80, - WSS = 443, - FTP = 21, - FTPS = 990 -) - def app_data_dirs(author:str, name:str) -> Dict[str,Path]: 'Returns the cache and config paths for software' @@ -621,266 +590,5 @@ class Argument(DotDict): self.callback = lambda *cliargs: callback(*cliargs, *args, **kwargs) -class DateString(str): - 'Create a `str` based on a datetime object' - - tz_utc = timezone.utc - tz_local = datetime.now(tz_utc).astimezone().tzinfo - dt = None - format = None - - - def __init__(self, string, format): - assert format in datetime_formats - - self.dt = datetime.strptime(string, datetime_formats[format]).replace(tzinfo=self.tz_utc) - self.format = format - - - def __new__(cls, string, format): - date = str.__new__(cls, string) - return date - - - def __getattr__(self, key): - return getattr(self.dt, key) - - - def __repr__(self): - return f'DateString({self}, format={self.format})' - - - @classmethod - def new_activitypub(cls, date): - 'Create a new `DateString` for use in ActivityPub or Mastodon API messages' - return cls(date, 'activitypub') - - - @classmethod - def new_http(cls, date): - 'Create a new `DateString` for use in HTTP messages' - return cls(date, 'http') - - - @classmethod - def from_datetime(cls, date, format): - 'Create a new `DateString` from an existing `datetime.datetime` object' - assert format in datetime_formats - return cls(date.astimezone(cls.tz_utc).strftime(datetime_formats[format]), format) - - - @classmethod - def from_timestamp(cls, timestamp, format): - 'Create a new `DateString` from a unix timestamp' - return cls.from_datetime(datetime.fromtimestamp(timestamp), format) - - - @classmethod - def now(cls, format): - 'Create a new `DateString` from the current time' - return cls.from_datetime(datetime.now(cls.tz_utc), format) - - - @property - def http(self): - 'Return a new `DateString` in HTTP format' - return DateString(self.dump_to_string('http'), 'http') - - - @property - def activitypub(self): - 'Return a new `DateString` in ActivityPub format' - return DateString(self.dump_to_string('activitypub'), 'activitypub') - - - @property - def utc(self): - 'Return a new `DateString` in UTC time' - return DateString.from_datetime(self.dt.astimezone(self.tz_utc), self.format) - - - @property - def local(self): - 'Return a new `DateString` in local time' - return DateString.from_datetime(self.dt.astimezone(self.tz_local), self.format) - - - def dump_to_string(self, format=None): - 'Return the date as a normal string in the specified format' - if not format: format = self.format - assert format in datetime_formats - return self.dt.strftime(datetime_formats[format]) - - -class Url(str): - '`str` representation of a url parsed with urlparse' - - proto:str = None - host:str = None - port:int = None - path:Path = None - query_string:str = None - query:DotDict = None - username:str = None - password:str = None - anchor:str = None - - - def __init__(self, url): - self._tldcache = None - self._tldcache_path = Path.cache.join('icann_public_suffix_list.txt') - - if isinstance(url, str): - parsed = urlparse(url) - - else: - parsed = url - - #if not all([parsed.scheme, parsed.netloc]): - #raise TypeError('Not a valid url') - - self._parsed = parsed - self.proto = parsed.scheme - self.port = protocol_ports.get(self.proto.upper()) if not parsed.port else None - self.path = parsed.path - self.query_string = parsed.query - self.query = DotDict.new_from_query_string(parsed.query) - self.username = parsed.username - self.password = parsed.password - self.anchor = parsed.fragment - - try: - self.domain = parsed.netloc.split('@')[1] - except: - self.domain = parsed.netloc - - - @property - def top(self) -> str: - 'Returns the domain without sub-domains' - - if not self._tldcache: - if not self._tldcache_path.exists() or self._tldcache_path.mtime + timedelta(hours=24) < datetime.now(): - resp = urlopen('https://publicsuffix.org/list/public_suffix_list.dat') - - with self._tldcache_path.open('w') as fd: - for line in resp.read().decode('utf-8').splitlines(): - if 'end icann domains' in line.lower(): - break - - if not line or line.startswith('//'): - continue - - if line.startswith('*'): - line = line[2:] - - fd.write(line + '\n') - - with self._tldcache_path.open() as fd: - self._tldcache = set(fd.readlines()) - - - - - @property - def path_full(self) -> str: - 'Return the path with the query pairs and anchor' - - string = self.path - - if self.query_string: - string += f'?{query_string}' - - if self.anchor: - string += f'#{self.anchor}' - - return string - - - @property - def host(self) -> str: - 'The domain and, if set, port as a string' - - if self.port: - return f'{self.domain}:{self.port}' - - return self.domain - - - @property - def without_query(self) -> str: - 'Return the url without the query or anchor on the end' - - return Url(self.split('?')[0]) - - - @property - def dict(self) -> DotDict: - 'Return the parsed url as a dict' - - return DotDict( - proto = self.proto, - domain = self.domain, - port = self.port, - path = self.path, - query = self.query, - username = self.username, - password = self.password, - anchor = self.anchor - ) - - - @classmethod - def new(cls, domain:str, path:Union[Path,str]='/', proto:str='https', port:int=None, query:dict=None, username:str=None, password:str=None, anchor:str=None): - 'Create a new `Url` based on the url parts' - if port == protocol_ports.get(proto): - port = None - - url = f'{proto}://' - - if username and password: - url += f'{username}:{password}@' - - elif username: - url += f'{username}@' - - url += domain - - if port: - url += f':{port}' - - url += '/' + path if not path.startswith('/') else path - - if query: - url += '?' + '&'.join(f'{quote(key)}={quote(value)}' for key,value in query.items()) - - if anchor: - url += f'#{anchor}' - - return cls(url) - - - def join(self, new_path): - 'Add to the path portion of the url' - - data = self.dict - domain = data.pop('domain') - - data['path'] = data['path'].join(new_path) - - return self.new(domain, **data) - - - def replace_property(self, key, value): - data = self.dict - - if key not in data.keys(): - raise KeyError('Invalid Url property') - - data[key] = value - - return self.new(*data) - - # compat boolean = convert_to_boolean diff --git a/izzylib/object_base.py b/izzylib/object_base.py new file mode 100644 index 0000000..bf9c3bd --- /dev/null +++ b/izzylib/object_base.py @@ -0,0 +1,70 @@ +from .exceptions import ReadOnlyError +from .misc import class_name + + +class ObjectBase: + __props = {} + + def __init__(self, readonly_props=[], **kwargs): + self.__props = dict() + self.__readonly = tuple(readonly_props) + + for key, value in kwargs.items(): + self.set_property(key, value) + + + def __repr__(self): + attrs = ', '.join(f'{key}={repr(value)}' for key, value in self.__properties) + return f'{class_name(self)}({attrs})' + + + def __getattr__(self, key): + if key not in self.__props: + return object.__getattribute__(self, key) + + return self.__props[key] + + + def __setattr__(self, key, value): + if key not in self.__props: + return object.__setattr__(self, key, value) + + if key in self.__readonly: + raise ReadOnlyError(key) + + self.__props[key] = self._parse_property(key, value) + + + def __delattr__(self, key): + if key not in self.__props: + return object.__delattr__(self, key, value) + + del self.__props[key] + + + #def __getitem__(self, key): + #return self.__getattr__(key) + + + #def __setitem__(self, key, value): + #self.__setattr__(key, value) + + + #def __delitem__(self, key): + #self.__delattr__(key) + + + def _parse_property(self, key, value): + return value + + + def get_property(self, key): + return self.__props[key] + + + def set_property(self, key, value): + self.__props[key] = self._parse_property(key, value) + + + def set_readonly(self, *props): + self.__readonly = tuple([*props, *self.__readonly]) diff --git a/izzylib/path.py b/izzylib/path.py index 081b19c..cbb4274 100644 --- a/izzylib/path.py +++ b/izzylib/path.py @@ -1,9 +1,11 @@ -import enum, json, os, shutil, sys +import json, os, shutil, sys from datetime import datetime from functools import cached_property from pathlib import Path as PyPath +from .enums import PathType + linux_prefix = dict( bin = '.local/bin', @@ -19,17 +21,17 @@ class TinyDotDict(dict): __delattr__ = dict.__delitem__ -class PathType(enum.Enum): - DIR = 'dir' - FILE = 'file' - LINK = 'link' - UNKNOWN = 'unknown' + class PathMeta(type): @property - def home(cls): - return cls('~').expanduser() + def cache(cls): + try: + return cls(os.environ['XDG_CACHE_HOME']) + + except KeyError: + return cls.home.join('.cache') @property @@ -37,17 +39,17 @@ class PathMeta(type): return cls(os.getcwd()).resolve() + @property + def home(cls): + return cls('~').expanduser() + + @property def script(cls): try: path = sys.modules['__main__'].__file__ except: path = sys.argv[0] - return Path(path).parent - - - @property - def cache(cls): - return cls.home.join('.cache') + return cls(path).parent def module(cls, module): diff --git a/izzylib/url.py b/izzylib/url.py new file mode 100644 index 0000000..39eaefb --- /dev/null +++ b/izzylib/url.py @@ -0,0 +1,243 @@ +import socket + +from datetime import datetime, timedelta +from functools import cached_property +from urllib.parse import urlparse, quote +from urllib.request import urlopen + +from . import izzylog +from .dotdict import DotDict +from .enums import Protocol +from .path import Path + +# typing modules +from typing import Union + + +__all__ = ['Url'] +__pdoc__ = { + 'Url.anchor': 'Text after the "#" at the end of the url', + 'Url.host': 'Hostname or IP address portion of the url', + 'Url.password': 'Password portion of the url', + 'Url.path': 'Path portion of a url', + 'Url.port': 'Port number for the url. It one is not specified, it will be guessed based on the protocol', + 'Url.proto': 'Protocol portion of the url', + 'Url.query': 'Query key/value pairs as a dict', + 'Url.query_string': 'Query key/value pairs as a string', + 'Url.username': 'Username portion of the url' +} + + +class Url(str): + '`str` representation of a url parsed with urlparse' + + proto:str = None + host:str = None + port:int = None + path:Path = None + query_string:str = None + query:DotDict = None + username:str = None + password:str = None + anchor:str = None + + + def __init__(self, url): + self._tldcache = None + self._tldcache_path = Path.cache.join('icann_public_suffix_list.txt') + + if isinstance(url, str): + parsed = urlparse(url) + + else: + parsed = url + + #if not all([parsed.scheme, parsed.netloc]): + #raise TypeError('Not a valid url') + + self._parsed = parsed + self.proto = parsed.scheme + + if not parsed.port: + try: + self.port = Protocol[self.proto.upper()].value + + except KeyError: + self.port = None + + else: + self.port = None + + self.path = parsed.path + self.query_string = parsed.query + self.query = DotDict.new_from_query_string(parsed.query) + self.username = parsed.username + self.password = parsed.password + self.anchor = parsed.fragment + + try: + self.domain = parsed.netloc.split('@')[1] + except: + self.domain = parsed.netloc + + + @property + def dict(self) -> DotDict: + 'Return the parsed url as a dict' + + return DotDict( + proto = self.proto, + domain = self.domain, + port = self.port, + path = self.path, + query = self.query, + username = self.username, + password = self.password, + anchor = self.anchor + ) + + + @property + def host(self) -> str: + 'The domain and, if set, port as a string' + + if self.port: + return f'{self.domain}:{self.port}' + + return self.domain + + + @cached_property + def ipaddress(self) -> str: + 'Resolve the domain to an IP address' + + return socket.gethostbyname(self.domain) + + + @property + def path_full(self) -> str: + 'Return the path with the query pairs and anchor' + + string = self.path + + if self.query_string: + string += f'?{query_string}' + + if self.anchor: + string += f'#{self.anchor}' + + return string + + + @property + def top(self) -> str: + 'Returns the domain without sub-domains' + + if not self.domain and not self.path: + raise ValueError('No domain') + + domain = self.domain or self.path + + if not self._tldcache: + if not self._tldcache_path.exists() or self._tldcache_path.mtime + timedelta(days=7) < datetime.now(): + resp = urlopen('https://publicsuffix.org/list/public_suffix_list.dat') + + with self._tldcache_path.open('w') as fd: + for line in resp.read().decode('utf-8').splitlines(): + if 'end icann domains' in line.lower(): + break + + if not line or line.startswith('//'): + continue + + if line.startswith('*'): + line = line[2:] + + fd.write(line + '\n') + + with self._tldcache_path.open() as fd: + self._tldcache = set(fd.read().splitlines()) + + domain_split = domain.split('.') + + try: + if '.'.join(domain_split[-2:]) in self._tldcache: + return '.'.join(domain_split[-3:]) + + except IndexError: + pass + + if '.'.join(domain_split[-1:]) in self._tldcache: + return '.'.join(domain_split[-2:]) + + raise ValueError('Cannot find TLD') + + + @property + def without_query(self) -> str: + 'Return the url without the query or anchor on the end' + + return Url(self.split('?')[0]) + + + @classmethod + def new(cls, domain:str, path:Union[Path,str]='/', proto:Union[str,Protocol]='https', port:int=None, query:dict=None, username:str=None, password:str=None, anchor:str=None): + 'Create a new `Url` based on the url parts' + + if type(proto) == str: + try: + protocol = Protocol[proto.upper()] + except KeyError: + protocol = None + + if protocol and port == protocol.value: + port = None + + url = f'{proto}://' + + if username and password: + url += f'{username}:{password}@' + + elif username: + url += f'{username}@' + + url += domain + + if port: + url += f':{port}' + + url += '/' + path if not path.startswith('/') else path + + if query: + url += '?' + '&'.join(f'{quote(key)}={quote(value)}' for key,value in query.items()) + + if anchor: + url += f'#{anchor}' + + return cls(url) + + + def copy(self): + return Url.new(**self.dict) + + + def join(self, new_path): + 'Add to the path portion of the url' + + data = self.dict + domain = data.pop('domain') + + data['path'] = data['path'].join(new_path) + + return self.new(domain, **data) + + + def replace_property(self, key, value): + data = self.dict + + if key not in data.keys(): + raise KeyError('Invalid Url property') + + data[key] = value + + return self.new(**data)