This commit is contained in:
Izalia Mae 2022-03-17 16:10:21 -04:00
parent 95c0d1ff45
commit 038dfa3b3c
14 changed files with 518 additions and 309 deletions

View file

@ -23,12 +23,21 @@ from .cache import *
from .config import *
from .connection import *
from .url import *
from .object_base import ObjectBase
from .mastodon import MastodonApi
from .http_client import (
HttpClient,
HttpClientRequest,
HttpClientResponse
)
from .http_urllib_client import (
HttpUrllibClient,
HttpUrllibClientRequest,
HttpUrllibClientResponse
)
def register_global(obj, name=None):
# This doesn't work and I'm not sure why
@ -47,6 +56,7 @@ def add_builtins(*classes, **kwargs):
HttpClient,
JsonConfig,
LruCache,
ObjectBase,
Path,
TtlCache,
Url,

118
izzylib/datestring.py Normal file
View file

@ -0,0 +1,118 @@
from datetime import datetime, timezone
from .enums import DatetimeFormat
__all__ = ['DateString']
__pdoc__ = {
'DateString.dt': '`datetime.datetime` object used to store the date',
'DateString.format': 'The format used to create the str',
'DateString.tz_local': 'The local timezone as a `datetime.timezone` object',
'DateString.tz_utc': 'UTC timezone as a `datetime.timezone` object',
}
class DateString(str):
'Create a `str` based on a datetime object'
tz_utc = timezone.utc
tz_local = datetime.now(tz_utc).astimezone().tzinfo
dt = None
format = None
def __init__(self, string, format):
try:
self.dt = datetime.strptime(string, DatetimeFormat[format.upper()].value).replace(tzinfo=self.tz_utc)
except ValueError:
if format.upper() == 'ACTIVITYPUB':
self.dt = datetime.strptime(string, DatetimeFormat['ACTIVITYPUB_MICRO'].value).replace(tzinfo=self.tz_utc)
self.format = format
def __new__(cls, string, format):
date = str.__new__(cls, string)
return date
def __getattr__(self, key):
return getattr(self.dt, key)
def __repr__(self):
return f'DateString({self}, format={self.format})'
@classmethod
def new_activitypub(cls, date):
'Create a new `DateString` for use in ActivityPub or Mastodon API messages'
return cls(date, 'activitypub')
@classmethod
def new_http(cls, date):
'Create a new `DateString` for use in HTTP messages'
return cls(date, 'http')
@classmethod
def new_mastodon(cls, date):
return cls(date, 'mastodon')
@classmethod
def from_datetime(cls, date, format):
'Create a new `DateString` from an existing `datetime.datetime` object'
return cls(date.astimezone(cls.tz_utc).strftime(DatetimeFormat[format.upper()].value), format)
@classmethod
def from_timestamp(cls, timestamp, format):
'Create a new `DateString` from a unix timestamp'
return cls.from_datetime(datetime.fromtimestamp(timestamp), format)
@classmethod
def now(cls, format):
'Create a new `DateString` from the current time'
return cls.from_datetime(datetime.now(cls.tz_utc), format)
@property
def activitypub(self):
'Return a new `DateString` in ActivityPub format'
return DateString(self.dump_to_string('activitypub'), 'activitypub')
@property
def http(self):
'Return a new `DateString` in HTTP format'
return DateString(self.dump_to_string('http'), 'http')
@property
def mastodon(self):
'Return a new `DateString` in Mastodon API format'
return DateString(self.dump_to_string('mastodon'), 'mastodon')
@property
def utc(self):
'Return a new `DateString` in UTC time'
return DateString.from_datetime(self.dt.astimezone(self.tz_utc), self.format)
@property
def local(self):
'Return a new `DateString` in local time'
return DateString.from_datetime(self.dt.astimezone(self.tz_local), self.format)
def dump_to_string(self, format=None):
'Return the date as a normal string in the specified format'
if not format:
format = self.format
return self.dt.strftime(DatetimeFormat[format.upper()].value)

View file

@ -3,6 +3,7 @@ from enum import Enum, IntEnum
__all__ = [
'DatetimeFormat',
'IpAddressType',
'LogLevel',
'PathType',
'Protocol'
@ -12,9 +13,20 @@ __all__ = [
class DatetimeFormat(Enum):
HTTP = '%a, %d %b %Y %H:%M:%S GMT'
ACTIVITYPUB = '%Y-%m-%dT%H:%M:%SZ'
ACTIVITYPUB_MICRO = '%Y-%m-%dT%H:%M:%S.%fZ'
MASTODON = '%Y-%m-%d'
class IpAddressType(Enum):
PUBLIC = 'public'
PRIVATE = 'private'
LOOPBACK = 'loopback'
LOCAL = 'local'
MULTICAST = 'multicast'
RESERVED = 'reserved'
UNKNOWN = 'unknown'
class LogLevel(IntEnum):
CRITICAL = 60,
ERROR = 50

View file

@ -1,65 +1,37 @@
__all__ = [
'NoConnectionError'
'HttpError',
'NoConnectionError',
'ReadOnlyError',
'TokenExistsError',
'VersionError'
]
class NoConnectionError(Exception):
'Raise when trying to use a connection that has not been established yet'
class ReadOnlyError(Exception):
'Raise when a read-only property is attempted to be set'
class DBusClientError(Exception):
pass
class TokenExistsError(Exception):
'Raise when trying to create a token or app when a token already exists'
class DBusServerError(Exception):
pass
class HttpFileDownloadedError(Exception):
'raise when a download failed for any reason'
class InvalidMethodException(Exception):
def __init__(self, method):
super().__init__(f'Invalid HTTP method: {method}')
self.method = method
class MethodNotHandledException(Exception):
def __init__(self, method):
super().__init__(f'HTTP method not handled by handler: {method}')
self.method = method
class NoBlueprintForPath(Exception):
'raise when no blueprint is found for a specific path'
class NoConnectionError(Exception):
'Raise when a function requiring a connection gets called when there is no connection'
class MaxConnectionsError(Exception):
'Raise when the max amount of connections has been reached'
class NoTransactionError(Exception):
'Raise when trying to execute an SQL write statement outside a transaction'
class NoTableLayoutError(Exception):
'Raise when a table layout is necessary, but not loaded'
class UpdateAllRowsError(Exception):
'Raise when an UPDATE tries to modify all rows in a table'
class VersionError(Exception):
'Raise when something is not the right version'
class HttpError(Exception):
def __init__(self, response):
self.response = response
super().__init__(f'HTTP ERROR {self.status} for "{response.url}": {self.message[:100]}')
if response.message:
super().__init__(f'HTTP ERROR {response.status} for "{response.url}": {response.message}')
else:
super().__init__(f'HTTP ERROR {response.status} for "{response.url}"')
@property

View file

@ -6,16 +6,17 @@ from .response import HttpClientResponse
from .. import __version__
from ..dotdict import DotDict
from ..exceptions import HttpError
from ..http_utils import Headers
from ..misc import class_name
from ..object_base import ObjectBase
class HttpClient(ObjectBase):
def __init__(self, appagent=None, headers={}, request_class=None, response_class=None):
def __init__(self, appagent=None, headers={}, request_class=None, response_class=None, timeout=60):
super().__init__(
headers = ClientHeaders(headers),
timeout = 60,
timeout = timeout,
request_class = request_class or HttpClientRequest,
response_class = response_class or HttpClientResponse,
readonly = ['headers']
@ -25,6 +26,11 @@ class HttpClient(ObjectBase):
self.set_appagent(appagent)
@property
def useragent(self):
return self.headers.get('User-Agent')
def build_request(self, url, method='GET', body=b'', headers={}, cookies=[]):
return self.request_class(url,
method = method,
@ -34,15 +40,15 @@ class HttpClient(ObjectBase):
)
def request(self, url, method='GET', body=b'', headers={}, cookies=[]):
def request(self, url, method='GET', body=b'', headers={}, cookies=[], raise_error=False):
req = self.build_request(url, method, body, headers, cookies)
return self.send_request(req)
return self.send_request(req, raise_error)
def send_request(self, request):
def send_request(self, request, raise_error=False):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.settimeout(self.timeout)
sock.connect((request.url.ipaddress, request.url.port))
sock.connect((request.url.address, request.url.port))
if request.url.proto.lower() in ['https', 'wss']:
context = ssl.create_default_context()
@ -50,7 +56,12 @@ class HttpClient(ObjectBase):
sock.sendall(request.compile())
return self.response_class(self, sock, request.url)
response = self.response_class(self, sock, request.url)
if raise_error and response.status not in range(200, 300):
raise HttpError(response)
return response
def set_appagent(self, appagent):

View file

@ -1,6 +1,7 @@
from ..datestring import DateString
from ..enums import DatetimeFormat
from ..http_utils import CookieItem, Headers, create_request_message, methods
from ..misc import convert_to_bytes
from ..object_base import ObjectBase
from ..url import Url

View file

@ -1,5 +1,7 @@
import traceback
from datetime import datetime, timedelta
from ..dotdict import DotDict
from ..http_utils import Headers, parse_headers
from ..misc import class_name
@ -43,6 +45,10 @@ class HttpClientResponse(ObjectBase):
pass
def __del__(self):
self.sock.close()
def _read_headers(self):
#for line in self.read_lines():
#self.raw_headers += line
@ -77,12 +83,27 @@ class HttpClientResponse(ObjectBase):
def _read_body(self):
if self.length <= len(self._body):
return
return self._body
self._body += self.read(self.length - len(self._body))
return self._body
def _read_body_nolen(self):
self.sock.setblocking(False)
current_time = datetime.now()
timeout_time = current_time + timedelta(seconds=self.client.timeout)
while current_time < timeout_time:
try:
self._body += self.read()
except:
break
return self._body
@property
def length(self):
try:
@ -103,18 +124,17 @@ class HttpClientResponse(ObjectBase):
@property
def body(self):
if self.length:
self._read_body()
if not self._body.endswith(b'\r\n\r\n'):
if self.length:
self._read_body()
self.close()
else:
self._read_body_nolen()
self.close()
return self._body
#@property
#def body(self):
#if not self._body:
#self._read_body_lines()
#return self._body
@property
def text(self):
@ -131,6 +151,10 @@ class HttpClientResponse(ObjectBase):
return DotDict(self.text)
def close(self):
self.sock.close()
def cookies(self):
return self.headers.cookies()
@ -147,16 +171,6 @@ class HttpClientResponse(ObjectBase):
return self.sock.recv(length)
#def read_lines(self):
#while True:
#line = self.fd.readline()
#if not line or line == b'\r\n':
#break
#yield line
def save_to_file(self, path=None, filename=None, chunk=8192):
path = Path(path) if path else Path.cwd
data_left = self.length

View file

@ -16,7 +16,6 @@ from .response import HttpUrllibClientResponse
from .. import izzylog, __version__
from ..dotdict import DefaultDotDict, DotDict
from ..exceptions import HttpFileDownloadedError
from ..path import Path
from ..url import Url

View file

@ -217,106 +217,6 @@ class LogFile:
self.fd.flush()
class LogFile2(threading.Thread):
def __init__(self, path):
super().__init__()
self.daemon = True
self.path = Path(path).resolve()
self.lockfile = Path(str(self.path)+'.lck')
self.lockfd = None
## Make sure a lock file does not already exist
self.is_locked
self.queue = queue.Queue()
self.close_thread = False
@property
def is_locked(self):
if not self.lockfd and self.lockfile.exists():
with self.lockfile.open('r') as fd:
try: pid = int(fd.read())
except: return False
if self.pid != pid and check_pid(pid):
raise FileExistsError('Cannot aquire lock')
self.lockfile.unlink()
return True
if self.lockfd:
return True
return False
@property
def pid(self):
return os.getpid()
def aquire_lock(self):
if self.is_locked:
return
self.lockfd = self.lockfile.open('w')
self.lockfd.write(str(self.pid))
self.lockfd.flush()
def release_lock(self):
try:
if not self.is_locked:
return
except FileExistsError:
return
self.lockfd.close()
self.lockfile.unlink()
self.lockfd = None
def run(self):
self.aquire_lock()
with self.path.open('a') as fd:
while True:
if self.queue.empty():
if self.close_thread:
print('done')
break
time.sleep(1)
continue
print('line write')
try:
fd.write(self.queue.get_nowait())
fd.flush()
self.queue.task_done()
except queue.Empty:
print('empty')
pass
self.release_lock()
def stop(self):
self.close_thread = True
self.queue.join()
self.join()
def write(self, *message):
self.queue.put(' '.join(message))
def get_logger(name, **config):
try:
return logger[name.lower()]

View file

@ -1,103 +1,134 @@
import json
import json, re, select
from . import __version__
from .config import BaseConfig
from .datestring import DateString
from .dotdict import DotDict
from .exceptions import HttpError
from .exceptions import HttpError, NoConnectionError, TokenExistsError, VersionError
from .http_client import HttpClient
from .misc import join_list
from .object_base import ObjectBase
from .url import Url
class MastodonApi:
def __init__(self, domain, **kwargs):
self.cfg = BaseConfig(
appname = kwargs.get('appname', 'MastoAPI Client'),
vpattern = re.compile('([0-9]+)\.([0-9]+)\.([0-9]+)')
def version_check(minimum=None, maximum=None):
min_tpl = tuple(minimum.split('.')) if minimum else None
max_tpl = tuple(maximum.split('.')) if maximum else None
def api_deco(func):
def funcwrap(self, *args, **kwargs):
if not self.version:
raise NoConnectionError('Connect to the instance fist')
if minimum and self.version < min_tpl:
raise VersionError(f'Instance version must be at least {minimum}. Current version: {self.version_string}')
elif maximum and self.version > max_tpl:
raise VersionError(f'Endpoint does not exist anymore')
return func(self, *args, **kwargs)
return funcwrap
return api_deco
class MastodonApi(ObjectBase):
def __init__(self, domain, token=None, appname='Mastodon Api Client', timeout=5, **kwargs):
super().__init__(
appname = appname,
domain = domain,
token = kwargs.get('token'),
token = token,
vapid_key = kwargs.get('vapid_key'),
redirect = kwargs.get('redirect', 'urn:ietf:wg:oauth:2.0:oob'),
scopes = Scopes(*kwargs.get('scopes', [])),
website = kwargs.get('website')
website = kwargs.get('website'),
version = None,
readonly = ['appname', 'domain', 'website']
)
self._client = Client(appagent=self.cfg.appname)
self._client = HttpClient(
appagent = appname,
timeout = timeout,
headers = {
'Accept': 'application/json',
'Accept-Encoding': 'identity'
}
)
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.disconnect()
@property
def scopes(self):
return self.cfg.scopes
def version_string(self):
return join_list(self.version)
def __send_request(self, endpoint, data=None, query={}, token=None, method='GET'):
if method.upper() not in ['GET', 'POST']:
raise ValueError(f'Method must be a GET or POST, not a {method}')
url = Url.new(self.cfg.domain, path=endpoint, query=query)
headers = {}
if token:
headers['Authorization'] = f'Bearer {token}'
if method.upper() in ['POST']:
headers['Content-Type'] = 'application/json'
request = self._client.create_request(url, data, headers, method=method)
response = self._client.run_request(request)
body = self._client.read_body(response)
print(body)
if response.status != 200:
print(type(response.status), response.status)
raise HttpError(response.status, body)
def connect(self):
instance = self.api('/api/v1/instance')
try:
return DotDict(body)
version = vpattern.findall(instance['version'])[0]
except IndexError:
raise ConnectionError('Unable to find instance version')
except:
try:
message = DotDict(body).error
except:
message = body
raise HttpError(response.status, message)
self.version = version
def api(self, endpoint, *args, **kwargs):
return self.__send_request(f'/api/v1/{endpoint}', *args, **kwargs)
def disconnect(self):
self.version = None
def oauth(self, endpoint, *args, **kwargs):
return self.__send_request(f'/oauth/{endpoint}', *args, **kwargs)
def api(self, endpoint, data={}, token=None, method='GET'):
if method.upper() not in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']:
raise ValueError(f'Invalid HTTP method: {method}')
def me(self):
return Account(self.api('accounts/verify_credentials', token=self.cfg.token))
def create_app(self):
data = DotDict(
client_name = self.cfg.appname,
redirect_uris = self.cfg.redirect
kwargs = DotDict(
headers = {}
)
if self.cfg.scopes:
data.scopes = self.cfg.scopes.compile()
url = Url.new(self.domain, path=endpoint, query=data, proto='HTTPS')
if self.cfg.website:
data.website = Url(self.cfg.website)
if data:
if method == 'GET':
url = url.replace_property('query', data)
return self.api('apps', data=data, method='POST')
else:
kwargs.body = data
if token or self.token:
kwargs.headers.authorization = f'Bearer {token or self.token}'
if method.upper() in ['POST']:
kwargs.headers.headers.content_type = 'application/json'
with self._client.request(url, method, **kwargs, raise_error=True) as response:
body_lines = response.text.splitlines()
body = '\n'.join(body_lines[1:-2]) if len(body_lines) > 1 else body_lines[0]
return json.loads(body)
@version_check()
def create_token(self, type, id, secret, auth_code=None, username=None, password=None):
if self.token:
raise TokenExistsError('Refusing to create a new token')
data = DotDict(
grant_type = type,
client_id = id,
client_secret = secret,
redirect_uri = self.cfg.redirect,
scope = self.cfg.scopes.compile()
redirect_uri = self.redirect,
scope = self.scopes.compile()
)
if auth_code:
@ -107,43 +138,65 @@ class MastodonApi:
data.username = username
data.password = password
return self.oauth('token', data, method='POST')
return self.api('/oauth/token', data, method='POST')
def login_client(self, id, secret):
return self.create_token('client_credentials', id, secret)
def login_password(self, id, secret, username, password):
#auth = self.oauth('authorize', data=dict(
#response_type = 'code',
#client_id = id,
#redirect_uri = self.cfg.redirect,
#scope = self.cfg.scopes.compile(),
#username = username,
#password = password
#))
#print(auth)
return self.create_token('password', id, secret, username=username, password=password)
## Does not work atm for some reason (returns 406 Not Acceptable)
@version_check()
def authorize(self, id, force_login=False):
data = DotDict(
response_type = 'code',
client_id = id,
redirect_uri = self.cfg.redirect,
scope = self.cfg.scopes.compile()
redirect_uri = self.redirect,
scope = self.scopes.compile(),
force_login = force_login
)
if force_login:
data.force_login = True
return self.oauth('authorize', query=data)
return self.api('/oauth/authorize', data)
@version_check()
def revoke(self, client_id, client_secret, token=None):
assert token or self.token
data = dict(
client_id = client_id,
client_secret = client_secret,
token = token or self.token
)
return self.api('/oauth/revoke', data, token=token, method='POST')
@version_check()
def create_app(self):
if self.token:
raise TokenExistsError('Refusing to create a new app')
data = DotDict(
client_name = self.appname,
redirect_uris = self.redirect
)
if self.scopes:
data.scopes = self.scopes.compile()
if self.website:
data.website = Url(self.website)
return self.api('/api/v1/apps', data=data, method='POST')
@version_check('2.0.0')
def verify_credentials(self):
return self.api('/api/v1/apps/verify_credentials')
@version_check('2.7.0')
def register(self, username, email, password, locale='en-us', reason=None):
app = self.create_app()
data = DotDict(
@ -159,7 +212,22 @@ class MastodonApi:
token = self.login_client(app.client_id, app.client_secret)
return self.api('accounts', data, token=token.access_token, method='POST')
return self.api('/api/v1/accounts', data, token=token.access_token, method='POST')
@version_check('1.1.1')
def update_profile(self, kwargs):
return self.api('/api/v1/accounts/update_credentials', kwargs, method='PATCH')
@version_check()
def me(self):
return Account(self.api('/api/v1/accounts/verify_credentials'))
@version_check('1.1.0')
def instance(self):
return self.api('/api/v1/instance')
class Scopes:
@ -316,7 +384,7 @@ class Account(DotDict):
super().__init__(*args, **kwargs)
self.created_at = DateString.new_activitypub(self.created_at)
self.last_status_at = DateString(self.last_status_at, 'activitypub-date')
self.last_status_at = DateString(self.last_status_at, 'mastodon')
@property

View file

@ -271,6 +271,11 @@ def import_from_path(mod_path:Union[str,Path]):
return module
def join_list(data: Union[list,tuple,set], join_value: str = ' ') -> str:
'Takes a list, tuple, or set, converts all items to a string, and then combines them into a single string'
return join_value.join(str(v) for v in data)
def nfs_check(path:Union[str,Path]) -> bool:
'''
Check if a file or directory is on an NFS share.

View file

@ -7,7 +7,12 @@ class ObjectBase:
def __init__(self, readonly_props=[], **kwargs):
self.__props = dict()
self.__readonly = tuple(readonly_props)
if readonly_props == True:
self.__readonly = True
else:
self.__readonly = tuple(readonly_props)
for key, value in kwargs.items():
self.set_property(key, value)
@ -29,7 +34,7 @@ class ObjectBase:
if key not in self.__props:
return object.__setattr__(self, key, value)
if key in self.__readonly:
if self.__readonly == True or key in self.__readonly:
raise ReadOnlyError(key)
self.__props[key] = self._parse_property(key, value)
@ -39,19 +44,30 @@ class ObjectBase:
if key not in self.__props:
return object.__delattr__(self, key, value)
if self.__readonly == True or key in self.__readonly:
raise ReadOnlyError(key)
del self.__props[key]
#def __getitem__(self, key):
#return self.__getattr__(key)
def __getitem__(self, key):
return self.__getattr__(key)
#def __setitem__(self, key, value):
#self.__setattr__(key, value)
def __setitem__(self, key, value):
try:
self.__setattr__(key, value)
except ReadOnlyError:
raise AttributeError(key)
#def __delitem__(self, key):
#self.__delattr__(key)
def __delitem__(self, key):
try:
self.__delattr__(key)
except ReadOnlyError:
raise AttributeError(key)
def _parse_property(self, key, value):

View file

@ -1,4 +1,4 @@
import json, os, shutil, sys
import grp, json, os, pwd, shutil, sys
from datetime import datetime
from functools import cached_property
@ -108,6 +108,16 @@ class Path(str, metaclass=PathMeta):
raise FileExistsError('File or directory already exists:', target)
@property
def gid(self):
return os.stat(self).st_gid
@property
def group(self):
return grp.getgrgid(self.gid)[0]
@property
def mtime(self):
return datetime.fromtimestamp(os.path.getmtime(self))
@ -135,7 +145,17 @@ class Path(str, metaclass=PathMeta):
@cached_property
def suffix(self):
return os.path.splitext(self.name)[1]
return os.path.splitext(self.name)[1][1:]
@property
def uid(self):
return os.stat(self).st_uid
@property
def user(self):
return pwd.getpwuid(self.uid)[0]
def append(self, text):
@ -267,9 +287,12 @@ class Path(str, metaclass=PathMeta):
def listdir(self, recursive=True):
if recursive:
return tuple(self.join(f) for dp, dn, fn in os.walk(self) for f in fn)
for dp, dn, fn in os.walk(self):
for path in fn:
yield self.join(path)
return tuple(self.join(path) for path in os.listdir(self))
for path in os.listdir(self):
yield self.join(path)
def mkdir(self, mode=0o755):

View file

@ -2,12 +2,13 @@ import socket
from datetime import datetime, timedelta
from functools import cached_property
from ipaddress import ip_address
from urllib.parse import urlparse, quote
from urllib.request import urlopen
from . import izzylog
from .dotdict import DotDict
from .enums import Protocol
from .enums import IpAddressType, Protocol
from .path import Path
# typing modules
@ -46,40 +47,14 @@ class Url(str):
self._tldcache = None
self._tldcache_path = Path.cache.join('icann_public_suffix_list.txt')
if isinstance(url, str):
parsed = urlparse(url)
if not isinstance(url, str):
raise TypeError('url is not a string')
else:
parsed = url
self._parsed = urlparse(url)
#if not all([parsed.scheme, parsed.netloc]):
#if not all([self.proto, self.domain]):
#raise TypeError('Not a valid url')
self._parsed = parsed
self.proto = parsed.scheme
if not parsed.port:
try:
self.port = Protocol[self.proto.upper()].value
except KeyError:
self.port = None
else:
self.port = None
self.path = parsed.path
self.query_string = parsed.query
self.query = DotDict.new_from_query_string(parsed.query)
self.username = parsed.username
self.password = parsed.password
self.anchor = parsed.fragment
try:
self.domain = parsed.netloc.split('@')[1]
except:
self.domain = parsed.netloc
@property
def dict(self) -> DotDict:
@ -97,6 +72,48 @@ class Url(str):
)
@property
def address(self) -> str:
'Resolve the domain to an IP address'
return socket.gethostbyname(self.domain)
@property
def address_type(self) -> IpAddressType:
ip = ip_address(self.address)
if ip.is_loopback:
return IpAddressType.LOOPBACK
elif ip.is_reserved:
return IpAddressType.RESERVED
elif ip.is_link_local:
return IpAddressType.LOCAL
elif ip.is_multicast:
return IpAddressType.MULTICAST
elif ip.is_global:
return IpAddressType.GLOBAL
elif ip.is_private:
return IpAddressType.PRIVATE
return IpAddressType.UNKNOWN
@property
def anchor(self) -> str:
return self._parsed.fragment
@cached_property
def domain(self) -> str:
return self._parsed.hostname
@property
def host(self) -> str:
'The domain and, if set, port as a string'
@ -107,14 +124,17 @@ class Url(str):
return self.domain
@cached_property
def ipaddress(self) -> str:
'Resolve the domain to an IP address'
return socket.gethostbyname(self.domain)
@property
def password(self) -> str:
return self._parsed.password
@cached_property
def path(self) -> Path:
return Path(self._parsed.path)
@cached_property
def path_full(self) -> str:
'Return the path with the query pairs and anchor'
@ -129,6 +149,34 @@ class Url(str):
return string
@cached_property
def port(self) -> int:
if not self._parsed.port:
try:
return Protocol[self.proto.upper()].value
except KeyError:
return None
else:
return self._parsed.port
@cached_property
def proto(self) -> str:
return self._parsed.scheme
@cached_property
def query_string(self) -> str:
return self._parsed.query
@cached_property
def query(self) -> DotDict:
return DotDict.new_from_query_string(self._parsed.query)
@property
def top(self) -> str:
'Returns the domain without sub-domains'
@ -173,6 +221,11 @@ class Url(str):
raise ValueError('Cannot find TLD')
@property
def username(self) -> str:
return self._parsed.username
@property
def without_query(self) -> str:
'Return the url without the query or anchor on the end'
@ -221,6 +274,13 @@ class Url(str):
return Url.new(**self.dict)
def is_address_type(self, iptype):
if type(iptype) != IpAddressType:
iptype = IpAddressType[iptype.upper()]
return self.address_type == iptype
def join(self, new_path):
'Add to the path portion of the url'