misc: add save_json and load_json to DotDict, logging: complete rework
This commit is contained in:
parent
da85c268bb
commit
86a7737e6e
|
@ -1,198 +1,112 @@
|
|||
'''Simple logging module'''
|
||||
|
||||
import sys
|
||||
|
||||
from os import environ as env
|
||||
from IzzyLib.misc import DotDict
|
||||
from os import getppid, environ as env
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
|
||||
|
||||
stdout = sys.stdout
|
||||
class Levels(Enum):
|
||||
CRITICAL = 60,
|
||||
ERROR = 50
|
||||
WARNING = 40
|
||||
INFO = 30
|
||||
VERBOSE = 20
|
||||
DEBUG = 10
|
||||
MERP = 0
|
||||
|
||||
|
||||
class Log():
|
||||
def __init__(self, config=dict()):
|
||||
'''setup the logger'''
|
||||
if not isinstance(config, dict):
|
||||
raise TypeError(f'config is not a dict')
|
||||
class Log:
|
||||
__slots__ = [
|
||||
'name', 'level', 'date', 'format',
|
||||
'critical', 'error', 'warning',
|
||||
'info', 'verbose', 'debug', 'merp'
|
||||
]
|
||||
|
||||
self.levels = {
|
||||
'CRIT': 60,
|
||||
'ERROR': 50,
|
||||
'WARN': 40,
|
||||
'INFO': 30,
|
||||
'VERB': 20,
|
||||
'DEBUG': 10,
|
||||
'MERP': 0
|
||||
}
|
||||
def __init__(self, name, **config):
|
||||
self.name = name
|
||||
self.level = Levels.INFO
|
||||
self.date = True
|
||||
self.format = '%Y-%m-%d %H:%M:%S'
|
||||
self.update_config(**config)
|
||||
|
||||
self.long_levels = {
|
||||
'CRITICAL': 'CRIT',
|
||||
'ERROR': 'ERROR',
|
||||
'WARNING': 'WARN',
|
||||
'INFO': 'INFO',
|
||||
'VERBOSE': 'VERB',
|
||||
'DEBUG': 'DEBUG',
|
||||
'MERP': 'MERP'
|
||||
}
|
||||
|
||||
self.config = {'windows': sys.executable.endswith('pythonw.exe')}
|
||||
self.setConfig(self._parseConfig(config))
|
||||
for level in Levels:
|
||||
self._set_log_function(level)
|
||||
|
||||
|
||||
def _lvlCheck(self, level):
|
||||
'''make sure the minimum logging level is an int'''
|
||||
def _set_log_function(self, level):
|
||||
setattr(self, level.name.lower(), lambda *args: self.log(level, *args))
|
||||
|
||||
|
||||
def print(self, *args):
|
||||
sys.stdout.write(' '.join([str(arg) for arg in args]))
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def parse_level(self, level):
|
||||
try:
|
||||
value = int(level)
|
||||
|
||||
return Levels(int(level))
|
||||
except ValueError:
|
||||
level = self.long_levels.get(level.upper(), level)
|
||||
value = self.levels.get(level)
|
||||
|
||||
if value not in self.levels.values():
|
||||
raise InvalidLevel(f'Invalid logging level: {level}')
|
||||
|
||||
return value
|
||||
return getattr(Levels, level.upper())
|
||||
|
||||
|
||||
def _getLevelName(self, level):
|
||||
for name, num in self.levels.items():
|
||||
if level == num:
|
||||
return name
|
||||
|
||||
raise InvalidLevel(f'Invalid logging level: {level}')
|
||||
def update_config(self, **data):
|
||||
for key, value in data.items():
|
||||
self.set_config(key, value)
|
||||
|
||||
|
||||
def _parseConfig(self, config):
|
||||
'''parse the new config and update the old values'''
|
||||
date = config.get('date', self.config.get('date',True))
|
||||
systemd = config.get('systemd', self.config.get('systemd,', True))
|
||||
windows = config.get('windows', self.config.get('windows', False))
|
||||
def set_config(self, key, value):
|
||||
if key == 'level' and type(value) == str:
|
||||
value = self.parse_level(value)
|
||||
|
||||
if not isinstance(date, bool):
|
||||
raise TypeError(f'value for "date" is not a boolean: {date}')
|
||||
|
||||
if not isinstance(systemd, bool):
|
||||
raise TypeError(f'value for "systemd" is not a boolean: {date}')
|
||||
|
||||
level_num = self._lvlCheck(config.get('level', self.config.get('level', 'INFO')))
|
||||
|
||||
newconfig = {
|
||||
'level': self._getLevelName(level_num),
|
||||
'levelnum': level_num,
|
||||
'datefmt': config.get('datefmt', self.config.get('datefmt', '%Y-%m-%d %H:%M:%S')),
|
||||
'date': date,
|
||||
'systemd': systemd,
|
||||
'windows': windows,
|
||||
'systemnotif': config.get('systemnotif', None)
|
||||
}
|
||||
|
||||
return newconfig
|
||||
setattr(self, key, value)
|
||||
|
||||
|
||||
def setConfig(self, config):
|
||||
'''set the config'''
|
||||
self.config = self._parseConfig(config)
|
||||
def get_config(self, key):
|
||||
return self[key]
|
||||
|
||||
|
||||
def getConfig(self, key=None):
|
||||
'''return the current config'''
|
||||
if key:
|
||||
if self.config.get(key):
|
||||
return self.config.get(key)
|
||||
else:
|
||||
raise ValueError(f'Invalid config option: {key}')
|
||||
return self.config
|
||||
|
||||
|
||||
def printConfig(self):
|
||||
for k,v in self.config.items():
|
||||
stdout.write(f'{k}: {v}\n')
|
||||
|
||||
stdout.flush()
|
||||
|
||||
|
||||
def setLevel(self, level):
|
||||
self.minimum = self._lvlCheck(level)
|
||||
def print_config(self):
|
||||
self.print(*(f'{k}: {v}\n' for k,v in self.items()))
|
||||
|
||||
|
||||
def log(self, level, *msg):
|
||||
if self.config['windows']:
|
||||
if level.value < self.level.value:
|
||||
return
|
||||
|
||||
'''log to the console'''
|
||||
levelNum = self._lvlCheck(level)
|
||||
default = self.name == 'Default'
|
||||
options = [
|
||||
level.name + ':',
|
||||
' '.join([str(message) for message in msg]),
|
||||
'\n'
|
||||
]
|
||||
|
||||
if type(level) == int:
|
||||
level = _getLevelName(level)
|
||||
if self.date and not getppid() == 1:
|
||||
options.insert(0, datetime.now().strftime(self.format))
|
||||
|
||||
if levelNum < self.config['levelnum']:
|
||||
return
|
||||
if not default:
|
||||
options.insert(0 if not self.date else 1, f'[{self.name}]')
|
||||
|
||||
message = ' '.join([str(message) for message in msg])
|
||||
output = f'{level}: {message}\n'
|
||||
|
||||
if self.config['systemnotif']:
|
||||
self.config['systemnotif'].New(level, message)
|
||||
|
||||
if self.config['date'] and (self.config['systemd'] and not env.get('INVOCATION_ID')):
|
||||
'''only show date when not running in systemd and date var is True'''
|
||||
date = datetime.now().strftime(self.config['datefmt'])
|
||||
output = f'{date} {output}'
|
||||
|
||||
stdout.write(output)
|
||||
stdout.flush()
|
||||
self.print(*options)
|
||||
|
||||
|
||||
def critical(self, *msg):
|
||||
self.log('CRIT', *msg)
|
||||
def get_logger(name, **config):
|
||||
try:
|
||||
return logger[name.lower()]
|
||||
|
||||
def error(self, *msg):
|
||||
self.log('ERROR', *msg)
|
||||
|
||||
def warning(self, *msg):
|
||||
self.log('WARN', *msg)
|
||||
|
||||
def info(self, *msg):
|
||||
self.log('INFO', *msg)
|
||||
|
||||
def verbose(self, *msg):
|
||||
self.log('VERB', *msg)
|
||||
|
||||
def debug(self, *msg):
|
||||
self.log('DEBUG', *msg)
|
||||
|
||||
def merp(self, *msg):
|
||||
self.log('MERP', *msg)
|
||||
|
||||
|
||||
def getLogger(loginst, config=None):
|
||||
'''get a logging instance and create one if it doesn't exist'''
|
||||
Logger = logger.get(loginst)
|
||||
|
||||
if not Logger:
|
||||
if config:
|
||||
logger[loginst] = Log(config)
|
||||
|
||||
else:
|
||||
raise InvalidLogger(f'logger "{loginst}" doesn\'t exist')
|
||||
|
||||
return logger[loginst]
|
||||
|
||||
class InvalidLevel(Exception):
|
||||
'''Raise when an invalid logging level was specified'''
|
||||
|
||||
class InvalidLogger(Exception):
|
||||
'''Raise when the specified logger doesn't exist'''
|
||||
except KeyError:
|
||||
log = Log(name, **config)
|
||||
logger[name.lower()] = log
|
||||
return log
|
||||
|
||||
|
||||
'''create a default logger'''
|
||||
logger = {
|
||||
'default': Log()
|
||||
'default': Log('Default')
|
||||
}
|
||||
|
||||
DefaultLog = logger['default']
|
||||
|
||||
|
||||
'''aliases for default logger's log output functions'''
|
||||
critical = DefaultLog.critical
|
||||
error = DefaultLog.error
|
||||
|
@ -203,7 +117,7 @@ debug = DefaultLog.debug
|
|||
merp = DefaultLog.merp
|
||||
|
||||
'''aliases for the default logger's config functions'''
|
||||
setConfig = DefaultLog.setConfig
|
||||
getConfig = DefaultLog.getConfig
|
||||
setLevel = DefaultLog.setLevel
|
||||
printConfig = DefaultLog.printConfig
|
||||
update_config = DefaultLog.update_config
|
||||
set_config = DefaultLog.set_config
|
||||
get_config = DefaultLog.get_config
|
||||
print_config = DefaultLog.print_config
|
||||
|
|
|
@ -230,10 +230,6 @@ class DotDict(dict):
|
|||
return (k, v)
|
||||
|
||||
|
||||
def update(self, data):
|
||||
super().update(data)
|
||||
|
||||
|
||||
def get(self, key, default=None):
|
||||
value = dict.get(self, key, default)
|
||||
return DotDict(value) if type(value) == dict else value
|
||||
|
@ -275,6 +271,20 @@ class DotDict(dict):
|
|||
self.update(data)
|
||||
|
||||
|
||||
def load_json(self, path):
|
||||
path = Path(path)
|
||||
|
||||
with path.open('r') as fd:
|
||||
self.update(json.load(fd))
|
||||
|
||||
|
||||
def save_json(self, path, indent=None):
|
||||
path = Path(path)
|
||||
|
||||
with path.open('w') as fd:
|
||||
json.dump(self, fd, indent=indent)
|
||||
|
||||
|
||||
class DefaultDict(DotDict):
|
||||
def __getattr__(self, key):
|
||||
try:
|
||||
|
|
Loading…
Reference in a new issue