This repository has been archived on 2023-02-02. You can view files and clone it, but cannot push or open issues or pull requests.
barkshark-web/barkshark_web/passwords/gnome_keyring.py

312 lines
6.2 KiB
Python

import json, secretstorage
from datetime import datetime
from secretstorage.collection import Collection, get_collection_by_alias, create_collection
from secretstorage.exceptions import ItemNotFoundException, LockedException
from .base import PasswordItem, PasswordStorage, PasswordResult
from ..functions import TimeoutCallback, get_app, run_in_gui_thread
pass_store_keys = ['username', 'domain', 'url', 'note']
pass_keys = [*pass_store_keys, 'created', 'modified', 'label', 'password']
def parse_data(self, username=None, domain=None, password=None, url=None, note=None, label=None, **kwargs):
kwargs.update(
username = username,
domain = domain,
url = url,
note = note,
password = password,
label = label
)
for key, value in tuple(kwargs.items()):
if not value:
del new_data[key]
return kwargs
class GnomeKeyringStorage(PasswordStorage):
def __init__(self, name='BarksharkWeb'):
self.name = name
self.connection = None
self.collection = None
def __getitem__(self, key):
if (item := self.fetch(id=key).one()):
return item
raise KeyError(f'No password with label: {key}')
def __setitem__(self, key, value):
if (item := self[key]):
item.update(**value)
else:
self.insert(label=key, **value)
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.disconnect()
def connect(self):
if self.connection and self.collection:
return
if not self.connection:
self.connection = secretstorage.dbus_init()
if not secretstorage.check_service_availability(self.connection):
self.disconnect()
raise ConnectionError('Failed to connect to Secret Service server')
return self.get_keyring()
def disconnect(self):
if self.connection:
self.connection.close()
self.connection = None
self.collection = None
def get_keyring(self):
if not self.collection:
try:
self.collection = Collection(self.connection, f'/org/freedesktop/secrets/collection/{self.name}')
except ItemNotFoundException:
create_collection(self.connection, self.name)
self.collection = Collection(self.connection, f'/org/freedesktop/secrets/collection/{self.name}')
try:
self.collection.ensure_not_locked()
except LockedException:
self.unlock()
return self.collection
def unlock(self):
return self.collection.unlock()
def lock(self):
return self.collection.lock()
def fetch(self, *args, **kwargs):
data = parse_data(*args, **kwargs)
data.pop('password', None)
if not any(data.values()):
rows = self.collection.get_all_items()
else:
rows = self.collection.search_items(data)
return PasswordResult(self, rows)
def insert(self, *args, label=None, **kwargs):
data = parse_data(*args, **kwargs)
data['id'] = random_str()
required = {key: data.get(key) for key in ['username', 'domain', 'password']}
if None in required.values():
raise ValueError(f'Forgot username, domain, or password: {json.dumps(required)}')
username = data['username']
password = data.pop('password')
if (row := self.fetch(username, password).one()):
logging.verbose('Password already exists:', row.label)
return row
if not label:
label = f'{username} @ {password}'
row = self.collection.create_item(label, data, password.encode())
return PasswordItem(row)
def update(self, username, domain, **kwargs):
row = self.fetch(username, domain).one()
row.update(**kwargs)
return row
def remove(self, *args, **kwargs):
for row in self.fetch(*args, **kwargs):
row.delete()
class GnomeKeyringItem(PasswordItem):
def __init__(self, item):
super().__init__()
self._item = item
self._attrs = item.get_attributes()
if not self._attrs.get('id'):
self.update({'id': random_str()})
def __repr__(self):
return f'PasswordItem(id={self.id}, username={self.username}, domain={self.domain})'
def __getitem__(self, key):
if key not in pass_keys:
raise KeyError(key)
return getattr(self, key)
def __setitem__(self, key, value):
if key not in pass_store_keys:
raise KeyError(key)
self.update(**{key: value})
@property
def created(self):
return datetime.fromtimestamp(self._item.get_created())
@property
def modified(self):
return datetime.fromtimestamp(self._item.get_modified())
@property
def label(self):
if not (label := self._item.get_label()):
return f'{self.username} @ {self.domain}'
return label
@property
def id(self):
return self._attrs.get('id')
@property
def password(self):
return self._item.get_secret().decode()
@property
def username(self):
return self._attrs.get('username')
@property
def domain(self):
return self._attrs.get('domain')
@property
def url(self):
if (url := self._attrs.get('url')):
return Url(url)
@property
def note(self):
return self._attrs.get('note')
@password.setter
def password(self, value):
self.update(password=value)
@username.setter
def username(self, value):
self.update(username=value)
@domain.setter
def domain(self, value):
self.update(domain=value)
@url.setter
def url(self, value):
return self.update(url=str(value))
@note.setter
def note(self, value):
return self.update(note=value)
def copy_password(self, timeout=60):
app = get_app()
app.set_clipboard_text(self.password)
timer = TimeoutCallback(timeout, run_in_gui_thread, app.handle_clipboard_clear_password, self.password)
timer.start()
def copy_username(self):
get_app().set_clipboard_text(self.username)
def as_dict(self):
return DotDict(
id = self.id,
domain = self.domain,
username = self.username,
password = self.password,
url = self.url,
note = self.note,
created = self.created,
modified = self.modified
)
def to_json(self, indent=None):
return self.as_dict().to_json(indent)
def update(self, data):
label = data.pop('label', None)
if (password := data.pop('password', None)):
self._item.set_secret(password.encode())
if not len(data):
return
new_data = parse_data(**self.as_dict())
new_data.update(parse_data(**data))
self._item.set_attributes(new_data)
self._attrs = self._item.get_attributes()
if label:
self._item.set_label(label)
elif any(map(data.get, ['username', 'domain'])) and not self._item.get_label():
self._item.set_label(self.label)
def delete(self):
self._item.delete()