This commit is contained in:
Izalia Mae 2022-03-08 22:49:56 -05:00
parent e349945918
commit 391bdc8054
7 changed files with 157 additions and 84 deletions

View file

@ -36,7 +36,16 @@ from .http_client import (
)
def add_builtins(*classes):
def register_global(obj, name=None):
# This doesn't work and I'm not sure why
#assert isinstance(obj, (callable, object))
__builtins__[name or class_name(obj)] = obj
return obj
def add_builtins(*classes, **kwargs):
new_builtins = [
BaseConfig,
DotDict,
@ -54,4 +63,8 @@ def add_builtins(*classes):
*classes
]
__builtins__.update({cls.__name__: cls for cls in new_builtins})
for cls in new_builtins:
register_global(cls)
for name, cls in kwargs.items():
register_global(cls, name)

View file

@ -86,6 +86,12 @@ class BaseCache(OrderedDict):
self.popitem(last=False)
def clear(self):
'Remove all data from the cache'
for key in list(self.keys()):
self.remove(key)
def items(self) -> dict:
'Return cached items as a dict'
return [[k, v.data] for k,v in super().items()]
@ -123,17 +129,16 @@ class BaseCache(OrderedDict):
if not item:
return
with self._lock:
if self.ttl:
timestamp = int(datetime.timestamp(datetime.now()))
if self.ttl:
timestamp = int(datetime.timestamp(datetime.now()))
if timestamp >= self[key].timestamp:
del self[key]
return
if timestamp >= self[key].timestamp:
del self[key]
return
self[key]['timestamp'] = timestamp + self.ttl
self[key]['timestamp'] = timestamp + self.ttl
self.move_to_end(key)
self.move_to_end(key)
return item['data']

View file

@ -298,7 +298,7 @@ class JsonEncoder(json.JSONEncoder):
# Only implemented to disable the docstring. There's probably a better way to do this tbh
def __init__(self, *args, **kwargs):
''
super().__inti__(*args, **kwargs)
super().__init__(*args, **kwargs)
def default(self, obj):

View file

@ -44,8 +44,8 @@ class Config(BaseConfig):
headers = CapitalDotDict({'User-Agent': f'IzzyLib/{__version__}'}),
appagent = None,
timeout = 60,
request_class = Request,
response_class = Response,
request_class = HttpClientRequest,
response_class = HttpClientResponse,
proxy_type = 'https',
proxy_host = None,
proxy_port = None

View file

@ -16,6 +16,8 @@ class HttpClientRequest(PyRequest):
def __init__(self, url:str, body:bytes=None, headers:dict={}, cookies:dict={}, method:str='GET'):
'An HTTP request. Headers can be accessed, set, or deleted as dict items.'
super().__init__(url)
self._url = None
self._headers = CapitalDotDict(headers)
@ -75,8 +77,8 @@ class HttpClientRequest(PyRequest):
@body.setter
def body(self, data):
self._body = convert_to_bytes(data)
self.set_header('Content-Length', len(self._body))
self._data = convert_to_bytes(data)
#self.set_header('Content-Length', len(self._data))
@property
@ -104,7 +106,7 @@ class HttpClientRequest(PyRequest):
@property
def host(self):
'Domain the request will be sent to.'
return self.url.host_full
return self.url.host
@property
@ -136,12 +138,12 @@ class HttpClientRequest(PyRequest):
@property
def url(self) -> Url:
'The URL of the request'
return self._url
return Url(self.full_url)
@url.setter
def url(self, url):
self._url = Url(url)
self.full_url = Url(url)
def set_chunked(self, value:True):

View file

@ -12,11 +12,12 @@ import string
import time
import timeit
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from getpass import getpass, getuser
from importlib import util
from subprocess import Popen, PIPE
from urllib.parse import urlparse, quote, ParseResult
from urllib.parse import urlparse, quote, quote_plus
from urllib.request import urlopen
from . import izzylog
from .dotdict import DotDict
@ -143,10 +144,12 @@ def check_pid(pid: int) -> bool:
def class_name(cls:object) -> str:
'Get the name of a class'
try:
return cls.__name__
name = cls.__name__
except AttributeError:
return type(cls).__name__
name = type(cls).__name__
return name.split('.')[-1]
def convert_to_boolean(value:Union[str,bool,int,type(None)], return_value:bool=False) -> Union[bool,Any]:
@ -724,14 +727,17 @@ class Url(str):
def __init__(self, url):
self._tldcache = None
self._tldcache_path = Path.cache.join('icann_public_suffix_list.txt')
if isinstance(url, str):
parsed = urlparse(url)
else:
parsed = url
if not all([parsed.scheme, parsed.netloc]):
raise TypeError('Not a valid url')
#if not all([parsed.scheme, parsed.netloc]):
#raise TypeError('Not a valid url')
self._parsed = parsed
self.proto = parsed.scheme
@ -744,9 +750,36 @@ class Url(str):
self.anchor = parsed.fragment
try:
self.host = parsed.netloc.split('@')[1]
self.domain = parsed.netloc.split('@')[1]
except:
self.host = parsed.netloc
self.domain = parsed.netloc
@property
def top(self) -> str:
'Returns the domain without sub-domains'
if not self._tldcache:
if not self._tldcache_path.exists() or self._tldcache_path.mtime + timedelta(hours=24) < datetime.now():
resp = urlopen('https://publicsuffix.org/list/public_suffix_list.dat')
with self._tldcache_path.open('w') as fd:
for line in resp.read().decode('utf-8').splitlines():
if 'end icann domains' in line.lower():
break
if not line or line.startswith('//'):
continue
if line.startswith('*'):
line = line[2:]
fd.write(line + '\n')
with self._tldcache_path.open() as fd:
self._tldcache = set(fd.readlines())
@property
@ -765,20 +798,20 @@ class Url(str):
@property
def host_full(self) -> str:
'The hostname and, if set, port as a string'
def host(self) -> str:
'The domain and, if set, port as a string'
if self.port:
return f'{self.host}:{self.port}'
return f'{self.domain}:{self.port}'
return self.host
return self.domain
@property
def without_query(self) -> str:
'Return the url without the query or anchor on the end'
return self.split('?')[0]
return Url(self.split('?')[0])
@property
@ -787,7 +820,7 @@ class Url(str):
return DotDict(
proto = self.proto,
host = self.host,
domain = self.domain,
port = self.port,
path = self.path,
query = self.query,
@ -798,7 +831,7 @@ class Url(str):
@classmethod
def new(cls, host:str, path:Union[Path,str]='/', proto:str='https', port:int=None, query:dict=None, username:str=None, password:str=None, anchor:str=None):
def new(cls, domain:str, path:Union[Path,str]='/', proto:str='https', port:int=None, query:dict=None, username:str=None, password:str=None, anchor:str=None):
'Create a new `Url` based on the url parts'
if port == protocol_ports.get(proto):
port = None
@ -811,7 +844,7 @@ class Url(str):
elif username:
url += f'{username}@'
url += host
url += domain
if port:
url += f':{port}'
@ -831,11 +864,22 @@ class Url(str):
'Add to the path portion of the url'
data = self.dict
host = data.pop('host')
domain = data.pop('domain')
data['path'] = data['path'].join(new_path)
return self.new(host, **data)
return self.new(domain, **data)
def replace_property(self, key, value):
data = self.dict
if key not in data.keys():
raise KeyError('Invalid Url property')
data[key] = value
return self.new(*data)
# compat

View file

@ -1,4 +1,4 @@
import json, os, shutil, sys
import enum, json, os, shutil, sys
from datetime import datetime
from functools import cached_property
@ -13,6 +13,19 @@ linux_prefix = dict(
)
class TinyDotDict(dict):
__getattr__ = dict.__getitem__
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
class PathType(enum.Enum):
DIR = 'dir'
FILE = 'file'
LINK = 'link'
UNKNOWN = 'unknown'
class PathMeta(type):
@property
def home(cls):
@ -55,15 +68,12 @@ class PathMeta(type):
class Path(str, metaclass=PathMeta):
def __init__(self, path=os.getcwd(), exist=True, missing=True, parents=True):
if not (parents or exist):
self.__check_dir(path)
## todo: move these to direct properties of Path
self.config = {
self.config = TinyDotDict({
'missing': missing,
'parents': parents,
'exist': exist
}
})
def __enter__(self):
@ -89,46 +99,16 @@ class Path(str, metaclass=PathMeta):
def __check_dir(self, path=None):
target = self if not path else Path(path)
if not self.parents and not target.parent.exists():
if not self.config.parents and not target.parent.exists():
raise FileNotFoundError('Parent directories do not exist:', target)
if not self.exist and target.exists():
if not self.config.exist and target.exists():
raise FileExistsError('File or directory already exists:', target)
@property
def missing(self):
return self.config.missing
@property
def exist(self):
return self.config.exist
@property
def parents(self):
return self.config.parents
@cached_property
def isdir(self):
return os.path.isdir(self)
@cached_property
def isfile(self):
return os.path.isfile(self)
@cached_property
def islink(self):
return os.path.islink(self)
@property
def mtime(self):
return os.path.getmtime(self)
return datetime.fromtimestamp(os.path.getmtime(self))
@cached_property
@ -148,12 +128,12 @@ class Path(str, metaclass=PathMeta):
@cached_property
def stem(self):
return os.path.basename(self).split('.')[0]
return os.path.splitext(self.name)[0]
@cached_property
def suffix(self):
return os.path.splitext(self)[1]
return os.path.splitext(self.name)[1]
def append(self, text):
@ -161,7 +141,7 @@ class Path(str, metaclass=PathMeta):
def backup(self, ext='backup', overwrite=False):
target = f'{self.parent}.{ext}'
target = f'{self}.{ext}'
self.copy(target, overwrite)
return Path(target)
@ -187,7 +167,7 @@ class Path(str, metaclass=PathMeta):
except FileNotFoundError:
pass
elif not self.exist:
elif not self.config.exist:
raise FileExistsError(target)
shutil.copy2(self, target)
@ -196,7 +176,7 @@ class Path(str, metaclass=PathMeta):
def delete(self):
if not self.exists():
if not self.exist:
if not self.config.exist:
raise FileNotFoundError(self)
return
@ -218,6 +198,19 @@ class Path(str, metaclass=PathMeta):
return Path(os.path.expanduser(self))
def get_type(self):
if os.path.isfile(self):
return PathType.FILE
elif os.path.isdir(self):
return PathType.DIR
elif os.path.islink(self):
return PathType.LINK
return PathType.UNKNOWN
def glob(self, pattern='*', recursive=True):
paths = PyPath(self).rglob(pattern) if recursive else PyPath(self).glob(pattern)
@ -225,6 +218,22 @@ class Path(str, metaclass=PathMeta):
yield self.join(path)
def isdir(self):
return self.istype(PathType.DIR)
def isfile(self):
return self.istype(PathType.FILE)
def islink(self):
return self.istype(PathType.LINK)
def istype(self, file_type):
return self.get_type() == file_type
def join(self, *paths, **kwargs):
return Path(os.path.join(self, *paths), **kwargs)
@ -245,7 +254,7 @@ class Path(str, metaclass=PathMeta):
self.__check_dir(path)
if target.exists():
if not self.exist:
if not self.config.exist:
raise FileExistsError(target)
target.delete()
@ -262,11 +271,11 @@ class Path(str, metaclass=PathMeta):
def mkdir(self, mode=0o755):
if self.parents:
os.makedirs(self, mode, exist_ok=self.exist)
if self.config.parents:
os.makedirs(self, mode, exist_ok=self.config.exist)
else:
os.makedir(self, mode, exist_ok=self.exist)
os.makedir(self, mode, exist_ok=self.config.exist)
return self.exists