tests: Refactor testlib and introduce the state object

Move the state-related logic from `testlib.assertlib` to
a new class named `State` in `testlib.statelib`.

Signed-off-by: Edward Haas <edwardh@redhat.com>
This commit is contained in:
Edward Haas 2018-10-24 11:11:40 +03:00 committed by Gris Ge
parent 2e052aca6c
commit 0499443ce5
2 changed files with 95 additions and 52 deletions

View File

@ -15,62 +15,19 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import collections
import copy
import six
from libnmstate import netinfo
from . import statelib
from .statelib import INTERFACES
def assert_state(desired_state):
def assert_state(desired_state_data):
"""Given a state, assert it against the current state."""
filtered_current_state = statelib.filter_current_state(desired_state)
normalized_desired_state = _normalize_desired_state(desired_state,
filtered_current_state)
current_state_data = netinfo.show()
current_state = statelib.State(current_state_data)
current_state.filter(desired_state_data)
assert normalized_desired_state == filtered_current_state
full_desired_state = statelib.State(current_state.state)
full_desired_state.update(desired_state_data)
def _normalize_desired_state(desired_state, current_state):
"""
Given the desired and current state, complement the desired state with all
missing properties such that it will be a "full" state description.
Note: It is common for the desired state to include only partial config,
expecting nmstate to complement the missing parts from the current state.
"""
desired_state = copy.deepcopy(desired_state)
desired_interfaces_state = desired_state[INTERFACES]
for current_iface_state in current_state[INTERFACES]:
ifname = current_iface_state['name']
desired_iface_state = _lookup_iface_state_by_name(
desired_interfaces_state, ifname)
if desired_iface_state is not None:
normalized_desired_iface_state = _dict_update(
copy.deepcopy(current_iface_state), desired_iface_state)
desired_iface_state.update(normalized_desired_iface_state)
return desired_state
def _lookup_iface_state_by_name(interfaces_state, ifname):
for iface_state in interfaces_state:
if iface_state['name'] == ifname:
return iface_state
return None
def _dict_update(origin_data, to_merge_data):
"""
Recursively performs a dict update (merge), taking the to_merge_data and
updating the origin_data.
The function changes the origin_data in-place.
"""
for key, val in six.viewitems(to_merge_data):
if isinstance(val, collections.Mapping):
origin_data[key] = _dict_update(origin_data.get(key, {}), val)
else:
origin_data[key] = val
return origin_data
assert full_desired_state.state == current_state.state

View File

@ -15,6 +15,10 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import collections
import copy
import six
from libnmstate import netinfo
from libnmstate.schema import Constants
@ -29,7 +33,89 @@ def show_only(ifnames):
base_filter_state = {
INTERFACES: [{'name': ifname} for ifname in ifnames]
}
return filter_current_state(base_filter_state)
current_state = State(netinfo.show())
current_state.filter(base_filter_state)
return current_state.state
class State(object):
def __init__(self, state):
self._state = copy.deepcopy(state)
def __eq__(self, other):
return self._state == other
def __hash__(self):
return hash(self._state)
def __str__(self):
return self._state
def __repr__(self):
return self.__str__()
@property
def state(self):
return self._state
def filter(self, based_on_state):
"""
Given the based_on_state, update the sate with a filtered version
which includes only entities such as interfaces that have been
mentioned in the based_on_state.
In case there are no entities for filtering, all are reported.
"""
base_iface_names = {ifstate['name']
for ifstate in based_on_state[INTERFACES]}
if not base_iface_names:
return
filtered_iface_state = [
ifstate
for ifstate in self._state[INTERFACES]
if ifstate['name'] in base_iface_names
]
self._state = {INTERFACES: filtered_iface_state}
def update(self, other_state):
"""
Given the other_state, update the state with the other_state data.
"""
other_state = copy.deepcopy(other_state)
other_interfaces_state = other_state[INTERFACES]
for base_iface_state in self._state[INTERFACES]:
ifname = base_iface_state['name']
other_iface_state = _lookup_iface_state_by_name(
other_interfaces_state, ifname)
if other_iface_state is not None:
iface_state = _dict_update(base_iface_state, other_iface_state)
other_iface_state.update(iface_state)
self._state = other_state
def _lookup_iface_state_by_name(interfaces_state, ifname):
for iface_state in interfaces_state:
if iface_state['name'] == ifname:
return iface_state
return None
def _dict_update(origin_data, to_merge_data):
"""
Recursively performs a dict update (merge), taking the to_merge_data and
updating the origin_data.
The function changes the origin_data in-place.
"""
for key, val in six.viewitems(to_merge_data):
if isinstance(val, collections.Mapping):
origin_data[key] = _dict_update(origin_data.get(key, {}), val)
else:
origin_data[key] = val
return origin_data
def filter_current_state(desired_state):