plugin: Introduce plugin design

The `libnmstate/nmstate.py` will search `plugins` folder or
NMSTATE_PLUGIN_DIR folder and load it.

The newly add ovsdb plugin is doing nothing.

New error `NmstatePluginError` for bug in plugin.

New interface type: `InterfaceType.OTHER` allowing plugin to use this
type in their plugin in free format.

The plugins are using `plugin.priority` to:

 * For querying, higher priority plugin will override information from
   lower ones on the same interface.

 * For applying, higher priority plugin will make changes __after__
   lower ones.

Integration test cases included.

Signed-off-by: Gris Ge <fge@redhat.com>
This commit is contained in:
Gris Ge 2020-06-15 15:31:49 +08:00
parent 4294babb19
commit 4446febee3
13 changed files with 694 additions and 62 deletions

View File

@ -110,3 +110,11 @@ class NmstateTimeoutError(NmstateLibnmError):
"""
pass
class NmstatePluginError(NmstateError):
"""
Unexpected plugin behaviour happens, it is a bug of the plugin.
"""
pass

View File

@ -20,11 +20,16 @@
import copy
import time
from libnmstate import validator
from libnmstate.error import NmstateVerificationError
from .nmstate import create_checkpoints
from .nmstate import destroy_checkpoints
from .nmstate import plugin_context
from .nmstate import show_with_plugin
from .nmstate import plugins_capabilities
from .nmstate import rollback_checkpoints
from .nmstate import show_with_plugins
from .net_state import NetState
MAINLOOP_TIMEOUT = 35
@ -50,17 +55,19 @@ def apply(
:rtype: str
"""
desired_state = copy.deepcopy(desired_state)
with plugin_context() as plugin:
with plugin_context() as plugins:
validator.schema_validate(desired_state)
validator.validate_capabilities(desired_state, plugin.capabilities)
current_state = show_with_plugin(plugin)
current_state = show_with_plugins(plugins, include_status_data=True)
validator.validate_capabilities(
desired_state, plugins_capabilities(plugins)
)
net_state = NetState(desired_state, current_state)
checkpoint = plugin.create_checkpoint(rollback_timeout)
_apply_ifaces_state(plugin, net_state, verify_change)
checkpoints = create_checkpoints(plugins, rollback_timeout)
_apply_ifaces_state(plugins, net_state, verify_change)
if commit:
plugin.destroy_checkpoint(checkpoint)
destroy_checkpoints(plugins, checkpoints)
else:
return checkpoint
return checkpoints
def commit(*, checkpoint=None):
@ -71,8 +78,8 @@ def commit(*, checkpoint=None):
will be selected and committed.
:type checkpoint: str
"""
with plugin_context() as plugin:
plugin.destroy_checkpoint(checkpoint)
with plugin_context() as plugins:
destroy_checkpoints(plugins, checkpoint)
def rollback(*, checkpoint=None):
@ -83,25 +90,26 @@ def rollback(*, checkpoint=None):
will be selected and rolled back.
:type checkpoint: str
"""
with plugin_context() as plugin:
plugin.rollback_checkpoint(checkpoint)
with plugin_context() as plugins:
rollback_checkpoints(plugins, checkpoint)
def _apply_ifaces_state(plugin, net_state, verify_change):
plugin.apply_changes(net_state)
def _apply_ifaces_state(plugins, net_state, verify_change):
for plugin in plugins:
plugin.apply_changes(net_state)
verified = False
if verify_change:
for _ in range(VERIFY_RETRY_TIMEOUT):
try:
_verify_change(plugin, net_state)
_verify_change(plugins, net_state)
verified = True
break
except NmstateVerificationError:
time.sleep(VERIFY_RETRY_INTERNAL)
if not verified:
_verify_change(plugin, net_state)
_verify_change(plugins, net_state)
def _verify_change(plugin, net_state):
current_state = show_with_plugin(plugin)
def _verify_change(plugins, net_state):
current_state = show_with_plugins(plugins)
net_state.verify(current_state)

View File

@ -17,7 +17,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
from .nmstate import show_with_plugin
from .nmstate import show_with_plugins
from .nmstate import plugin_context
@ -31,5 +31,5 @@ def show(*, include_status_data=False):
When include_status_data is set, both are reported, otherwise only the
configuration data is reported.
"""
with plugin_context() as plugin:
return show_with_plugin(plugin, include_status_data)
with plugin_context() as plugins:
return show_with_plugins(plugins, include_status_data)

View File

@ -50,12 +50,16 @@ from .common import NM
from .context import NmContext
class NetworkManagerPlugin:
class NetworkManagerPlugin(NmstatePlugin):
def __init__(self):
self._ctx = NmContext()
self._checkpoint = None
self._check_version_mismatch()
@property
def name(self):
return "NetworkManager"
def unload(self):
if self._ctx:
self._ctx.clean_up()
@ -82,6 +86,15 @@ class NetworkManagerPlugin:
capabilities.append(NmstatePlugin.TEAM_CAPABILITY)
return capabilities
@property
def plugin_capabilities(self):
return [
NmstatePlugin.PLUGIN_CAPABILITY_IFACE,
NmstatePlugin.PLUGIN_CAPABILITY_ROUTE,
NmstatePlugin.PLUGIN_CAPABILITY_ROUTE_RULE,
NmstatePlugin.PLUGIN_CAPABILITY_DNS,
]
def get_interfaces(self):
info = []

View File

@ -18,44 +18,222 @@
#
from contextlib import contextmanager
import importlib
import logging
from operator import itemgetter
from operator import attrgetter
import os
import pkgutil
from libnmstate import validator
from libnmstate.error import NmstateError
from libnmstate.error import NmstateValueError
from libnmstate.nm import NetworkManagerPlugin
from libnmstate.schema import DNS
from libnmstate.schema import Interface
from libnmstate.schema import Route
from libnmstate.schema import RouteRule
from .plugin import NmstatePlugin
from .state import merge_dict
@contextmanager
def plugin_context():
nm_plugin = NetworkManagerPlugin()
plugins = _load_plugins()
try:
yield nm_plugin
# Lowest priority plugin should perform actions first.
plugins.sort(key=attrgetter("priority"))
yield plugins
except (Exception, KeyboardInterrupt):
if nm_plugin.checkpoint:
try:
nm_plugin.rollback_checkpoint()
# Don't complex thing by raise exception when handling another
# exception, just log the rollback failure.
except Exception as e:
logging.error(f"Rollback failed with error {e}")
for plugin in plugins:
if plugin.checkpoint:
try:
plugin.rollback_checkpoint()
# Don't complex thing by raise exception when handling another
# exception, just log the rollback failure.
except Exception as e:
logging.error(f"Rollback failed with error {e}")
raise
finally:
nm_plugin.unload()
for plugin in plugins:
plugin.unload()
def show_with_plugin(plugin, include_status_data=None):
plugin.refresh_content()
def show_with_plugins(plugins, include_status_data=None):
for plugin in plugins:
plugin.refresh_content()
report = {}
if include_status_data:
report["capabilities"] = plugin.capabilities
report["capabilities"] = plugins_capabilities(plugins)
report[Interface.KEY] = plugin.get_interfaces()
report[Route.KEY] = plugin.get_routes()
report[RouteRule.KEY] = plugin.get_route_rules()
report[DNS.KEY] = plugin.get_dns_client_config()
report[Interface.KEY] = _get_interface_info_from_plugins(plugins)
route_plugin = _find_plugin_for_capability(
plugins, NmstatePlugin.PLUGIN_CAPABILITY_ROUTE
)
if route_plugin:
report[Route.KEY] = route_plugin.get_routes()
route_rule_plugin = _find_plugin_for_capability(
plugins, NmstatePlugin.PLUGIN_CAPABILITY_ROUTE_RULE
)
if route_rule_plugin:
report[RouteRule.KEY] = route_rule_plugin.get_route_rules()
dns_plugin = _find_plugin_for_capability(
plugins, NmstatePlugin.PLUGIN_CAPABILITY_DNS
)
if dns_plugin:
report[DNS.KEY] = dns_plugin.get_dns_client_config()
validator.schema_validate(report)
return report
def plugins_capabilities(plugins):
capabilities = set()
for plugin in plugins:
capabilities.update(set(plugin.capabilities))
return list(capabilities)
def _load_plugins():
plugins = [NetworkManagerPlugin()]
plugins.extend(_load_external_py_plugins())
return plugins
def _load_external_py_plugins():
"""
Load module from folder defined in system evironment NMSTATE_PLUGIN_DIR,
if empty, use the 'plugins' folder of current python file.
"""
plugins = []
plugin_dir = os.environ.get("NMSTATE_PLUGIN_DIR")
if not plugin_dir:
plugin_dir = f"{os.path.dirname(os.path.realpath(__file__))}/plugins"
for _, name, ispkg in pkgutil.iter_modules([plugin_dir]):
if name.startswith("nmstate_plugin_"):
try:
spec = importlib.util.spec_from_file_location(
name, f"{plugin_dir}/{name}.py"
)
plugin_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(plugin_module)
plugin = plugin_module.NMSTATE_PLUGIN()
plugins.append(plugin)
except Exception as error:
logging.warning(f"Failed to load plugin {name}: {error}")
return plugins
def _find_plugin_for_capability(plugins, capability):
"""
Return the plugin with specified capability and highest priority.
"""
chose_plugin = None
for plugin in plugins:
if (
chose_plugin
and capability in plugin.plugin_capabilities
and plugin.priority > chose_plugin.priority
) or not chose_plugin:
chose_plugin = plugin
return chose_plugin
def _get_interface_info_from_plugins(plugins):
all_ifaces = {}
IFACE_PRIORITY_METADATA = "_plugin_priority"
for plugin in plugins:
if (
NmstatePlugin.PLUGIN_CAPABILITY_IFACE
not in plugin.plugin_capabilities
):
continue
for iface in plugin.get_interfaces():
iface[IFACE_PRIORITY_METADATA] = plugin.priority
iface_name = iface[Interface.NAME]
if iface_name in all_ifaces:
existing_iface = all_ifaces[iface_name]
existing_priority = existing_iface[IFACE_PRIORITY_METADATA]
current_priority = plugin.priority
if current_priority > existing_priority:
merge_dict(iface, existing_iface)
all_ifaces[iface_name] = iface
else:
merge_dict(existing_iface, iface)
else:
all_ifaces[iface_name] = iface
# Remove metadata
for iface in all_ifaces.values():
iface.pop(IFACE_PRIORITY_METADATA)
return sorted(all_ifaces.values(), key=itemgetter(Interface.NAME))
def create_checkpoints(plugins, timeout):
"""
Return a string containing all the check point created by each plugin in
the format:
plugin.name|<checkpoing_path>|plugin.name|<checkpoing_path|...
"""
checkpoints = []
for plugin in plugins:
checkpoint = plugin.create_checkpoint(timeout)
if checkpoint:
checkpoints.append(f"{plugin.name}|{checkpoint}")
return "|".join(checkpoints)
def destroy_checkpoints(plugins, checkpoints):
_checkpoint_action(plugins, _parse_checkpoints(checkpoints), "destroy")
def rollback_checkpoints(plugins, checkpoints):
_checkpoint_action(plugins, _parse_checkpoints(checkpoints), "rollback")
def _checkpoint_action(plugins, checkpoint_index, action):
errors = []
for plugin in plugins:
if checkpoint_index and plugin.name not in checkpoint_index:
continue
checkpoint = (
checkpoint_index[plugin.name] if checkpoint_index else None
)
try:
if action == "destroy":
plugin.destroy_checkpoint(checkpoint)
else:
plugin.rollback_checkpoint(checkpoint)
except (Exception, KeyboardInterrupt) as error:
errors.append(error)
if errors:
if len(errors) == 1:
raise errors[0]
else:
raise NmstateError(
"Got multiple exception during checkpoint "
f"{action}: {errors}"
)
def _parse_checkpoints(checkpoints):
"""
Return a dict mapping plugin name to checkpoint
"""
if not checkpoints:
return None
parsed = checkpoints.split("|")
if len(parsed) % 2:
raise NmstateValueError("Invalid format of checkpoint")
checkpoint_index = {}
for plugin_name, checkpoint in zip(parsed[0::2], parsed[1::2]):
checkpoint_index[plugin_name] = checkpoint

View File

@ -17,7 +17,78 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
from abc import ABCMeta
from abc import abstractproperty
from abc import abstractmethod
class NmstatePlugin:
from .error import NmstatePluginError
class NmstatePlugin(metaclass=ABCMeta):
OVS_CAPABILITY = "openvswitch"
TEAM_CAPABILITY = "team"
PLUGIN_CAPABILITY_IFACE = "interface"
PLUGIN_CAPABILITY_ROUTE = "route"
PLUGIN_CAPABILITY_ROUTE_RULE = "route_rule"
PLUGIN_CAPABILITY_DNS = "dns"
DEFAULT_PRIORITY = 10
def unload(self):
pass
@property
def checkpoint(self):
return None
def refresh_content(self):
pass
@abstractproperty
def name(self):
pass
@property
def priority(self):
return NmstatePlugin.DEFAULT_PRIORITY
def get_interfaces(self):
raise NmstatePluginError(
f"Plugin {self.name} BUG: get_interfaces() not implemented"
)
def apply_changes(self, net_state):
pass
@property
def capabilities(self):
return []
@abstractmethod
def plugin_capabilities(self):
pass
def create_checkpoint(self, timeout):
return None
def rollback_checkpoint(self, checkpoint=None):
pass
def destroy_checkpoint(self, checkpoint=None):
pass
def get_routes(self):
raise NmstatePluginError(
f"Plugin {self.name} BUG: get_routes() not implemented"
)
def get_route_rules(self):
raise NmstatePluginError(
f"Plugin {self.name} BUG: get_route_rules() not implemented"
)
def get_dns_client_config(self):
raise NmstatePluginError(
f"Plugin {self.name} BUG: get_dns_client_config() not implemented"
)

View File

@ -0,0 +1,18 @@
#
# Copyright (c) 2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#

View File

@ -0,0 +1,40 @@
#
# Copyright (c) 2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
from libnmstate.plugin import NmstatePlugin
class NmstateOvsdbPlugin(NmstatePlugin):
@property
def name(self):
return "nmstate-plugin-ovsdb"
@property
def priority(self):
return NmstatePlugin.DEFAULT_PRIORITY + 1
@property
def plugin_capabilities(self):
return NmstatePlugin.PLUGIN_CAPABILITY_IFACE
def get_interfaces(self):
return []
NMSTATE_PLUGIN = NmstateOvsdbPlugin

View File

@ -109,6 +109,7 @@ class InterfaceType:
VLAN = "vlan"
VXLAN = "vxlan"
TEAM = "team"
OTHER = "other"
VIRT_TYPES = (
BOND,

View File

@ -28,6 +28,7 @@ properties:
- "$ref": "#/definitions/interface-vlan/rw"
- "$ref": "#/definitions/interface-vxlan/rw"
- "$ref": "#/definitions/interface-team/rw"
- "$ref": "#/definitions/interface-other/rw"
routes:
type: object
properties:
@ -552,6 +553,13 @@ definitions:
name:
type: string
interface-other:
rw:
properties:
type:
type: string
enum:
- other
route:
type: object
properties:

View File

@ -0,0 +1,285 @@
#
# Copyright (c) 2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import copy
import os
import tempfile
import pytest
import libnmstate
from libnmstate.schema import Interface
from libnmstate.schema import InterfaceType
from libnmstate.schema import InterfaceState
from libnmstate.schema import DNS
from libnmstate.schema import Route
from libnmstate.schema import RouteRule
from libnmstate.plugin import NmstatePlugin
from .testlib import statelib
FOO_IFACE_NAME = "foo1"
FOO_IFACE_STATES = [
{
Interface.NAME: FOO_IFACE_NAME,
Interface.TYPE: InterfaceType.OTHER,
Interface.STATE: InterfaceState.UP,
"foo": {"a": 1, "b": 2},
}
]
BAR_IFACE_NAME = "bar1"
BAR_IFACE_STATES = [
{
Interface.NAME: BAR_IFACE_NAME,
Interface.TYPE: InterfaceType.OTHER,
Interface.STATE: InterfaceState.UP,
"foo": {"a": 2},
},
{
Interface.NAME: FOO_IFACE_NAME,
Interface.TYPE: InterfaceType.OTHER,
Interface.STATE: InterfaceState.UP,
"foo": {"a": 3},
},
]
TEST_ROUTE_STATE = {
Route.RUNNING: [
{
Route.DESTINATION: "198.51.100.0/24",
Route.METRIC: 103,
Route.NEXT_HOP_ADDRESS: "192.0.2.1",
Route.NEXT_HOP_INTERFACE: FOO_IFACE_NAME,
Route.TABLE_ID: 100,
}
],
Route.CONFIG: [],
}
TEST_DNS_STATE = {
DNS.RUNNING: {
DNS.SERVER: ["2001:4860:4860::8888", "1.1.1.1"],
DNS.SEARCH: ["example.org", "example.com"],
},
DNS.CONFIG: [],
}
TEST_ROUTE_RULE_STATE = {
RouteRule.CONFIG: [
{
RouteRule.IP_FROM: "2001:db8:a::/64",
RouteRule.IP_TO: "2001:db8:f::/64",
RouteRule.PRIORITY: 1000,
RouteRule.ROUTE_TABLE: 100,
},
{
RouteRule.IP_FROM: "203.0.113.0/24",
RouteRule.IP_TO: "192.0.2.0/24",
RouteRule.PRIORITY: 1001,
RouteRule.ROUTE_TABLE: 101,
},
]
}
GET_IFACES_FORMAT = """
def get_interfaces(self):
return {ifaces}
"""
GET_ROUTES_FORMAT = """
def get_routes(self):
return {routes}
"""
GET_ROUTE_RULES_FORMAT = """
def get_route_rules(self):
return {route_rules}
"""
GET_DNS_FORMAT = """
def get_dns_client_config(self):
return {dns_config}
"""
@pytest.fixture
def tmp_plugin_dir():
with tempfile.TemporaryDirectory() as plugin_dir:
os.environ["NMSTATE_PLUGIN_DIR"] = plugin_dir
yield plugin_dir
os.environ.pop("NMSTATE_PLUGIN_DIR")
@pytest.fixture
def with_foo_plugin(tmp_plugin_dir):
_gen_plugin_foo(tmp_plugin_dir)
@pytest.fixture
def with_multiple_plugins(tmp_plugin_dir):
_gen_plugin_foo(tmp_plugin_dir)
_gen_plugin_bar(tmp_plugin_dir)
@pytest.fixture
def with_route_plugin(tmp_plugin_dir):
_gen_plugin_route_foo(tmp_plugin_dir)
@pytest.fixture
def with_route_rule_plugin(tmp_plugin_dir):
_gen_plugin_route_rule_foo(tmp_plugin_dir)
@pytest.fixture
def with_dns_plugin(tmp_plugin_dir):
_gen_plugin_dns_foo(tmp_plugin_dir)
def _gen_plugin(
plugin_dir,
plugin_name,
plugin_class,
priority,
ifaces=None,
routes=None,
dns_config=None,
route_rules=None,
):
plugin_capabilities = []
get_funs_txt = ""
if ifaces:
plugin_capabilities.append(NmstatePlugin.PLUGIN_CAPABILITY_IFACE)
get_funs_txt += GET_IFACES_FORMAT.format(ifaces=f"{ifaces}")
if routes:
plugin_capabilities.append(NmstatePlugin.PLUGIN_CAPABILITY_ROUTE)
get_funs_txt += GET_ROUTES_FORMAT.format(routes=f"{routes}")
if route_rules:
plugin_capabilities.append(NmstatePlugin.PLUGIN_CAPABILITY_ROUTE_RULE)
get_funs_txt += GET_ROUTE_RULES_FORMAT.format(
route_rules=f"{route_rules}"
)
if dns_config:
plugin_capabilities.append(NmstatePlugin.PLUGIN_CAPABILITY_DNS)
get_funs_txt += GET_DNS_FORMAT.format(dns_config=f"{dns_config}")
plugin_txt = f"""
from libnmstate.plugin import NmstatePlugin
from libnmstate.schema import Interface
from libnmstate.schema import InterfaceType
from libnmstate.schema import InterfaceState
class {plugin_class}(NmstatePlugin):
@property
def name(self):
return "{plugin_name}"
@property
def priority(self):
return {priority}
@property
def plugin_capabilities(self):
return {plugin_capabilities}
{get_funs_txt}
NMSTATE_PLUGIN = {plugin_class}
"""
with open(f"{plugin_dir}/nmstate_plugin_{plugin_name}.py", "w") as fd:
fd.write(plugin_txt)
def _gen_plugin_foo(plugin_dir):
_gen_plugin(
plugin_dir,
"foo",
"NmstateFooPlugin",
NmstatePlugin.DEFAULT_PRIORITY + 1,
ifaces=FOO_IFACE_STATES,
)
def _gen_plugin_bar(plugin_dir):
_gen_plugin(
plugin_dir,
"bar",
"NmstateBarPlugin",
NmstatePlugin.DEFAULT_PRIORITY + 2,
ifaces=BAR_IFACE_STATES,
)
def _gen_plugin_route_foo(plugin_dir):
_gen_plugin(
plugin_dir,
"route_foo",
"NmstateRouteFooPlugin",
NmstatePlugin.DEFAULT_PRIORITY + 1,
routes=TEST_ROUTE_STATE,
)
def _gen_plugin_dns_foo(plugin_dir):
_gen_plugin(
plugin_dir,
"dns_foo",
"NmstateDnsFooPlugin",
NmstatePlugin.DEFAULT_PRIORITY + 1,
dns_config=TEST_DNS_STATE,
)
def _gen_plugin_route_rule_foo(plugin_dir):
_gen_plugin(
plugin_dir,
"route_rule_foo",
"NmstateRouteRuleFooPlugin",
NmstatePlugin.DEFAULT_PRIORITY + 1,
route_rules=TEST_ROUTE_RULE_STATE,
)
def test_load_foo_plugin(with_foo_plugin):
current_state = statelib.show_only((FOO_IFACE_NAME,))
assert current_state[Interface.KEY] == FOO_IFACE_STATES
def test_two_plugins_with_merged_iface_by_priority(with_multiple_plugins):
current_state = statelib.show_only((BAR_IFACE_NAME, FOO_IFACE_NAME))
expected_ifaces = copy.deepcopy(BAR_IFACE_STATES)
expected_ifaces[1]["foo"]["b"] = 2
assert current_state[Interface.KEY] == expected_ifaces
def test_load_external_route_plugin(with_route_plugin):
state = libnmstate.show()
assert state[Route.KEY] == TEST_ROUTE_STATE
def test_load_external_route_rule_plugin(with_route_rule_plugin):
state = libnmstate.show()
assert state[RouteRule.KEY] == TEST_ROUTE_RULE_STATE
def test_load_external_dns_plugin(with_dns_plugin):
state = libnmstate.show()
assert state[DNS.KEY] == TEST_DNS_STATE

View File

@ -35,19 +35,18 @@ BOND_TYPE = InterfaceType.BOND
@pytest.fixture
def show_with_plugin_mock():
with mock.patch.object(netapplier, "show_with_plugin") as m:
def show_with_plugins_mock():
with mock.patch.object(netapplier, "show_with_plugins") as m:
yield m
@pytest.fixture
def plugin_context_mock():
with mock.patch.object(netapplier, "plugin_context") as m:
def enter(self):
return self
m().__enter__ = enter
# def enter(self):
# return [mock.MagicMock()]
#
# m().__enter__ = enter
yield m
@ -58,7 +57,7 @@ def net_state_mock():
def test_iface_admin_state_change(
show_with_plugin_mock, plugin_context_mock, net_state_mock,
show_with_plugins_mock, plugin_context_mock, net_state_mock,
):
current_config = {
Interface.KEY: [
@ -74,8 +73,9 @@ def test_iface_admin_state_change(
desired_config = copy.deepcopy(current_config)
desired_config[Interface.KEY][0][Interface.STATE] = InterfaceState.DOWN
show_with_plugin_mock.return_value = current_config
plugin = plugin_context_mock()
show_with_plugins_mock.return_value = current_config
plugin = mock.MagicMock()
plugin_context_mock.return_value.__enter__.return_value = [plugin]
netapplier.apply(desired_config, verify_change=False)
plugin.apply_changes.assert_called_once_with(
@ -84,9 +84,9 @@ def test_iface_admin_state_change(
def test_add_new_bond(
plugin_context_mock, show_with_plugin_mock, net_state_mock,
plugin_context_mock, show_with_plugins_mock, net_state_mock,
):
show_with_plugin_mock.return_value = {}
show_with_plugins_mock.return_value = {}
desired_config = {
Interface.KEY: [
@ -105,7 +105,8 @@ def test_add_new_bond(
]
}
plugin = plugin_context_mock()
plugin = mock.MagicMock()
plugin_context_mock.return_value.__enter__.return_value = [plugin]
netapplier.apply(desired_config, verify_change=False)
plugin.apply_changes.assert_called_once_with(
@ -114,7 +115,7 @@ def test_add_new_bond(
def test_edit_existing_bond(
show_with_plugin_mock, plugin_context_mock, net_state_mock,
show_with_plugins_mock, plugin_context_mock, net_state_mock,
):
current_config = {
Interface.KEY: [
@ -132,7 +133,7 @@ def test_edit_existing_bond(
}
]
}
show_with_plugin_mock.return_value = current_config
show_with_plugins_mock.return_value = current_config
desired_config = copy.deepcopy(current_config)
options = desired_config[Interface.KEY][0][Bond.CONFIG_SUBTREE][
@ -140,7 +141,8 @@ def test_edit_existing_bond(
]
options["miimon"] = 200
plugin = plugin_context_mock()
plugin = mock.MagicMock()
plugin_context_mock.return_value.__enter__.return_value = [plugin]
netapplier.apply(desired_config, verify_change=False)
plugin.apply_changes.assert_called_once_with(

View File

@ -32,8 +32,8 @@ from libnmstate.schema import RouteRule
@pytest.fixture
def show_with_plugin_mock():
with mock.patch.object(netinfo, "show_with_plugin") as m:
def show_with_plugins_mock():
with mock.patch.object(netinfo, "show_with_plugins") as m:
yield m
@ -48,7 +48,7 @@ def plugin_context_mock():
yield m
def test_netinfo_show(show_with_plugin_mock, plugin_context_mock):
def test_netinfo_show(show_with_plugins_mock, plugin_context_mock):
current_config = {
DNS.KEY: {DNS.RUNNING: {}, DNS.CONFIG: {}},
Route.KEY: {Route.CONFIG: [], Route.RUNNING: []},
@ -64,13 +64,13 @@ def test_netinfo_show(show_with_plugin_mock, plugin_context_mock):
],
}
show_with_plugin_mock.return_value = current_config
show_with_plugins_mock.return_value = current_config
report = netinfo.show()
assert current_config == report
def test_error_show(show_with_plugin_mock, plugin_context_mock):
def test_error_show(show_with_plugins_mock, plugin_context_mock):
current_config = {
DNS.KEY: {DNS.RUNNING: {}, DNS.CONFIG: {}},
Route.KEY: {"config": [], "running": []},
@ -85,7 +85,7 @@ def test_error_show(show_with_plugin_mock, plugin_context_mock):
}
],
}
show_with_plugin_mock.return_value = current_config
show_with_plugins_mock.return_value = current_config
with pytest.raises(TypeError):
# pylint: disable=too-many-function-args