mirror of
https://github.com/virt-manager/virt-manager.git
synced 2025-01-21 18:03:58 +03:00
79da19ad07
Signed-off-by: Cole Robinson <crobinso@redhat.com>
184 lines
6.1 KiB
Python
184 lines
6.1 KiB
Python
# Copyright (C) 2006, 2013 Red Hat, Inc.
|
|
# Copyright (C) 2006 Daniel P. Berrange <berrange@redhat.com>
|
|
#
|
|
# This work is licensed under the GNU GPLv2 or later.
|
|
# See the COPYING file in the top-level directory.
|
|
|
|
from gi.repository import Gio
|
|
from gi.repository import GLib
|
|
|
|
from virtinst import log
|
|
|
|
from ..baseclass import vmmGObject
|
|
|
|
|
|
class _vmmSecret(object):
|
|
def __init__(self, name, secret=None, attributes=None):
|
|
self.name = name
|
|
self.secret = secret
|
|
self.attributes = attributes
|
|
|
|
def get_secret(self):
|
|
return self.secret
|
|
def get_name(self):
|
|
return self.name
|
|
|
|
|
|
class vmmKeyring(vmmGObject):
|
|
"""
|
|
freedesktop Secret API abstraction
|
|
"""
|
|
@classmethod
|
|
def get_instance(cls):
|
|
if not cls._instance:
|
|
cls._instance = vmmKeyring()
|
|
return cls._instance
|
|
|
|
def __init__(self):
|
|
vmmGObject.__init__(self)
|
|
|
|
self._collection = None
|
|
|
|
try:
|
|
self._dbus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
|
|
self._service = Gio.DBusProxy.new_sync(self._dbus, 0, None,
|
|
"org.freedesktop.secrets",
|
|
"/org/freedesktop/secrets",
|
|
"org.freedesktop.Secret.Service", None)
|
|
|
|
self._session = self._service.OpenSession("(sv)", "plain",
|
|
GLib.Variant("s", ""))[1]
|
|
|
|
self._collection = Gio.DBusProxy.new_sync(self._dbus, 0, None,
|
|
"org.freedesktop.secrets",
|
|
"/org/freedesktop/secrets/aliases/default",
|
|
"org.freedesktop.Secret.Collection", None)
|
|
|
|
log.debug("Using keyring session %s", self._session)
|
|
except Exception: # pragma: no cover
|
|
log.exception("Error determining keyring")
|
|
|
|
def _cleanup(self):
|
|
pass # pragma: no cover
|
|
|
|
def _find_secret_item_path(self, uuid, hvuri):
|
|
attributes = {
|
|
"uuid": uuid,
|
|
"hvuri": hvuri,
|
|
}
|
|
unlocked, locked = self._service.SearchItems("(a{ss})", attributes)
|
|
if not unlocked:
|
|
if locked:
|
|
log.warning( # pragma: no cover
|
|
"Item found, but it's locked")
|
|
return None
|
|
return unlocked[0]
|
|
|
|
def _do_prompt_if_needed(self, path):
|
|
if path == "/":
|
|
return
|
|
iface = Gio.DBusProxy.new_sync( # pragma: no cover
|
|
self._dbus, 0, None,
|
|
"org.freedesktop.secrets", path,
|
|
"org.freedesktop.Secret.Prompt", None)
|
|
iface.Prompt("(s)", "") # pragma: no cover
|
|
|
|
def _add_secret(self, secret):
|
|
try:
|
|
props = {
|
|
"org.freedesktop.Secret.Item.Label": GLib.Variant("s", secret.get_name()),
|
|
"org.freedesktop.Secret.Item.Attributes": GLib.Variant("a{ss}", secret.attributes),
|
|
}
|
|
params = (self._session, [],
|
|
[ord(v) for v in secret.get_secret()],
|
|
"text/plain; charset=utf8")
|
|
replace = True
|
|
|
|
dummy, prompt = self._collection.CreateItem("(a{sv}(oayays)b)",
|
|
props, params, replace)
|
|
self._do_prompt_if_needed(prompt)
|
|
except Exception: # pragma: no cover
|
|
log.exception("Failed to add keyring secret")
|
|
|
|
def _del_secret(self, uuid, hvuri):
|
|
try:
|
|
path = self._find_secret_item_path(uuid, hvuri)
|
|
if path is None:
|
|
return None
|
|
|
|
iface = Gio.DBusProxy.new_sync(self._dbus, 0, None,
|
|
"org.freedesktop.secrets", path,
|
|
"org.freedesktop.Secret.Item", None)
|
|
prompt = iface.Delete()
|
|
self._do_prompt_if_needed(prompt)
|
|
except Exception: # pragma: no cover
|
|
log.exception("Failed to delete keyring secret")
|
|
|
|
def _get_secret(self, uuid, hvuri):
|
|
ret = None
|
|
try:
|
|
path = self._find_secret_item_path(uuid, hvuri)
|
|
if path is None:
|
|
return None
|
|
|
|
iface = Gio.DBusProxy.new_sync(self._dbus, 0, None,
|
|
"org.freedesktop.secrets", path,
|
|
"org.freedesktop.Secret.Item", None)
|
|
|
|
secretbytes = iface.GetSecret("(o)", self._session)[2]
|
|
label = iface.get_cached_property("Label").unpack().strip("'")
|
|
dbusattrs = iface.get_cached_property("Attributes").unpack()
|
|
|
|
secret = "".join([chr(c) for c in secretbytes])
|
|
|
|
attrs = {}
|
|
for key, val in dbusattrs.items():
|
|
if key not in ["hvuri", "uuid"]:
|
|
continue
|
|
attrs["%s" % key] = "%s" % val
|
|
|
|
ret = _vmmSecret(label, secret, attrs)
|
|
except Exception: # pragma: no cover
|
|
log.exception("Failed to get keyring secret uuid=%r hvuri=%r", uuid, hvuri)
|
|
|
|
return ret
|
|
|
|
|
|
##############
|
|
# Public API #
|
|
##############
|
|
|
|
def is_available(self):
|
|
return not (self._collection is None)
|
|
|
|
def _get_secret_name(self, vm):
|
|
return "vm-console-" + vm.get_uuid()
|
|
|
|
def get_console_password(self, vm):
|
|
if not self.is_available():
|
|
return ("", "") # pragma: no cover
|
|
|
|
secret = self._get_secret(vm.get_uuid(), vm.conn.get_uri())
|
|
if secret is None:
|
|
return ("", "") # pragma: no cover
|
|
|
|
return (secret.get_secret(), vm.get_console_username() or "")
|
|
|
|
def set_console_password(self, vm, password, username=""):
|
|
if not self.is_available():
|
|
return # pragma: no cover
|
|
|
|
|
|
secret = _vmmSecret(self._get_secret_name(vm), password,
|
|
{"uuid": vm.get_uuid(),
|
|
"hvuri": vm.conn.get_uri()})
|
|
vm.set_console_username(username)
|
|
self._add_secret(secret)
|
|
|
|
def del_console_password(self, vm):
|
|
if not self.is_available():
|
|
return # pragma: no cover
|
|
|
|
self._del_secret(vm.get_uuid(), vm.conn.get_uri())
|
|
vm.del_console_username()
|