# izzylib/IzzyLib/misc.py
#
# 651 lines
# 12 KiB
# Python

'''Miscellaneous functions'''
import hashlib, random, string, sys, os, json, socket, time
from os import environ as env
from datetime import datetime
from getpass import getpass
from importlib import util
from pathlib import Path as Pathlib
from shutil import copyfile, rmtree
from . import logging
try:
from passlib.hash import argon2
except ImportError:
argon2 = None
def Boolean(v, return_value=False):
    '''Convert a loosely-typed value into a boolean.

    v [str, bool, int, None]: value to interpret. Strings are compared
        case-insensitively.
    return_value [bool]: when the value matches neither the "true" nor the
        "false" list, return the original value instead of True.

    Returns True for 1, True, 'on', 'y', 'yes', 'true', 'enable'; False for
    0, False, None, 'off', 'n', 'no', 'false', 'disable' and the empty string.
    Anything else yields True, or the untouched input if return_value is set.

    Raises ValueError when v is not a str, bool, int or None.
    '''
    if type(v) not in [str, bool, int, type(None)]:
        # Report the offending input itself; the lowered copy does not exist yet
        raise ValueError(f'Value is not a string, boolean, int, or nonetype: {v}')

    # Lowercase strings so the comparisons below are case-insensitive
    value = v.lower() if isinstance(v, str) else v

    if value in [1, True, 'on', 'y', 'yes', 'true', 'enable']:
        return True

    if value in [0, False, None, 'off', 'n', 'no', 'false', 'disable', '']:
        return False

    if return_value:
        # Unrecognised value: hand back exactly what the caller passed in
        return v

    return True
def RandomGen(length=20, letters=True, digits=True, extra=None):
    '''Generate a random string.

    length [int]: number of characters to generate
    letters [bool]: include ASCII upper- and lowercase letters
    digits [bool]: include the digits 0-9
    extra [str]: additional characters to draw from

    Returns a string of `length` characters drawn (with replacement) from the
    selected character sets.

    Raises TypeError if length is not an integer, and IndexError if length is
    positive but every character set is disabled.
    '''
    if not isinstance(length, int):
        raise TypeError(f'Character length must be an integer, not {type(length)}')

    pool = ''

    if letters:
        pool += string.ascii_letters

    if digits:
        pool += string.digits

    if extra:
        pool += extra

    # The result is used as a password salt elsewhere in this module, so draw
    # from the OS entropy source instead of the predictable default PRNG.
    return ''.join(random.SystemRandom().choices(pool, k=length))
def HashString(string, alg='blake2s'):
    '''Hash a string and return the hex digest.

    string [str, bytes]: data to hash; str input is encoded as UTF-8
    alg [str]: name of a hashlib algorithm that is guaranteed to be available

    Returns the hexadecimal digest, or None (after logging an error) when the
    algorithm is not supported.
    '''
    if alg not in hashlib.algorithms_guaranteed:
        logging.error('Unsupported hash algorithm:', alg)
        logging.error('Supported algs:', ', '.join(sorted(hashlib.algorithms_guaranteed)))
        return

    data = string if isinstance(string, bytes) else string.encode('UTF-8')

    # NOTE: the variable-length 'shake_*' algorithms need a length argument for
    # hexdigest() and will raise TypeError here.
    newhash = hashlib.new(alg)
    newhash.update(data)
    return newhash.hexdigest()
def Timestamp(dtobj=None, utc=False):
    '''Return the current time as a POSIX timestamp (float).

    dtobj: object providing now()/utcnow(); falls back to the datetime class
        when omitted or falsy
    utc [bool]: call utcnow() instead of now()

    NOTE(review): now() and utcnow() are classmethods on datetime, so when a
    datetime instance is passed as dtobj its own value is not used; the
    current time is returned regardless. Confirm whether that is intended.
    '''
    source = dtobj or datetime

    if utc:
        moment = source.utcnow()
    else:
        moment = source.now()

    return moment.timestamp()
def GetVarName(*, single=True, **kwargs):
    '''Return the name(s) of the keyword arguments passed in.

    single [bool]: return only the first name instead of the whole list
    kwargs: the variables to name, passed as name=value

    Returns the first keyword name (None if no keywords were given) when
    single is true, otherwise the list of all keyword names in call order.
    '''
    keys = list(kwargs)

    if not single:
        return keys

    return keys[0] if keys else None
def ApDate(date=None, alt=False):
    '''Format a date as a UTC timestamp string.

    date [datetime, int, float, None]: the moment to format. Numbers are
        treated as POSIX timestamps. Naive datetime objects are assumed to
        already be in UTC; aware ones are converted to UTC. None (or any
        other falsy non-number) means "now".
    alt [bool]: use the 'Thu, 01 Jan 1970 00:00:00 GMT' layout instead of
        the ISO-like '1970-01-01T00:00:00Z' layout

    Raises TypeError for unsupported input types.
    '''
    from datetime import timezone  # local import: only needed by this function

    # bool is a subclass of int but was never accepted as a timestamp
    is_number = isinstance(date, (int, float)) and not isinstance(date, bool)

    if is_number:
        # Convert in UTC: both output formats label the result as GMT/Z.
        # Checked before the falsy test so that timestamp 0 means the epoch.
        date = datetime.fromtimestamp(date, tz=timezone.utc)

    elif not date:
        date = datetime.now(timezone.utc)

    elif not isinstance(date, datetime):
        raise TypeError(f'Unsupported object type for ApDate: {type(date)}')

    elif date.tzinfo is not None:
        date = date.astimezone(timezone.utc)

    return date.strftime('%a, %d %b %Y %H:%M:%S GMT' if alt else '%Y-%m-%dT%H:%M:%SZ')
def GetIp():
    '''Return the local address a UDP socket binds to when connected to
    10.255.255.255, or '127.0.0.1' if that lookup raises.'''
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    try:
        probe.connect(('10.255.255.255', 1))
        # getsockname() returns (address, port); only the address is wanted
        return probe.getsockname()[0]

    except Exception:
        return '127.0.0.1'

    finally:
        probe.close()
def ImportFromPath(mod_path):
    '''Load and return a Python module from a filesystem path.

    mod_path: path to a .py file, or to a directory containing __init__.py

    The module name is the file name with every '.py' occurrence removed.
    '''
    location = Path(mod_path)

    if location.isdir():
        source = location.join('__init__.py')
    else:
        source = location

    mod_name = source.name.replace('.py', '', -1)

    spec = util.spec_from_file_location(mod_name, source.str())
    loaded = util.module_from_spec(spec)
    spec.loader.exec_module(loaded)
    return loaded
def Input(prompt, default=None, valtype=str, options=None, password=False):
    '''Prompt the user for a value on the terminal.

    prompt [str]: text to display
    default: value returned when the user enters nothing; shown in the
        prompt unless password is set
    valtype [callable]: converter applied to the entered text (str, int,
        Path, ...)
    options [list of str]: if given, only these answers are accepted
    password [bool]: read the value with getpass so it is not echoed

    Returns the converted value, or default when the input was empty.
    '''
    input_func = getpass if password else input
    choices = list(options) if options else []

    if default is not None:
        prompt += ' [-redacted-]' if password else f' [{default}]'

    prompt += '\n'

    if choices:
        opt = '/'.join(choices)
        prompt += f'[{opt}]'

    prompt += ': '

    value = input_func(prompt)

    while value and choices and value not in choices:
        # Report the problem with print; input()/getpass() would wait for input
        print('Invalid value:', value)
        value = input_func(prompt)

    if not value:
        return default

    ret = valtype(value)

    # For paths, keep asking until the parent directory exists
    while valtype == Path and not ret.parent().exists():
        print('Parent directory doesn\'t exist')
        ret = Path(input_func(prompt))

    return ret
def NfsCheck(path):
    '''Check whether a path lives on an NFS mount.

    path: path to check; it is resolved before comparison

    Returns True when the resolved path is at or below a mount point listed
    in /proc/mounts with type 'nfs' or 'nfs4'. Also returns True when
    /proc/mounts does not exist (the mount type cannot be determined).
    Returns False otherwise.
    '''
    proc = Path('/proc/mounts')
    target = Path(path).resolve().str()

    if not proc.exists():
        return True

    with proc.open() as fd:
        for line in fd:
            fields = line.split()

            # Expected layout: device, mount point, fs type, options, ...
            if len(fields) < 3:
                continue

            mountpoint, fstype = fields[1], fields[2]

            if fstype not in ('nfs', 'nfs4'):
                continue

            # Compare whole path components so '/mnt' does not match
            # '/home/mnt/file' or '/mnt2'
            prefix = mountpoint.rstrip('/') + '/'

            if target == mountpoint or target.startswith(prefix):
                return True

    return False
def PortCheck(port, address='127.0.0.1', tcp=True):
    '''Try to connect to address:port.

    port [int]: port number to probe
    address [str]: host to probe
    tcp [bool]: use a TCP socket; otherwise UDP

    Returns True when the connection attempt fails (connect_ex returns a
    non-zero code) and False when it succeeds. A socket error raised by the
    attempt is printed and False is returned.
    '''
    sock_type = socket.SOCK_STREAM if tcp else socket.SOCK_DGRAM

    with socket.socket(socket.AF_INET, sock_type) as probe:
        try:
            status = probe.connect_ex((address, port))

        except socket.error as e:
            print(e)
            return False

        return status != 0
def PrintMethods(object, include_underscore=False):
    '''Print every name returned by dir(object), one per line.

    object: anything accepted by dir()
    include_underscore [bool]: also print names that start with '_'
    '''
    for attr_name in dir(object):
        if include_underscore or not attr_name.startswith('_'):
            print(attr_name)
class Connection(socket.socket):
    '''IPv4 socket that connects on entering a `with` block and closes on exit.

    address [str]: host to connect to
    port [int]: port to connect to
    tcp [bool]: create a TCP (stream) socket; otherwise UDP (datagram)
    '''

    def __init__(self, address='127.0.0.1', port=8080, tcp=True):
        super().__init__(socket.AF_INET, socket.SOCK_STREAM if tcp else socket.SOCK_DGRAM)
        self.address = address
        self.port = port

    def __enter__(self):
        self.connect((self.address, self.port))
        return self

    def __exit__(self, exctype, value, tb):
        self.close()

    def send(self, msg):
        '''Send all of msg (bytes) via sendall(). Returns None.'''
        self.sendall(msg)

    def receive(self, size=8192):
        '''Read up to `size` bytes from the socket and return them.'''
        return self.recv(size)

    # Original (misspelled) name kept so existing callers keep working
    recieve = receive
class DotDict(dict):
def __init__(self, value=None, **kwargs):
'''Python dictionary, but variables can be set/get via attributes
value [str, bytes, dict]: JSON or dict of values to init with
case_insensitive [bool]: Wether keys should be case sensitive or not
kwargs: key/value pairs to set on init. Overrides identical keys set by 'value'
'''
super().__init__()
if isinstance(value, (str, bytes)):
self.fromJson(value)
elif isinstance(value, dict) or isinstance(value, list):
self.update(value)
elif value:
raise TypeError('The value must be a JSON string, list, dict, or another DotDict object, not', value.__class__)
if kwargs:
self.update(kwargs)
def __getattr__(self, key):
try:
val = super().__getattribute__(key)
except AttributeError:
val = self.get(key, KeyError)
try:
if val == KeyError:
raise KeyError(f'Invalid key: {key}')
except AttributeError:
'PyCryptodome.PublicKey.RSA.RsaKey.__eq__ does not seem to play nicely'
return DotDict(val) if type(val) == dict else val
def __delattr__(self, key):
if self.get(key):
del self[key]
super().__delattr__(key)
def __setattr__(self, key, value):
if key.startswith('_'):
super().__setattr__(key, value)
else:
super().__setitem__(key, value)
def __str__(self):
return self.toJson()
def __parse_item__(self, k, v):
if type(v) == dict:
v = DotDict(v)
if not k.startswith('_'):
return (k, v)
def get(self, key, default=None):
value = dict.get(self, key, default)
return DotDict(value) if type(value) == dict else value
def items(self):
data = []
for k, v in super().items():
new = self.__parse_item__(k, v)
if new:
data.append(new)
return data
def values(self):
return list(super().values())
def keys(self):
return list(super().keys())
def asDict(self):
return dict(self)
def toJson(self, indent=None, **kwargs):
if 'cls' not in kwargs:
kwargs['cls'] = JsonEncoder
return json.dumps(dict(self), indent=indent, **kwargs)
def fromJson(self, string):
data = json.loads(string)
self.update(data)
def load_json(self, path: str=None):
self.update(Path(path).load_json())
def save_json(self, path: str, **kwargs):
with Path(path).open(w) as fd:
write(self.toJson(*kwargs))
class DefaultDict(DotDict):
    '''DotDict variant whose attribute access returns a new, empty
    DefaultDict for missing keys instead of raising.'''

    def __getattr__(self, key):
        try:
            found = super().__getattribute__(key)

        except AttributeError:
            # The fallback is returned but not stored in the dictionary
            found = self.get(key, DefaultDict())

        if type(found) == dict:
            return DotDict(found)

        return found
class LowerDotDict(DotDict):
    '''DotDict variant that lowercases names on attribute access and
    lowercases string keys passed through update().'''

    def __getattr__(self, key):
        '''Look up the lowercased name. Raises KeyError if it is missing.'''
        key = key.lower()

        try:
            val = super().__getattribute__(key)

        except AttributeError:
            val = self.get(key, KeyError)

        # Identity check: the KeyError class is only a "missing" marker here,
        # and stored values may define an unusual __eq__
        if val is KeyError:
            raise KeyError(f'Invalid key: {key}')

        return DotDict(val) if type(val) == dict else val

    def __setattr__(self, key, value):
        key = key.lower()

        if key.startswith('_'):
            super().__setattr__(key, value)

        else:
            super().__setitem__(key, value)

    def update(self, data):
        '''Merge `data` (a mapping or a list of key/value pairs) into this
        dictionary, lowercasing every string key.'''
        # dict() normalizes both mappings and pair lists
        lowered = {
            (k.lower() if isinstance(k, str) else k): v
            for k, v in dict(data).items()
        }
        super().update(lowered)
class Path(object):
    '''Convenience wrapper around pathlib.Path.

    path: filesystem path; a leading '~' is expanded to the home directory
    exist [bool]: passed as exist_ok to mkdir()/touch(); when False,
        copy()/link() raise FileExistsError if the target already exists
    missing [bool]: stored on the instance; not used inside this class
    parents [bool]: passed as `parents` to mkdir(); when False, copy()/link()
        raise FileNotFoundError if the target's parent directory is missing

    Attributes: `name` and `stem` mirror the wrapped path; `json` holds data
    loaded by load_json() and written by save_json().
    '''

    def __init__(self, path, exist=True, missing=True, parents=True):
        self.__set_path(Pathlib(str(path)))

        if str(path).startswith('~'):
            self.__set_path(self.__path.expanduser())

        self.json = DotDict()
        self.exist = exist
        self.missing = missing
        self.parents = parents

    def __str__(self):
        return str(self.__path)

    def __repr__(self):
        return f'Path({str(self.__path)})'

    def str(self):
        '''Return the path as a string.'''
        return self.__str__()

    def __set_path(self, path):
        '''Replace the wrapped path and keep `name`/`stem` in sync with it.'''
        self.__path = path
        self.name = path.name
        self.stem = path.stem

    def __check_dir(self, path=None):
        '''Validate `path` (or this path) against the `parents` and `exist`
        flags before creating something there.'''
        target = self if not path else Path(path)

        if not self.parents and not target.parent().exists():
            raise FileNotFoundError(f'Parent directories do not exist: {target.str()}')

        if not self.exist and target.exists():
            raise FileExistsError(f'File or directory already exists: {target.str()}')

    def __parse_perm_octal(self, mode):
        '''Normalize a permission mode to an int.

        Integers (e.g. 0o755) are returned unchanged. Strings are read as
        octal digits, so '755' and '0o755' both give 0o755.
        '''
        if mode is None:
            raise ValueError('A permission mode is required')

        if isinstance(mode, int):
            return mode

        return int(str(mode), 8)

    def size(self):
        '''Return the size in bytes.'''
        return self.__path.stat().st_size

    def mtime(self):
        '''Return the last-modified time as a POSIX timestamp.'''
        return os.path.getmtime(self.str())

    def mkdir(self, mode=0o777):
        '''Create the directory, honoring the `parents` and `exist` flags.
        Returns True if the path exists afterwards.'''
        self.__path.mkdir(mode=mode, parents=self.parents, exist_ok=self.exist)
        return self.exists()

    def new(self):
        '''Return a new Path for the same location, with default flags.'''
        return Path(self.__path)

    def parent(self, new=True):
        '''Return the parent directory as a new Path, or move this object to
        it in place when new is False.'''
        path = Pathlib(self.__path).parent

        if new:
            return Path(path)

        self.__set_path(path)
        return self

    def copy(self, path, overwrite=False):
        '''Copy this file to `path` with shutil.copyfile.

        NOTE(review): when overwrite is False an existing target is still
        overwritten by copyfile; the flag only controls whether the target
        is deleted first. Confirm the intended contract.
        '''
        target = Path(path)

        self.__check_dir(path)

        if target.exists() and overwrite:
            target.delete()

        copyfile(self.str(), target.str())

    def backup(self, ext='backup', overwrite=False):
        '''Copy this file to '<path>.<ext>' next to the original.'''
        target = f'{self.__path}.{ext}'
        self.copy(target, overwrite)

    def move(self, path, overwrite=False):
        '''Copy this file to `path`, then delete the original.'''
        self.copy(path, overwrite=overwrite)
        self.delete()

    def join(self, path, new=True):
        '''Append `path`; return a new Path, or modify in place when new is
        False.'''
        new_path = self.__path.joinpath(path)

        if new:
            return Path(new_path)

        self.__set_path(new_path)
        return self

    def home(self, path=None, new=True):
        '''Return the user's home directory, optionally joined with `path`.'''
        new_path = Pathlib.home()

        if path:
            new_path = new_path.joinpath(path)

        if new:
            return Path(new_path)

        self.__set_path(new_path)
        return self

    def isdir(self):
        return self.__path.is_dir()

    def isfile(self):
        return self.__path.is_file()

    def islink(self):
        return self.__path.is_symlink()

    def listdir(self, recursive=True):
        '''Return the direct children of this directory as Path objects.

        NOTE(review): `recursive` is accepted for compatibility but has no
        effect; neither original branch descended into subdirectories.
        '''
        return [Path(path) for path in self.__path.iterdir()]

    def exists(self):
        return self.__path.exists()

    def link(self, path):
        '''Make this path a symlink pointing to `path`.

        NOTE(review): an existing `path` is deleted before the link is
        created, which removes the link's destination. Confirm the intended
        direction before relying on this.
        '''
        target = Path(path)

        self.__check_dir(path)

        if target.exists():
            target.delete()

        self.__path.symlink_to(path, target.isdir())

    def resolve(self, new=True):
        '''Resolve to an absolute path; return a new Path, or modify in place
        when new is False.'''
        path = self.__path.resolve()

        if new:
            return Path(path)

        self.__set_path(path)
        return self

    def chmod(self, mode=None):
        '''Change permissions. `mode` is an int (0o644) or an octal string
        ('644').'''
        octal = self.__parse_perm_octal(mode)
        self.__path.chmod(octal)

    def touch(self, mode=0o666):
        '''Create the file if needed and update its mtime. Returns True if
        the path exists afterwards.'''
        octal = self.__parse_perm_octal(mode)
        self.__path.touch(octal, self.exist)
        return self.exists()

    def load_json(self):
        '''Parse the file as JSON into self.json and return it.'''
        self.json = DotDict(self.read())
        return self.json

    def save_json(self, indent=None):
        '''Write self.json to the file as JSON.'''
        with self.__path.open('w') as fp:
            fp.write(json.dumps(self.json.asDict(), indent=indent, cls=JsonEncoder))

    def update_json(self, data=None):
        '''Merge a dict or JSON string into self.json (the file itself is
        not written).'''
        if isinstance(data, str):
            data = json.loads(data)

        self.json.update(data or {})

    def delete(self):
        '''Remove the file, symlink or directory tree. Returns True if the
        path no longer exists.'''
        # rmtree refuses symlinks, so a link to a directory is unlinked instead
        if self.isdir() and not self.islink():
            rmtree(self.__path)

        else:
            self.__path.unlink()

        return not self.exists()

    def open(self, *args, **kwargs):
        '''Open the file; arguments are passed to pathlib.Path.open.'''
        return self.__path.open(*args, **kwargs)

    def read(self, *args):
        '''Return the file's text; arguments are passed to the file
        object's read().'''
        with self.open() as fd:
            return fd.read(*args)

    def readlines(self):
        '''Return the file's lines as a list.'''
        with self.open() as fd:
            return fd.readlines()
class JsonEncoder(json.JSONEncoder):
    '''JSON encoder that falls back to str() for objects the json module
    cannot serialize on its own.'''

    def default(self, obj):
        # One isinstance call with a tuple checks every listed type
        if not isinstance(obj, (str, int, float, dict)):
            return str(obj)

        return super().default(obj)
class PasswordHash(object):
    '''Hash and verify passwords with passlib's argon2 handler.

    salt [str, Path, None]: salt text. A Path is read as the salt file and,
        if it does not exist yet, a new 40-character salt is generated and
        written to it. None (or any falsy value) generates a random salt
        that is kept only on this instance.
    rounds [int]: passed to argon2 as `rounds`
    bsize [int]: multiplied by 1024 and passed to argon2 as `memory_cost`
    threads [int]: passed to argon2 as `max_threads`; the default is
        os.cpu_count(), evaluated once when the class is defined
    length [int]: passed to argon2 as `digest_size`
    '''

    def __init__(self, salt=None, rounds=8, bsize=50, threads=os.cpu_count(), length=64):
        if isinstance(salt, Path):
            if salt.exists():
                with salt.open() as fd:
                    self.salt = fd.read()

            else:
                newsalt = RandomGen(40)

                with salt.open('w') as fd:
                    fd.write(newsalt)

                self.salt = newsalt

        else:
            self.salt = salt or RandomGen(40)

        self.rounds = rounds
        self.bsize = bsize * 1024
        self.threads = threads
        self.length = length

    def _hasher(self):
        '''Return the argon2 handler configured with this instance's settings.'''
        if argon2 is None:
            raise ImportError('passlib with argon2 support is required to use PasswordHash')

        salt = self.salt if isinstance(self.salt, bytes) else self.salt.encode('UTF-8')

        return argon2.using(
            salt = salt,
            rounds = self.rounds,
            memory_cost = self.bsize,
            max_threads = self.threads,
            digest_size = self.length
        )

    def hash(self, password):
        '''Return the argon2 hash string for `password`.'''
        return self._hasher().hash(password)

    def verify(self, password, passhash):
        '''Return the result of checking `password` against `passhash`.'''
        return self._hasher().verify(password, passhash)