mirror of
https://github.com/virt-manager/virt-manager.git
synced 2025-01-12 09:18:00 +03:00
416 lines
13 KiB
Python
416 lines
13 KiB
Python
|
#
|
||
|
# XML API wrappers
|
||
|
#
|
||
|
# This program is free software; you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU General Public License as published by
|
||
|
# the Free Software Foundation; either version 2 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# This program is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU General Public License
|
||
|
# along with this program; if not, write to the Free Software
|
||
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||
|
# MA 02110-1301 USA.
|
||
|
|
||
|
import libxml2
|
||
|
|
||
|
from . import util
|
||
|
|
||
|
# pylint: disable=protected-access
|
||
|
|
||
|
|
||
|
class _XPathSegment(object):
    """
    Class representing a single 'segment' of an xpath string. For example,
    the xpath:

        ./qemu:foo/bar[1]/baz[@somepro='someval']/@finalprop

    will be split into the following segments:

        #1: nodename=., fullsegment=.
        #2: nodename=foo, nsname=qemu, fullsegment=qemu:foo
        #3: nodename=bar, condition_num=1, fullsegment=bar[1]
        #4: nodename=baz, condition_prop=somepro, condition_val=someval,
                fullsegment=baz[@somepro='someval']
        #5: nodename=finalprop, is_prop=True, fullsegment=@finalprop

    Attributes set by the constructor:
        fullsegment:    the segment text exactly as passed in
        nodename:       name with any [condition], leading '@' and
                        'ns:' prefix removed
        nsname:         text before ':' in the name, or None
        is_prop:        True if the name started with '@'
        condition_prop: attribute name of a [@prop='val'] condition, or None
        condition_val:  attribute value of a [@prop='val'] condition, or None
        condition_num:  integer of a [N] condition, or None
    """
    def __init__(self, fullsegment):
        self.fullsegment = fullsegment
        self.nodename = fullsegment

        self.condition_prop = None
        self.condition_val = None
        self.condition_num = None
        if "[" in self.nodename:
            self.nodename, cond = self.nodename.strip("]").split("[")
            if "=" in cond:
                # Split on the first '=' only, so a condition value that
                # itself contains '=' does not break the unpacking
                (cprop, cval) = cond.split("=", 1)
                self.condition_prop = cprop.strip("@")
                self.condition_val = cval.strip("'")
            elif cond.isdigit():
                self.condition_num = int(cond)

        self.is_prop = self.nodename.startswith("@")
        if self.is_prop:
            self.nodename = self.nodename[1:]

        self.nsname = None
        if ":" in self.nodename:
            self.nsname, self.nodename = self.nodename.split(":")
|
||
|
|
||
|
|
||
|
class _XPath(object):
    """
    Parsed view of an xpath string.

    The string is cut on '/' and every piece is wrapped in an
    _XPathSegment. When the final piece names a property ('@name'),
    that piece is recorded in 'propname' and removed from 'segments',
    so 'segments' and 'xpath' only ever describe the element path.

    Attributes:
        fullxpath: the xpath string exactly as passed in
        is_prop:   True if the final piece was a property
        propname:  property name of the final piece, else None
        segments:  list of _XPathSegment for the element path
        xpath:     'segments' joined back together with '/'
    """
    def __init__(self, fullxpath):
        self.fullxpath = fullxpath

        pieces = [_XPathSegment(text) for text in fullxpath.split("/")]
        tail = pieces[-1]

        self.is_prop = tail.is_prop
        if self.is_prop:
            # An empty property name is reported as None
            self.propname = tail.nodename or None
            pieces = pieces[:-1]
        else:
            self.propname = None

        self.segments = pieces
        self.xpath = self.join(self.segments)

    @staticmethod
    def join(segments):
        """
        Return the 'fullsegment' strings of the passed segments joined
        with '/'.
        """
        return "/".join([seg.fullsegment for seg in segments])

    def parent_xpath(self):
        """
        Return the element path with its last segment dropped.
        """
        return self.join(self.segments[:-1])
|
||
|
|
||
|
|
||
|
class _XMLBase(object):
    """
    Backend-independent half of the XML API.

    The public operations (get_xml, get_xpath_content, set_xpath_content,
    node_add_xml, node_force_remove) and their private helpers are
    written purely in terms of the node primitives declared first in this
    class. Every primitive raises NotImplementedError here and has to be
    supplied by a subclass.
    """

    # Namespace prefix -> namespace URI
    NAMESPACES = {
        "qemu": "http://libvirt.org/schemas/domain/qemu/1.0",
    }

    ###################################
    # Primitives a subclass must fill #
    ###################################

    def copy_api(self):
        raise NotImplementedError()
    def count(self, xpath):
        raise NotImplementedError()
    def _find(self, fullxpath):
        # The code below expects None when nothing matches
        raise NotImplementedError()
    def _node_tostring(self, node):
        raise NotImplementedError()
    def _node_get_text(self, node):
        raise NotImplementedError()
    def _node_set_text(self, node, setval):
        raise NotImplementedError()
    def _node_get_property(self, node, propname):
        raise NotImplementedError()
    def _node_set_property(self, node, propname, setval):
        raise NotImplementedError()
    def _node_new(self, xpathseg):
        raise NotImplementedError()
    def _node_add_child(self, parentxpath, parentnode, newnode):
        raise NotImplementedError()
    def _node_remove_child(self, parentnode, childnode):
        raise NotImplementedError()
    def _node_from_xml(self, xml):
        raise NotImplementedError()
    def _node_has_content(self, node):
        raise NotImplementedError()
    def node_clear(self, xpath):
        raise NotImplementedError()
    def _sanitize_xml(self, xml):
        raise NotImplementedError()

    ##############
    # Public API #
    ##############

    def get_xml(self, xpath):
        """
        Return the sanitized string form of the node at 'xpath', or ""
        if there is no such node.
        """
        found = self._find(xpath)
        if found is not None:
            return self._sanitize_xml(self._node_tostring(found))
        return ""

    def get_xpath_content(self, xpath, is_bool):
        """
        Look up the value stored at 'xpath'.

        Returns None if nothing is found. If 'is_bool' is set, the mere
        presence of the node yields True. Otherwise returns the property
        value when the xpath ends in '@prop', else the node text.
        """
        found = self._find(xpath)
        if found is None:
            return None
        if is_bool:
            return True

        parsed = _XPath(xpath)
        if not parsed.is_prop:
            return self._node_get_text(found)
        return self._node_get_property(found, parsed.propname)

    def set_xpath_content(self, xpath, setval):
        """
        Store 'setval' at 'xpath':

            False:  forcibly remove the node
            None:   clear the value if the node exists, then prune
                    elements left empty along the path
            True:   make sure the node exists, nothing else
            other:  make sure the node exists and store str(setval)
        """
        target = self._find(xpath)

        if setval is False:
            self.node_force_remove(xpath)
            return

        if setval is None:
            if target is not None:
                self._node_set_content(xpath, target, None)
            self._node_remove_empty(xpath)
            return

        if target is None:
            target = self._node_make_stub(xpath)
        # For a boolean True, having created the node is sufficient
        if setval is not True:
            self._node_set_content(xpath, target, setval)

    def node_add_xml(self, xml, xpath):
        """
        Build a node from the 'xml' string and add it as a child of the
        node at 'xpath', creating that parent path first if necessary.
        """
        addition = self._node_from_xml(xml)
        container = self._node_make_stub(xpath)
        self._node_add_child(xpath, container, addition)

    def node_force_remove(self, fullxpath):
        """
        Remove the node at 'fullxpath' from its parent, whether or not
        it has content. Does nothing if either the node or its parent
        cannot be found.
        """
        container = self._find(_XPath(fullxpath).parent_xpath())
        victim = self._find(fullxpath)
        if container is not None and victim is not None:
            self._node_remove_child(container, victim)

    ###################
    # Private helpers #
    ###################

    def _node_set_content(self, xpath, node, setval):
        """
        Write 'setval' (stringified unless it is None) to 'node': as the
        property named by the xpath if it ends in '@prop', otherwise as
        the node text.
        """
        parsed = _XPath(xpath)
        value = None if setval is None else str(setval)

        if parsed.is_prop:
            self._node_set_property(node, parsed.propname, value)
            return
        self._node_set_text(node, value)

    def _node_make_stub(self, fullxpath):
        """
        Create whatever elements are missing along 'fullxpath' and return
        the node for the last element segment. For example, if the XML
        is <foo/> and xpath=./bar/baz, afterwards the XML holds:

          <foo>
            <bar>
              <baz/>
            </bar>
          </foo>

        and the <baz> node is returned. A trailing '@prop' piece is not
        part of the element segments, so for xpath=./bar/@baz only <bar>
        is created and returned; setting the property is up to the
        caller.

        A newly created element whose segment carries a property
        condition also gets that property filled in, so
        xpath=./bar[@baz='foo']/frob produces

          <bar baz='foo'>
            <frob></frob>
          </bar>

        even if <bar> did not exist beforehand.

        Raises RuntimeError if the root node '.' cannot be found.
        """
        parsed = _XPath(fullxpath)
        cur_xpath = "."
        cur_node = self._find(cur_xpath)
        if cur_node is None:
            raise RuntimeError("programming error: "
                "Did not find XML root node for xpath=%s" % fullxpath)

        # The first segment is the root itself, start from its children
        for seg in parsed.segments[1:]:
            container_xpath = cur_xpath
            cur_xpath = "%s/%s" % (container_xpath, seg.fullsegment)

            existing = self._find(cur_xpath)
            if existing is None:
                created = self._node_new(seg)
                self._node_add_child(container_xpath, cur_node, created)
                cur_node = created

                # For a conditional xpath like ./foo[@bar='baz'],
                # implicitly produce <foo bar='baz'/>
                if seg.condition_prop:
                    self._node_set_property(cur_node,
                            seg.condition_prop, seg.condition_val)
            else:
                # Already present, just descend into it
                cur_node = existing

        return cur_node

    def _node_remove_empty(self, fullxpath):
        """
        Starting at the node for 'fullxpath' and moving towards the
        root, remove each node from its parent until one is reached that
        still has content, or until a path lookup finds nothing.
        """
        segs = _XPath(fullxpath).segments
        lower = None
        for depth in range(len(segs), 0, -1):
            upper = self._find(_XPath.join(segs[:depth]))
            if upper is None:
                break

            # The first pass only locates the deepest node; from then on
            # 'lower' is the child of 'upper' that may be pruned
            if lower is not None:
                if self._node_has_content(lower):
                    break
                self._node_remove_child(upper, lower)
            lower = upper
|
||
|
|
||
|
|
||
|
class _Libxml2API(_XMLBase):
    """
    _XMLBase implementation on top of the libxml2 python bindings.

    The XML string is parsed into a libxml2 document, and an xpath
    context is created whose context node is the document's first child
    and which has every NAMESPACES prefix registered.
    """
    def __init__(self, xml):
        _XMLBase.__init__(self)
        self._doc = libxml2.parseDoc(xml)
        self._ctx = self._doc.xpathNewContext()
        self._ctx.setContextNode(self._doc.children)
        for key, val in self.NAMESPACES.items():
            self._ctx.xpathRegisterNs(key, val)

    def __del__(self):
        # __init__ may have raised before either attribute was assigned
        # (for example on unparseable XML), so look them up defensively
        doc = getattr(self, "_doc", None)
        ctx = getattr(self, "_ctx", None)
        self._doc = None
        self._ctx = None
        if doc is not None:
            doc.freeDoc()
        if ctx is not None:
            ctx.xpathFreeContext()

    def _sanitize_xml(self, xml):
        """
        Drop a leading <?...> line from 'xml', and make sure multi-line
        output ends with a newline.
        """
        # Strip starting <?...> line. partition() is used rather than
        # split() so a declaration without a trailing newline yields ""
        # instead of raising ValueError
        if xml.startswith("<?"):
            xml = xml.partition("\n")[2]
        if not xml.endswith("\n") and "\n" in xml:
            xml += "\n"
        return xml

    def copy_api(self):
        """
        Return a new _Libxml2API built from the serialized form of this
        document's first child.
        """
        return _Libxml2API(self._doc.children.serialize())

    def _find(self, fullxpath):
        """
        Evaluate the element part of 'fullxpath' (any trailing '@prop'
        is dropped by _XPath) and return the first result, or None if
        the result is empty.
        """
        xpath = _XPath(fullxpath).xpath
        nodes = self._ctx.xpathEval(xpath)
        return nodes[0] if nodes else None

    def count(self, xpath):
        """
        Return the number of results of evaluating 'xpath'.
        """
        return len(self._ctx.xpathEval(xpath))

    def _node_tostring(self, node):
        return node.serialize()
    def _node_from_xml(self, xml):
        # NOTE(review): the document created by parseDoc() is neither
        # kept nor freed here; confirm who owns the returned node
        return libxml2.parseDoc(xml).children

    def _node_get_text(self, node):
        return node.content
    def _node_set_text(self, node, setval):
        if setval is not None:
            setval = util.xml_escape(setval)
        node.setContent(setval)

    def _node_get_property(self, node, propname):
        """
        Return the content of property 'propname' on 'node', or None if
        the node has no such property.
        """
        prop = node.hasProp(propname)
        if prop:
            return prop.content
        return None
    def _node_set_property(self, node, propname, setval):
        """
        Set property 'propname' on 'node' to the escaped 'setval'. A
        'setval' of None removes the property if it is present.
        """
        if setval is None:
            prop = node.hasProp(propname)
            if prop:
                prop.unlinkNode()
                prop.freeNode()
        else:
            node.setProp(propname, util.xml_escape(setval))

    def _node_new(self, xpathseg):
        """
        Create a new node named after 'xpathseg'. If the segment has a
        namespace prefix, the node is given the matching namespace from
        the context node, declaring it there first if it is missing.
        """
        newnode = libxml2.newNode(xpathseg.nodename)
        if not xpathseg.nsname:
            return newnode

        ctxnode = self._ctx.contextNode()
        for ns in util.listify(ctxnode.nsDefs()):
            if ns.name == xpathseg.nsname:
                break
        else:
            # No existing declaration on the context node, add one
            ns = ctxnode.newNs(
                    self.NAMESPACES[xpathseg.nsname], xpathseg.nsname)
        newnode.setNs(ns)
        return newnode

    def node_clear(self, xpath):
        """
        Remove all properties and all content from the node at 'xpath'.
        Does nothing if the node is not found.
        """
        node = self._find(xpath)
        if node:
            # Collect the names first, since unsetProp() modifies the
            # property list being walked
            propnames = [p.name for p in (node.properties or [])]
            for p in propnames:
                node.unsetProp(p)
            node.setContent(None)

    def _node_has_content(self, node):
        return node.type == "element" and (node.children or node.properties)

    def _node_remove_child(self, parentnode, childnode):
        """
        Unlink and free 'childnode', together with a whitespace text
        node directly preceding it. If only text nodes remain under
        'parentnode' afterwards, its content is cleared.
        """
        node = childnode

        # Look for preceding whitespace and remove it
        white = node.get_prev()
        if white and white.type == "text" and "<" not in white.content:
            white.unlinkNode()
            white.freeNode()

        node.unlinkNode()
        node.freeNode()

        # 'children' is None once the last child is gone, and None is
        # not iterable; in that case nothing is left to clear
        remaining = parentnode.children
        if remaining is not None and all(
                n.type == "text" for n in remaining):
            parentnode.setContent(None)

    def _node_add_child(self, parentxpath, parentnode, newnode):
        """
        Append 'newnode' as the last element child of 'parentnode',
        inserting text nodes around it to keep the surrounding
        whitespace layout.
        """
        ignore = parentxpath
        def node_is_text(n):
            return bool(n and n.type == "text" and "<" not in n.content)

        def prevSibling(node):
            # Walk the parent's children to find whatever comes right
            # before 'node'
            parent = node.get_parent()
            if not parent:
                return None

            prev = None
            for child in parent.children:
                if child == node:
                    return prev
                prev = child

            return None

        sib = parentnode.get_last()
        if not node_is_text(sib):
            # This case is when we add a child element to a node for the
            # first time, like:
            #
            # <features/>
            # to
            # <features>
            #   <acpi/>
            # </features>
            prevsib = prevSibling(parentnode)
            if node_is_text(prevsib):
                sib = libxml2.newText(prevsib.content)
            else:
                sib = libxml2.newText("\n")
            parentnode.addChild(sib)

        # This case is adding a child element to an already properly
        # spaced element. Example:
        # <features>
        #   <acpi/>
        # </features>
        # to
        # <features>
        #   <acpi/>
        #   <apic/>
        # </features>
        sib = parentnode.get_last()
        content = sib.content
        # NOTE(review): the indent text literal below is reproduced as it
        # appears in this view of the file; confirm its exact width
        # against the canonical source
        sib = sib.addNextSibling(libxml2.newText(" "))
        txt = libxml2.newText(content)

        sib.addNextSibling(newnode)
        newnode.addNextSibling(txt)
|
||
|
|
||
|
|
||
|
# Non-underscore module-level alias, bound to the libxml2-backed
# implementation class.
XMLAPI = _Libxml2API
|