651 lines
12 KiB
Python
651 lines
12 KiB
Python
'''Miscellaneous functions'''
|
|
import hashlib, random, string, sys, os, json, socket, time
|
|
|
|
from os import environ as env
|
|
from datetime import datetime
|
|
from getpass import getpass
|
|
from importlib import util
|
|
from pathlib import Path as Pathlib
|
|
from shutil import copyfile, rmtree
|
|
|
|
from . import logging
|
|
|
|
try:
|
|
from passlib.hash import argon2
|
|
except ImportError:
|
|
argon2 = None
|
|
|
|
|
|
def Boolean(v, return_value=False):
    '''Convert a loosely-typed value into a boolean

    v [str, bool, int, None]: Value to interpret. Strings are compared case-insensitively
    return_value [bool]: Return ``v`` unchanged (instead of ``True``) when it is neither a known "true" nor a known "false" value

    Raises ValueError when ``v`` is not exactly a str, bool, int, or None
    '''
    if type(v) not in [str, bool, int, type(None)]:
        # report the offending argument itself; the lowered copy does not exist yet
        raise ValueError(f'Value is not a string, boolean, int, or nonetype: {v}')

    # make the value lowercase if it's a string
    value = v.lower() if isinstance(v, str) else v

    if value in [1, True, 'on', 'y', 'yes', 'true', 'enable']:
        return True

    if value in [0, False, None, 'off', 'n', 'no', 'false', 'disable', '']:
        return False

    if return_value:
        # unrecognised value: hand the original back to the caller
        return v

    # any other value (non-empty string, non-zero int) counts as true
    return True
|
|
|
|
|
|
def RandomGen(length=20, letters=True, digits=True, extra=None):
    '''Build a random string from the selected character classes

    length [int]: Number of characters to generate
    letters [bool]: Include ASCII upper- and lowercase letters
    digits [bool]: Include the digits 0-9
    extra [str]: Additional characters to draw from

    Raises TypeError when ``length`` is not an integer

    Characters are drawn with ``random.choices``, i.e. with replacement.
    '''
    if not isinstance(length, int):
        raise TypeError(f'Character length must be an integer, not {type(length)}')

    # assemble the pool in the same order as the options: letters, digits, extra
    pool = string.ascii_letters if letters else ''
    pool = pool + string.digits if digits else pool
    pool = pool + extra if extra else pool

    picked = random.choices(pool, k=length)
    return ''.join(picked)
|
|
|
|
|
|
def HashString(string, alg='blake2s'):
    '''Return the hex digest of a string or bytes object

    string [str, bytes]: Data to hash. Strings are encoded as UTF-8 first
    alg [str]: Name of a hashlib algorithm that is guaranteed to be available

    Returns the hex digest, or None (after logging an error) when ``alg`` is not supported
    '''
    if alg not in hashlib.algorithms_guaranteed:
        logging.error('Unsupported hash algorithm:', alg)
        logging.error('Supported algs:', ', '.join(sorted(hashlib.algorithms_guaranteed)))
        return

    if not isinstance(string, (bytes, bytearray)):
        string = string.encode('UTF-8')

    # NOTE(review): the shake_* algorithms need an explicit digest length, so
    # hexdigest() without arguments raises TypeError for them
    newhash = hashlib.new(alg)
    newhash.update(string)
    return newhash.hexdigest()
|
|
|
|
|
|
def Timestamp(dtobj=None, utc=False):
    '''Return a POSIX timestamp as a float

    dtobj [datetime, type]: A datetime instance to convert, or a datetime class used to fetch the current time. Defaults to ``datetime``
    utc [bool]: When the current time is generated, take the UTC wall-clock time instead of the local one. Ignored when ``dtobj`` is an instance

    NOTE(review): with ``utc=True`` the naive UTC wall-clock time is passed to
    ``timestamp()``, which interprets naive values as local time. The result is
    therefore shifted by the local UTC offset. This matches the previous
    ``utcnow().timestamp()`` behaviour; confirm that callers expect it.
    '''
    from datetime import timezone

    if isinstance(dtobj, datetime):
        # an actual point in time was supplied: convert it instead of discarding it
        return dtobj.timestamp()

    dtime = dtobj if dtobj else datetime

    if utc:
        # same value as the deprecated utcnow(): naive datetime holding UTC wall-clock time
        date = dtime.now(timezone.utc).replace(tzinfo=None)

    else:
        date = dtime.now()

    return date.timestamp()
|
|
|
|
|
|
def GetVarName(*, single=True, **kwargs):
    '''Return the name(s) of the keyword arguments that were passed in

    single [bool]: Return only the first name instead of the full list
    kwargs: Variables passed as ``name=value``; only the names are used

    Returns a str (or None when nothing was passed) if ``single`` is set, otherwise a list of names
    '''
    keys = list(kwargs)

    if not single:
        return keys

    return keys[0] if keys else None
|
|
|
|
|
|
def ApDate(date=None, alt=False):
    '''Format a date as a UTC date string

    date [datetime, int, float, None]: Date to format. Numbers are treated as POSIX timestamps, timezone-aware datetimes are converted to UTC, naive datetimes are assumed to already be UTC, and an empty value means "now"
    alt [bool]: Use the ``Thu, 01 Jan 1970 00:00:00 GMT`` form instead of ``1970-01-01T00:00:00Z``

    Raises TypeError for any other type of ``date``
    '''
    from datetime import timezone

    if isinstance(date, (int, float)) and not isinstance(date, bool):
        # convert in UTC: both output formats label the value as GMT/Z
        date = datetime.fromtimestamp(date, timezone.utc)

    elif not date:
        date = datetime.now(timezone.utc)

    elif not isinstance(date, datetime):
        raise TypeError(f'Unsupported object type for ApDate: {type(date)}')

    elif date.utcoffset() is not None:
        # aware datetime: normalise so the printed fields really are UTC
        date = date.astimezone(timezone.utc)

    return date.strftime('%a, %d %b %Y %H:%M:%S GMT' if alt else '%Y-%m-%dT%H:%M:%SZ')
|
|
|
|
|
|
def GetIp():
    '''Return the local IPv4 address used for outbound traffic

    A UDP socket is connected to ``10.255.255.255`` and the socket's own
    address is read back. Falls back to ``127.0.0.1`` when anything in that
    process fails.
    '''
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect(('10.255.255.255', 1))
            return probe.getsockname()[0]

        except Exception:
            return '127.0.0.1'
|
|
|
|
|
|
def ImportFromPath(mod_path):
    '''Import a python module from a file or package directory

    mod_path [str, Path]: Path to a python file, or to a directory containing an ``__init__.py``

    Returns the executed module object
    Raises ImportError when no import spec/loader can be created for the path
    '''
    mod_path = Path(mod_path)
    path = mod_path.join('__init__.py') if mod_path.isdir() else mod_path

    # file name without its final suffix
    name = path.stem

    spec = util.spec_from_file_location(name, path.str())

    if spec is None or spec.loader is None:
        raise ImportError(f'Cannot import module from path: {path.str()}')

    module = util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
|
|
|
|
|
|
def Input(prompt, default=None, valtype=str, options=None, password=False):
    '''Prompt the user for a value on the terminal

    prompt [str]: Text to display
    default: Value returned when the user enters nothing. Shown in the prompt unless ``password`` is set
    valtype [type]: Callable used to convert the entered text
    options [list]: Allowed values. The prompt repeats until one of them (or nothing) is entered
    password [bool]: Read the value with ``getpass`` so it is not echoed

    Returns the converted value, or ``default`` when nothing was entered
    '''
    input_func = getpass if password else input
    options = list(options) if options else []

    if default is not None:
        prompt += ' [-redacted-]' if password else f' [{default}]'

    prompt += '\n'

    if options:
        opt = '/'.join(options)
        prompt += f'[{opt}]'

    prompt += ': '
    value = input_func(prompt)

    while value and options and value not in options:
        # report the problem, then ask again with the same (possibly hidden) reader
        print('Invalid value:', '-redacted-' if password else value)
        value = input_func(prompt)

    if not value:
        return default

    ret = valtype(value)

    while valtype == Path and not ret.parent().exists():
        print('Parent directory doesn\'t exist')
        ret = Path(input_func(prompt))

    return ret
|
|
|
|
|
|
def NfsCheck(path):
    '''Check whether a path is located on an NFS mount

    path [str, Path]: Path to check. It is resolved before being compared

    Returns True when the path equals or lies below an ``nfs``/``nfs4`` mount
    point listed in ``/proc/mounts``, and also when ``/proc/mounts`` does not
    exist. Returns False otherwise.
    '''
    proc = Path('/proc/mounts')
    path = Path(path).resolve()

    if not proc.exists():
        # NOTE(review): mounts cannot be inspected here; True is the existing answer - confirm it is intended
        return True

    target = path.str()

    with proc.open() as fd:
        for line in fd:
            fields = line.split()

            # fields: device, mount point, filesystem type, ...
            if len(fields) < 3 or fields[2] not in ('nfs', 'nfs4'):
                continue

            mount = fields[1]

            # compare whole path components, not arbitrary substrings
            if target == mount or target.startswith(mount.rstrip('/') + '/'):
                return True

    return False
|
|
|
|
|
|
def PortCheck(port, address='127.0.0.1', tcp=True):
    '''Check whether nothing accepts connections on a port

    port [int]: Port number to probe
    address [str]: IPv4 address to probe
    tcp [bool]: Probe with a TCP socket, otherwise with a UDP socket

    Returns True when the connection attempt fails, and False when it succeeds
    or when a socket error is raised (the error is printed).
    '''
    sock_type = socket.SOCK_STREAM if tcp else socket.SOCK_DGRAM

    with socket.socket(socket.AF_INET, sock_type) as sock:
        try:
            status = sock.connect_ex((address, port))

        except socket.error as error:
            print(error)
            return False

        # connect_ex reports 0 on success, so any other code means nothing answered
        return status != 0
|
|
|
|
|
|
def PrintMethods(object, include_underscore=False):
    '''Print every name returned by ``dir()`` for an object, one per line

    object: Object to inspect
    include_underscore [bool]: Also print names that start with an underscore
    '''
    for name in dir(object):
        if include_underscore or not name.startswith('_'):
            print(name)
|
|
|
|
|
|
class Connection(socket.socket):
    '''IPv4 socket that remembers its peer and connects when used as a context manager

    address [str]: Address to connect to
    port [int]: Port to connect to
    tcp [bool]: Create a TCP (stream) socket, otherwise a UDP (datagram) socket
    '''

    def __init__(self, address='127.0.0.1', port=8080, tcp=True):
        sock_type = socket.SOCK_STREAM if tcp else socket.SOCK_DGRAM
        super().__init__(socket.AF_INET, sock_type)

        self.address, self.port = address, port


    def __enter__(self):
        'Connect to the stored address/port when the ``with`` block starts'
        peer = (self.address, self.port)
        self.connect(peer)
        return self


    def __exit__(self, exctype, value, tb):
        'Close the socket when the ``with`` block ends'
        self.close()


    def send(self, msg):
        'Send the whole message via ``sendall``. Returns nothing'
        self.sendall(msg)


    def recieve(self, size=8192):
        'Read up to ``size`` bytes from the socket'
        return self.recv(size)
|
|
|
|
|
|
class DotDict(dict):
    '''Python dictionary whose items can also be read, set, and deleted as attributes'''

    def __init__(self, value=None, **kwargs):
        '''Python dictionary, but variables can be set/get via attributes

        value [str, bytes, dict, list]: JSON text, dict, or list of key/value pairs to init with
        kwargs: key/value pairs to set on init. Overrides identical keys set by 'value'

        Raises TypeError for any other non-empty ``value``
        '''
        super().__init__()

        if isinstance(value, (str, bytes)):
            self.fromJson(value)

        elif isinstance(value, (dict, list)):
            self.update(value)

        elif value:
            raise TypeError(f'The value must be a JSON string, list, dict, or another DotDict object, not {value.__class__}')

        if kwargs:
            self.update(kwargs)


    def __getattr__(self, key):
        '''Return the item stored under ``key`` (only called when normal attribute lookup fails)

        Plain dicts are wrapped in a new DotDict. A missing key raises KeyError,
        except for ``__dunder__`` names, which raise AttributeError so protocol
        probes such as ``hasattr`` and ``copy.deepcopy`` keep working.
        '''
        try:
            # direct lookup: stored values are never compared, so objects with an odd __eq__ are safe
            val = dict.__getitem__(self, key)

        except KeyError:
            if key.startswith('__') and key.endswith('__'):
                raise AttributeError(key) from None

            raise KeyError(f'Invalid key: {key}') from None

        return DotDict(val) if type(val) == dict else val


    def __delattr__(self, key):
        'Delete the item stored under ``key``, or the real attribute when there is no such item'
        if key in self:
            del self[key]

        else:
            super().__delattr__(key)


    def __setattr__(self, key, value):
        'Store ``value`` as an item, or as a real attribute when ``key`` starts with an underscore'
        if key.startswith('_'):
            super().__setattr__(key, value)

        else:
            super().__setitem__(key, value)


    def __str__(self):
        return self.toJson()


    def __parse_item__(self, k, v):
        'Return ``(k, v)`` with plain dicts wrapped in a DotDict, or None for keys starting with an underscore'
        if type(v) == dict:
            v = DotDict(v)

        if not k.startswith('_'):
            return (k, v)


    def get(self, key, default=None):
        'Like ``dict.get``, but a plain dict result is wrapped in a DotDict'
        value = dict.get(self, key, default)
        return DotDict(value) if type(value) == dict else value


    def items(self):
        'Return a list of ``(key, value)`` pairs, skipping keys that start with an underscore'
        pairs = (self.__parse_item__(k, v) for k, v in super().items())
        return [pair for pair in pairs if pair]


    def values(self):
        'Return the stored values as a list'
        return list(super().values())


    def keys(self):
        'Return the stored keys as a list'
        return list(super().keys())


    def asDict(self):
        'Return a plain ``dict`` copy of the top level'
        return dict(self)


    def toJson(self, indent=None, **kwargs):
        '''Serialize the dict to a JSON string

        indent [int]: Indentation passed to ``json.dumps``
        kwargs: Extra arguments for ``json.dumps``. ``cls`` defaults to JsonEncoder
        '''
        if 'cls' not in kwargs:
            kwargs['cls'] = JsonEncoder

        return json.dumps(dict(self), indent=indent, **kwargs)


    def fromJson(self, string):
        'Parse a JSON document and merge it into this dict'
        data = json.loads(string)
        self.update(data)


    def load_json(self, path: str=None):
        'Merge the JSON file at ``path`` into this dict'
        self.update(Path(path).load_json())


    def save_json(self, path: str, **kwargs):
        '''Write this dict to ``path`` as JSON

        kwargs: Arguments passed on to ``toJson``
        '''
        with Path(path).open('w') as fd:
            fd.write(self.toJson(**kwargs))
|
|
|
|
|
|
class DefaultDict(DotDict):
    '''DotDict that returns an empty DefaultDict instead of raising for missing keys'''

    def __getattr__(self, key):
        '''Return the item stored under ``key``, or a new empty DefaultDict when it is missing

        Plain dicts are wrapped in a DotDict. Missing ``__dunder__`` names raise
        AttributeError so that protocol probes (``hasattr``, ``copy.deepcopy``)
        do not mistake the placeholder for a real special method. The
        placeholder is not stored in the dict.
        '''
        if key in self:
            val = dict.__getitem__(self, key)
            return DotDict(val) if type(val) == dict else val

        if key.startswith('__') and key.endswith('__'):
            raise AttributeError(key)

        return DefaultDict()
|
|
|
|
|
|
class LowerDotDict(DotDict):
    '''DotDict whose attribute access and ``update`` lowercase the keys'''

    def __getattr__(self, key):
        '''Return the attribute or item stored under the lowercased ``key``

        Raises KeyError when nothing is found (AttributeError for ``__dunder__``
        names, so protocol probes keep working).
        '''
        key = key.lower()

        try:
            # the lowercased name may be a real attribute or method
            val = super().__getattribute__(key)

        except AttributeError:
            if key in self:
                val = dict.__getitem__(self, key)

            elif key.startswith('__') and key.endswith('__'):
                raise AttributeError(key) from None

            else:
                raise KeyError(f'Invalid key: {key}') from None

        return DotDict(val) if type(val) == dict else val


    def __setattr__(self, key, value):
        'Store ``value`` under the lowercased ``key``'
        key = key.lower()

        if key.startswith('_'):
            super().__setattr__(key, value)

        else:
            super().__setitem__(key, value)


    def update(self, data=(), **kwargs):
        '''Merge ``data`` into this dict with every key lowercased

        data [dict, list]: Mapping or list of key/value pairs to merge
        kwargs: Additional key/value pairs to merge
        '''
        incoming = dict(data, **kwargs)
        super().update({k.lower(): v for k, v in incoming.items()})
|
|
|
|
|
|
class Path(object):
    '''Wrapper around ``pathlib.Path`` with helpers for copying, linking, and JSON files

    path [str, Path]: Filesystem path. A leading ``~`` is expanded to the home directory
    exist [bool]: Passed as ``exist_ok`` to ``mkdir``/``touch``. When False, ``copy`` and ``link`` refuse to replace an existing target
    missing [bool]: Stored on the instance; not used by this class
    parents [bool]: Passed as ``parents`` to ``mkdir``. When False, ``copy`` and ``link`` require the target's parent directory to exist
    '''

    def __init__(self, path, exist=True, missing=True, parents=True):
        self.__path = Pathlib(str(path))

        if str(path).startswith('~'):
            self.__path = self.__path.expanduser()

        self.json = DotDict()
        self.exist = exist
        self.missing = missing
        self.parents = parents
        self.name = self.__path.name
        self.stem = self.__path.stem


    def __str__(self):
        return str(self.__path)


    def __repr__(self):
        return f'Path({str(self.__path)})'


    def str(self):
        'Return the path as a string'
        return self.__str__()


    def __set_path(self, path):
        'Point this object at ``path`` in place, keeping ``name`` and ``stem`` in sync. Returns self'
        self.__path = path
        self.name = path.name
        self.stem = path.stem
        return self


    def __check_dir(self, path=None):
        '''Validate a target (``path``, or this path when empty) against the ``parents`` and ``exist`` settings

        Raises FileNotFoundError when ``parents`` is False and the target's parent is missing
        Raises FileExistsError when ``exist`` is False and the target already exists
        '''
        target = self if not path else Path(path)

        if not self.parents and not target.parent().exists():
            raise FileNotFoundError('Parent directories do not exist:', target.str())

        if not self.exist and target.exists():
            raise FileExistsError('File or directory already exists:', target.str())


    def __parse_perm_octal(self, mode):
        '''Convert a permission value into the integer expected by the os

        The digits of ``mode`` are read as octal (``'755'`` and ``755`` both
        give ``0o755``), which is how values were interpreted before. An int
        whose digits are not valid octal (e.g. ``0o666 == 438``) is used as is.

        NOTE(review): an int written as an octal literal whose decimal digits
        happen to be valid octal (``0o644 == 420``) is still read as ``0o420``.
        Pass permissions as a string of octal digits to be unambiguous.

        Raises ValueError when ``mode`` cannot be interpreted
        '''
        try:
            return int(str(mode), 8)

        except ValueError:
            if isinstance(mode, int):
                return mode

            raise ValueError(f'Invalid permission mode: {mode!r}') from None


    def size(self):
        'Return the size reported by ``stat`` in bytes'
        return self.__path.stat().st_size


    def mtime(self):
        'Return the modification time reported by ``stat``'
        return self.__path.stat().st_mtime


    def mkdir(self, mode=0o777):
        '''Create the directory, honouring the ``parents`` and ``exist`` settings

        mode [int]: Permission bits passed to ``pathlib.Path.mkdir``

        Returns True when the path exists afterwards
        '''
        self.__path.mkdir(mode=mode, parents=self.parents, exist_ok=self.exist)

        return self.exists()


    def new(self):
        'Return a new Path object for the same location (with default settings)'
        return Path(self.__path)


    def parent(self, new=True):
        '''Return the parent directory

        new [bool]: Return a new Path object. When False, this object is changed in place and returned
        '''
        path = self.__path.parent

        if new:
            return Path(path)

        return self.__set_path(path)


    def copy(self, path, overwrite=False):
        '''Copy this file to ``path``

        path [str, Path]: Destination
        overwrite [bool]: Delete an existing destination before copying
        '''
        target = Path(path)

        self.__check_dir(path)

        if target.exists() and overwrite:
            target.delete()

        copyfile(self.str(), target.str())


    def backup(self, ext='backup', overwrite=False):
        '''Copy this file to ``<path>.<ext>`` next to the original

        ext [str]: Extension appended to the full path
        overwrite [bool]: Delete an existing backup first
        '''
        target = f'{self.__path}.{ext}'
        self.copy(target, overwrite)


    def move(self, path, overwrite=False):
        'Copy this file to ``path`` and then delete the original'
        self.copy(path, overwrite=overwrite)
        self.delete()


    def join(self, path, new=True):
        '''Append ``path`` to this path

        new [bool]: Return a new Path object. When False, this object is changed in place and returned
        '''
        new_path = self.__path.joinpath(path)

        if new:
            return Path(new_path)

        return self.__set_path(new_path)


    def home(self, path=None, new=True):
        '''Return the user's home directory, optionally joined with ``path``

        new [bool]: Return a new Path object. When False, this object is changed in place and returned
        '''
        new_path = Pathlib.home()

        if path:
            new_path = new_path.joinpath(path)

        if new:
            return Path(new_path)

        return self.__set_path(new_path)


    def isdir(self):
        return self.__path.is_dir()


    def isfile(self):
        return self.__path.is_file()


    def islink(self):
        return self.__path.is_symlink()


    def listdir(self, recursive=True):
        '''Return the direct children of this directory as Path objects

        NOTE(review): ``recursive`` has never made the listing descend into
        sub-directories; it is accepted for compatibility and both settings
        yield the direct children with their full paths.
        '''
        return [Path(child) for child in self.__path.iterdir()]


    def exists(self):
        return self.__path.exists()


    def link(self, path):
        '''Create a symlink at ``path`` that points to this path

        Anything already present at ``path`` is deleted first (subject to the
        ``exist``/``parents`` checks).

        NOTE(review): ``path`` is treated as the link location, consistent with
        ``copy``/``move``. The previous code deleted ``path`` and then made
        *this* path a link to it, which always produced a dangling link.
        '''
        target = Path(path)

        self.__check_dir(path)

        if target.exists() or target.islink():
            target.delete()

        target.__path.symlink_to(self.__path, self.isdir())


    def resolve(self, new=True):
        '''Return the absolute path with symlinks resolved

        new [bool]: Return a new Path object. When False, this object is changed in place and returned
        '''
        path = self.__path.resolve()

        if new:
            return Path(path)

        return self.__set_path(path)


    def chmod(self, mode=None):
        '''Change the permissions of the path

        mode [str, int]: Permissions as octal digits, e.g. ``'755'``

        Raises ValueError when ``mode`` is missing or cannot be interpreted
        '''
        octal = self.__parse_perm_octal(mode)
        self.__path.chmod(octal)


    def touch(self, mode=0o666):
        '''Create the file if needed and update its modification time

        mode [str, int]: Permissions for a newly created file

        Returns True when the path exists afterwards
        '''
        octal = self.__parse_perm_octal(mode)
        self.__path.touch(octal, self.exist)

        return self.exists()


    def load_json(self):
        'Read the file as JSON into ``self.json`` and return it'
        self.json = DotDict(self.read())

        return self.json


    def save_json(self, indent=None):
        'Write ``self.json`` to the file as JSON'
        with self.__path.open('w') as fp:
            fp.write(json.dumps(self.json.asDict(), indent=indent, cls=JsonEncoder))


    def update_json(self, data=None):
        '''Merge ``data`` into ``self.json`` (the file itself is not written)

        data [dict, str]: Values to merge. Strings are parsed as JSON first
        '''
        if data is None:
            data = {}

        if isinstance(data, str):
            data = json.loads(data)

        self.json.update(data)


    def delete(self):
        '''Delete the file, symlink, or directory tree

        Returns True when the path no longer exists
        '''
        if self.isdir() and not self.islink():
            rmtree(self.__path)

        else:
            # files and symlinks (including links to directories) are simply unlinked
            self.__path.unlink()

        return not self.exists()


    def open(self, *args, **kwargs):
        'Open the file; arguments are passed to ``pathlib.Path.open``'
        return self.__path.open(*args, **kwargs)


    def read(self, *args):
        'Return the text content of the file; arguments are passed to the file object\'s ``read``'
        with self.open() as fd:
            return fd.read(*args)


    def readlines(self):
        'Return the lines of the file as a list'
        with self.open() as fd:
            return fd.readlines()
|
|
|
|
|
|
class JsonEncoder(json.JSONEncoder):
    '''JSON encoder that falls back to ``str()`` for objects json cannot serialize'''

    def default(self, obj):
        '''Return a serializable replacement for ``obj``

        Anything that is not a str, int, float, or dict is converted with
        ``str()``. Those four types are handed to the base implementation,
        which raises TypeError.
        '''
        if not isinstance(obj, (str, int, float, dict)):
            return str(obj)

        return json.JSONEncoder.default(self, obj)
|
|
|
|
|
|
class PasswordHash(object):
    '''Hash and verify passwords with argon2 (via passlib) using a fixed salt'''

    def __init__(self, salt=None, rounds=8, bsize=50, threads=os.cpu_count(), length=64):
        '''Set up the hasher

        salt [str, bytes, Path]: Salt to use. A Path is read as a salt file, which is created with a random salt if it does not exist. An empty value generates a random salt
        rounds [int]: Value passed to argon2 as ``rounds``
        bsize [int]: Memory cost; multiplied by 1024 before being passed to argon2 as ``memory_cost``
        threads [int]: Value passed to argon2 as ``max_threads``
        length [int]: Value passed to argon2 as ``digest_size``
        '''
        if isinstance(salt, Path):
            if salt.exists():
                with salt.open() as fd:
                    self.salt = fd.read()

            else:
                # NOTE(review): RandomGen is built on the ``random`` module; consider ``secrets`` for salts
                newsalt = RandomGen(40)

                with salt.open('w') as fd:
                    fd.write(newsalt)

                self.salt = newsalt

        else:
            self.salt = salt or RandomGen(40)

        self.rounds = rounds
        self.bsize = bsize * 1024
        self.threads = threads
        self.length = length


    def __hasher(self):
        '''Return the configured argon2 handler shared by ``hash`` and ``verify``

        Raises ImportError when passlib's argon2 could not be imported
        '''
        if argon2 is None:
            raise ImportError('passlib with argon2 support is required for password hashing')

        salt = self.salt if isinstance(self.salt, bytes) else self.salt.encode('UTF-8')

        return argon2.using(
            salt = salt,
            rounds = self.rounds,
            memory_cost = self.bsize,
            max_threads = self.threads,
            digest_size = self.length
        )


    def hash(self, password):
        'Return the argon2 hash of ``password``'
        return self.__hasher().hash(password)


    def verify(self, password, passhash):
        'Return the result of verifying ``password`` against ``passhash``'
        return self.__hasher().verify(password, passhash)
|