python3: Remove six

Now that Nmstate is no longer supporting python2, six usage is no longer
needed and therefore removed.

Signed-off-by: Edward Haas <edwardh@redhat.com>
This commit is contained in:
Edward Haas 2019-11-24 22:25:57 +02:00 committed by Gris Ge
parent d4065e3a0c
commit 33c7e0fc90
20 changed files with 74 additions and 120 deletions

View File

@ -25,7 +25,6 @@ yum -y install \
python-jsonschema \
python-setuptools \
python2-pyyaml \
python2-six
yum-builddep -y dbus-python
```

View File

@ -17,8 +17,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import six
def get_slaves_from_state(state, default=()):
ports = state.get('bridge', {}).get('port')
@ -31,9 +29,7 @@ def set_bridge_ports_metadata(master_state, slave_state):
_set_common_slaves_metadata(master_state, slave_state)
ports = master_state.get('bridge', {}).get('port', [])
port = next(
six.moves.filter(lambda n: n['name'] == slave_state['name'], ports), {}
)
port = next(filter(lambda n: n['name'] == slave_state['name'], ports), {})
slave_state['_brport_options'] = port

View File

@ -17,8 +17,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import six
from libnmstate import iplib
from libnmstate.appliers import linux_bridge
from libnmstate.error import NmstateValueError
@ -76,7 +74,7 @@ def generate_ifaces_metadata(desired_state, current_state):
def remove_ifaces_metadata(ifaces_state):
for iface_state in six.viewvalues(ifaces_state.interfaces):
for iface_state in ifaces_state.interfaces.values():
iface_state.pop(MASTER, None)
iface_state.pop(MASTER_TYPE, None)
iface_state.pop(BRPORT_OPTIONS, None)
@ -94,9 +92,7 @@ def _set_ovs_bridge_ports_metadata(master_state, slave_state):
_set_common_slaves_metadata(master_state, slave_state)
ports = master_state.get('bridge', {}).get('port', [])
port = next(
six.moves.filter(lambda n: n['name'] == slave_state['name'], ports), {}
)
port = next(filter(lambda n: n['name'] == slave_state['name'], ports), {})
slave_state[BRPORT_OPTIONS] = port
@ -131,7 +127,7 @@ def _generate_link_master_metadata(
"""
desired_masters = [
(ifname, ifstate)
for ifname, ifstate in six.viewitems(ifaces_desired_state)
for ifname, ifstate in ifaces_desired_state.items()
if ifstate.get('type') == master_type
and ifstate.get('state') not in ('down', 'absent')
]
@ -159,7 +155,7 @@ def _generate_link_master_metadata(
current_masters = (
(ifname, ifstate)
for ifname, ifstate in six.viewitems(ifaces_current_state)
for ifname, ifstate in ifaces_current_state.items()
if ifstate.get('type') == master_type
)
for master_name, master_state in current_masters:
@ -190,7 +186,7 @@ def _generate_route_metadata(desired_state, current_state):
Currently route['next-hop-interface'] is mandatory.
Routes which do not match any current or desired interface are ignored.
"""
for iface_name, routes in six.viewitems(desired_state.config_iface_routes):
for iface_name, routes in desired_state.config_iface_routes.items():
desired_iface_state = desired_state.interfaces.get(iface_name)
current_iface_state = current_state.interfaces.get(iface_name)
if desired_iface_state:
@ -235,9 +231,7 @@ def _generate_dns_metadata(desired_state, current_state):
else:
ifaces_routes = {
ifname: [r.to_dict() for r in routes]
for ifname, routes in six.viewitems(
desired_state.config_iface_routes
)
for ifname, routes in desired_state.config_iface_routes.items()
}
ipv4_iface, ipv6_iface = nm.dns.find_interfaces_for_name_servers(
ifaces_routes
@ -329,7 +323,7 @@ def _dns_config_not_changed(desired_state, current_state):
nm.ipv4.acs_and_ip_profiles(client),
nm.ipv6.acs_and_ip_profiles(client),
)
for iface_name, iface_dns_config in six.viewitems(iface_dns_configs):
for iface_name, iface_dns_config in iface_dns_configs.items():
if iface_name not in desired_state.interfaces:
continue
iface_state = desired_state.interfaces[iface_name]
@ -338,7 +332,7 @@ def _dns_config_not_changed(desired_state, current_state):
and iface_state[Interface.STATE] != InterfaceState.UP
):
return False
for family in six.viewkeys(iface_dns_config):
for family in iface_dns_config:
if family in iface_state and not iface_state[family].get(
InterfaceIP.ENABLED
):
@ -352,10 +346,10 @@ def _preserve_current_dns_metadata(desired_state, current_state):
nm.ipv4.acs_and_ip_profiles(client),
nm.ipv6.acs_and_ip_profiles(client),
)
for iface_name, iface_dns_config in six.viewitems(iface_dns_configs):
for iface_name, iface_dns_config in iface_dns_configs.items():
if iface_name not in desired_state.interfaces:
continue
for family, dns_metadata in six.viewitems(iface_dns_config):
for family, dns_metadata in iface_dns_config.items():
iface_state = desired_state.interfaces[iface_name]
if family not in iface_state:
iface_state[family] = {}

View File

@ -21,7 +21,6 @@ from contextlib import contextmanager
import copy
import logging
import six
import time
from libnmstate import metadata
@ -187,8 +186,8 @@ def _create_editable_desired_state(
def _list_new_interfaces(desired_state, current_state):
return [
name
for name in six.viewkeys(desired_state.interfaces)
- six.viewkeys(current_state.interfaces)
for name in desired_state.interfaces.keys()
- current_state.interfaces.keys()
if desired_state.interfaces[name].get(schema.Interface.STATE)
not in (schema.InterfaceState.ABSENT, schema.InterfaceState.DOWN)
]
@ -230,7 +229,7 @@ def _add_interfaces(new_interfaces, desired_state):
def _edit_interfaces(state2edit):
logging.debug('Editing interfaces: %s', list(state2edit.interfaces))
ifaces2edit = list(six.viewvalues(state2edit.interfaces))
ifaces2edit = list(state2edit.interfaces.values())
iface2prepare = list(
filter(
@ -258,7 +257,7 @@ def _disable_ipv6(desired_state):
This is an intermediate workaround for https://bugzilla.redhat.com/1643841.
"""
for ifstate in six.viewvalues(desired_state.interfaces):
for ifstate in desired_state.interfaces.values():
if ifstate.get(schema.Interface.STATE) != schema.InterfaceState.UP:
continue
ipv6_state = ifstate.get(schema.Interface.IPV6, {})

View File

@ -18,7 +18,6 @@
#
import itertools
import six
from libnmstate.error import NmstateValueError
from libnmstate.schema import Interface
@ -213,7 +212,7 @@ def set_ifaces_admin_state(ifaces_desired_state, con_profiles=()):
for ifname in new_vlan_ifaces_to_activate:
device.activate(dev=None, connection_id=ifname)
for dev, actions in six.viewitems(remove_devs_actions):
for dev, actions in remove_devs_actions.items():
for action in actions:
action(dev)

View File

@ -17,8 +17,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import six
from . import connection
from . import nmclient
from libnmstate.error import NmstateValueError
@ -29,7 +27,7 @@ BOND_TYPE = 'bond'
def create_setting(options):
bond_setting = nmclient.NM.SettingBond.new()
for option_name, option_value in six.viewitems(options):
for option_name, option_value in options.items():
success = bond_setting.add_option(option_name, option_value)
if not success:
raise NmstateValueError(

View File

@ -17,8 +17,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import six
from libnmstate.nm import connection
from libnmstate.nm import nmclient
from libnmstate.schema import LinuxBridge as LB
@ -48,7 +46,7 @@ def _get_current_bridge_setting(base_con_profile):
def _set_bridge_properties(bridge_setting, options):
for key, val in six.viewitems(options):
for key, val in options.items():
if key == LB.MAC_AGEING_TIME:
bridge_setting.props.ageing_time = val
elif key == LB.GROUP_FORWARD_MASK:
@ -62,7 +60,7 @@ def _set_bridge_properties(bridge_setting, options):
def _set_bridge_stp_properties(bridge_setting, bridge_stp):
bridge_setting.props.stp = bridge_stp[LB.STP_ENABLED]
if bridge_stp[LB.STP_ENABLED] is True:
for stp_key, stp_val in six.viewitems(bridge_stp):
for stp_key, stp_val in bridge_stp.items():
if stp_key == LB.STP_PRIORITY:
bridge_setting.props.priority = stp_val
elif stp_key == LB.STP_FORWARD_DELAY:
@ -83,7 +81,7 @@ def create_port_setting(options, base_con_profile):
if not port_setting:
port_setting = nmclient.NM.SettingBridgePort.new()
for key, val in six.viewitems(options):
for key, val in options.items():
if key == LB.PORT_STP_PRIORITY:
port_setting.props.priority = val
elif key == LB.PORT_STP_HAIRPIN_MODE:

View File

@ -20,8 +20,6 @@ from collections import deque
from contextlib import contextmanager
import logging
import six
import gi
try:
@ -134,7 +132,7 @@ class _MainLoop(object):
return bool(self._action_queue)
def run(self, timeout):
if not isinstance(timeout, six.integer_types):
if not isinstance(timeout, int):
raise error.NmstateValueError(
"Invalid timeout value: should be an integer"
)

View File

@ -17,8 +17,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import six
from libnmstate.error import NmstateValueError
from libnmstate.schema import OVSBridge as OB
@ -57,7 +55,7 @@ def has_ovs_capability():
def create_bridge_setting(options):
bridge_setting = nmclient.NM.SettingOvsBridge.new()
for option_name, option_value in six.viewitems(options):
for option_name, option_value in options.items():
if option_name == 'fail-mode':
if option_value:
bridge_setting.props.fail_mode = option_value
@ -79,7 +77,7 @@ def create_bridge_setting(options):
def create_port_setting(options):
port_setting = nmclient.NM.SettingOvsPort.new()
for option_name, option_value in six.viewitems(options):
for option_name, option_value in options.items():
if option_name == 'tag':
port_setting.props.tag = option_value
elif option_name == 'vlan-mode':
@ -111,7 +109,7 @@ def create_interface_setting():
def translate_bridge_options(iface_state):
br_opts = {}
bridge_state = iface_state.get('bridge', {}).get('options', {})
for key in six.viewkeys(bridge_state) & set(_BRIDGE_OPTION_NAMES):
for key in bridge_state.keys() & set(_BRIDGE_OPTION_NAMES):
br_opts[key] = bridge_state[key]
return br_opts
@ -119,7 +117,7 @@ def translate_bridge_options(iface_state):
def translate_port_options(port_state):
port_opts = {}
for key in six.viewkeys(port_state) & set(_PORT_OPTION_NAMES):
for key in port_state.keys() & set(_PORT_OPTION_NAMES):
port_opts[key] = port_state[key]
return port_opts

View File

@ -20,8 +20,6 @@
from operator import itemgetter
import socket
import six
from libnmstate import iplib
from libnmstate.error import NmstateInternalError
from libnmstate.error import NmstateNotImplementedError
@ -207,7 +205,7 @@ def get_static_gateway_iface(family, iface_routes):
if family == Interface.IPV6
else IPV4_DEFAULT_GATEWAY_DESTINATION
)
for iface_name, routes in six.viewitems(iface_routes):
for iface_name, routes in iface_routes.items():
for route in routes:
if route[Route.DESTINATION] == destination:
return iface_name

View File

@ -19,8 +19,6 @@
import copy
import six
from . import nmclient
@ -128,4 +126,4 @@ class Nm2Api(object):
@staticmethod
def _swap_dict_keyval(dictionary):
return {val: key for key, val in six.viewitems(dictionary)}
return {val: key for key, val in dictionary.items()}

View File

@ -23,7 +23,6 @@ import difflib
import json
from operator import itemgetter
import six
import yaml
from libnmstate.schema import Constants
@ -59,9 +58,6 @@ def format_desired_current_state_diff(desired_state, current_state):
class PrettyState(object):
def __init__(self, state):
yaml.add_representer(OrderedDict, represent_ordereddict)
if six.PY2:
yaml.add_representer(six.text_type, represent_unicode)
self.state = order_state(deepcopy(state))
@property

View File

@ -30,8 +30,6 @@ from functools import total_ordering
from ipaddress import ip_address
from operator import itemgetter
import six
from libnmstate import iplib
from libnmstate import metadata
from libnmstate.error import NmstateValueError
@ -87,7 +85,7 @@ class RouteEntry(object):
def to_dict(self):
return {
key.replace('_', '-'): value
for key, value in six.viewitems(vars(self))
for key, value in vars(self).items()
if value is not None
}
@ -166,8 +164,7 @@ class State(object):
@property
def state(self):
self._state[Interface.KEY] = sorted(
list(six.viewvalues(self._ifaces_state)),
key=itemgetter(Interface.NAME),
list(self._ifaces_state.values()), key=itemgetter(Interface.NAME)
)
return self._state
@ -193,7 +190,7 @@ class State(object):
def _complement_interface_empty_ip_subtrees(self):
""" Complement the interfaces states with empty IPv4/IPv6 subtrees. """
for iface_state in six.viewvalues(self.interfaces):
for iface_state in self.interfaces.values():
for family in (Interface.IPV4, Interface.IPV6):
if family not in iface_state:
iface_state[family] = {}
@ -210,7 +207,7 @@ class State(object):
from the actual configuration. This makes it possible to distinguish
whether a user specified these values in the later configuration step.
"""
for ifname, iface_state in six.viewitems(self.interfaces):
for ifname, iface_state in self.interfaces.items():
iface_current_state = other_state.interfaces.get(ifname, {})
if iface_current_state.get(Interface.TYPE) == Ethernet.TYPE:
ethernet = iface_state.setdefault(Ethernet.CONFIG_SUBTREE, {})
@ -225,7 +222,7 @@ class State(object):
by the current state address values.
If dynamic IP is disabled, all dynamic IP options should be removed.
"""
for iface_state in six.viewvalues(self.interfaces):
for iface_state in self.interfaces.values():
for family in ('ipv4', 'ipv6'):
ip = iface_state[family]
if ip.get(InterfaceIP.ENABLED) and (
@ -258,7 +255,7 @@ class State(object):
self._assert_interfaces_equal(other_state)
def verify_routes(self, other_state):
for iface_name, routes in six.viewitems(self.config_iface_routes):
for iface_name, routes in self.config_iface_routes.items():
other_routes = other_state.config_iface_routes.get(iface_name, [])
if routes != other_routes:
raise NmstateVerificationError(
@ -297,9 +294,7 @@ class State(object):
This is a reverse recursive update operation.
"""
other_state = State(other_state.state)
for name in six.viewkeys(self.interfaces) & six.viewkeys(
other_state.interfaces
):
for name in self.interfaces.keys() & other_state.interfaces.keys():
dict_update(other_state.interfaces[name], self.interfaces[name])
self._ifaces_state[name] = other_state.interfaces[name]
@ -317,7 +312,7 @@ class State(object):
- Support wildcard for matching the absent routes.
"""
other_routes = set()
for ifname, routes in six.viewitems(other_state.config_iface_routes):
for ifname, routes in other_state.config_iface_routes.items():
if (
ifname in self.config_iface_routes
or self._is_interface_routable(ifname, routes)
@ -326,13 +321,13 @@ class State(object):
self_routes = {
route
for routes in six.viewvalues(self.config_iface_routes)
for routes in self.config_iface_routes.values()
for route in routes
if not route.absent
}
absent_routes = set()
for routes in six.viewvalues(self.config_iface_routes):
for routes in self.config_iface_routes.values():
for absent_route in (r for r in routes if r.absent):
absent_routes |= {
r for r in other_routes if absent_route.match(r)
@ -386,7 +381,7 @@ class State(object):
def _remove_absent_interfaces(self):
ifaces = {}
for ifname, ifstate in six.viewitems(self.interfaces):
for ifname, ifstate in self.interfaces.items():
is_absent = ifstate.get(Interface.STATE) == InterfaceState.ABSENT
if not is_absent:
ifaces[ifname] = ifstate
@ -394,7 +389,7 @@ class State(object):
def _remove_down_virt_interfaces(self):
ifaces = {}
for ifname, ifstate in six.viewitems(self.interfaces):
for ifname, ifstate in self.interfaces.items():
is_virt_down = (
ifstate.get(Interface.STATE) == InterfaceState.DOWN
and ifstate.get(Interface.TYPE) in InterfaceType.VIRT_TYPES
@ -414,12 +409,12 @@ class State(object):
for route in self._config_routes:
iface_name = route.get(Route.NEXT_HOP_INTERFACE, '')
iface_routes[iface_name].append(RouteEntry(route))
for routes in six.viewvalues(iface_routes):
for routes in iface_routes.values():
routes.sort()
return iface_routes
def _clean_sanitize_ethernet(self):
for ifstate in six.viewvalues(self.interfaces):
for ifstate in self.interfaces.values():
ethernet_state = ifstate.get(Ethernet.CONFIG_SUBTREE)
if ethernet_state:
for key in (
@ -433,17 +428,17 @@ class State(object):
ifstate.pop(Ethernet.CONFIG_SUBTREE, None)
def _sort_lag_slaves(self):
for ifstate in six.viewvalues(self.interfaces):
for ifstate in self.interfaces.values():
ifstate.get('link-aggregation', {}).get('slaves', []).sort()
def _sort_bridge_ports(self):
for ifstate in six.viewvalues(self.interfaces):
for ifstate in self.interfaces.values():
ifstate.get('bridge', {}).get('port', []).sort(
key=itemgetter('name')
)
def _canonicalize_ipv6(self):
for ifstate in six.viewvalues(self.interfaces):
for ifstate in self.interfaces.values():
new_state = {
Interface.IPV6: {
InterfaceIPv6.ENABLED: False,
@ -465,7 +460,7 @@ class State(object):
self._ifaces_state[ifstate[Interface.NAME]] = new_state
def _remove_iface_ipv6_link_local_addr(self):
for ifstate in six.viewvalues(self.interfaces):
for ifstate in self.interfaces.values():
ifstate['ipv6'][InterfaceIPv6.ADDRESS] = list(
addr
for addr in ifstate['ipv6'][InterfaceIPv6.ADDRESS]
@ -483,20 +478,20 @@ class State(object):
ifstate[family] = {InterfaceIP.ENABLED: False}
def _sort_ip_addresses(self):
for ifstate in six.viewvalues(self.interfaces):
for ifstate in self.interfaces.values():
for family in ('ipv4', 'ipv6'):
ifstate[family].get(InterfaceIP.ADDRESS, []).sort(
key=itemgetter(InterfaceIP.ADDRESS_IP)
)
def _capitalize_mac(self):
for ifstate in six.viewvalues(self.interfaces):
for ifstate in self.interfaces.values():
mac = ifstate.get(Interface.MAC)
if mac:
ifstate[Interface.MAC] = mac.upper()
def _remove_empty_description(self):
for ifstate in six.viewvalues(self.interfaces):
for ifstate in self.interfaces.values():
if ifstate.get(Interface.DESCRIPTION) == '':
del ifstate[Interface.DESCRIPTION]
@ -535,7 +530,7 @@ class State(object):
def dict_update(origin_data, to_merge_data):
"""Recursevely performes a dict update (merge)"""
for key, val in six.viewitems(to_merge_data):
for key, val in to_merge_data.items():
if isinstance(val, Mapping):
origin_data[key] = dict_update(origin_data.get(key, {}), val)
else:
@ -555,7 +550,7 @@ def _validate_routes(
* Non-exit interface
* IPv4/IPv6 disabled
"""
for iface_name, route_set in six.viewitems(iface_route_sets):
for iface_name, route_set in iface_route_sets.items():
if not route_set:
continue
iface_enable_state = iface_enable_states.get(iface_name)
@ -582,7 +577,7 @@ def _validate_routes(
def canonicalize_ipv6_addr(ipv6_address_prefix_len):
ipv6_address = ipv6_address_prefix_len[InterfaceIPv6.ADDRESS_IP]
ipv6_iface = ip_address(six.u(ipv6_address))
ipv6_iface = ip_address(ipv6_address)
return {
InterfaceIPv6.ADDRESS_IP: str(ipv6_iface),
@ -594,9 +589,9 @@ def canonicalize_ipv6_addr(ipv6_address_prefix_len):
def _get_iface_enable_states(desire_state, current_state):
iface_enable_states = {}
for iface_name, iface_state in six.viewitems(current_state.interfaces):
for iface_name, iface_state in current_state.interfaces.items():
iface_enable_states[iface_name] = iface_state[Interface.STATE]
for iface_name, iface_state in six.viewitems(desire_state.interfaces):
for iface_name, iface_state in desire_state.interfaces.items():
if Interface.STATE in iface_state:
# If desire_state does not have Interface.STATE, it will use
# current_state settings.
@ -606,11 +601,11 @@ def _get_iface_enable_states(desire_state, current_state):
def _get_ip_enable_states(family, desire_state, current_state):
ip_enable_states = {}
for iface_name, iface_state in six.viewitems(current_state.interfaces):
for iface_name, iface_state in current_state.interfaces.items():
ip_enable_states[iface_name] = iface_state.get(family, {}).get(
InterfaceIP.ENABLED, False
)
for iface_name, iface_state in six.viewitems(desire_state.interfaces):
for iface_name, iface_state in desire_state.interfaces.items():
ip_enable_state = iface_state.get(family, {}).get(InterfaceIP.ENABLED)
if ip_enable_state is not None:
# If desire_state does not have Interface.IPV4/IPV6, it will use
@ -649,7 +644,7 @@ def _apply_absent_routes(absent_route_sets, iface_route_sets):
"""
for absent_route in absent_route_sets:
absent_iface_name = absent_route.next_hop_interface
for iface_name, route_set in six.viewitems(iface_route_sets):
for iface_name, route_set in iface_route_sets.items():
if absent_iface_name and absent_iface_name != iface_name:
continue
new_routes = set()
@ -666,15 +661,12 @@ def state_match(desire, current):
"""
if isinstance(desire, Mapping):
return isinstance(current, Mapping) and all(
state_match(val, current.get(key))
for key, val in six.viewitems(desire)
state_match(val, current.get(key)) for key, val in desire.items()
)
elif isinstance(desire, Sequence) and not isinstance(
desire, six.string_types
):
elif isinstance(desire, Sequence) and not isinstance(desire, str):
return (
isinstance(current, Sequence)
and not isinstance(current, six.string_types)
and not isinstance(current, str)
and len(current) == len(desire)
and all(state_match(d, c) for d, c in zip(desire, current))
)

View File

@ -19,7 +19,6 @@
import copy
import logging
import six
import jsonschema as js
@ -84,13 +83,13 @@ def validate_interfaces_state(desired_state, current_state):
def validate_link_aggregation_state(desired_state, current_state):
available_ifaces = {
ifname
for ifname, ifstate in six.viewitems(desired_state.interfaces)
for ifname, ifstate in desired_state.interfaces.items()
if ifstate.get('state') != 'absent'
}
available_ifaces |= set(current_state.interfaces)
specified_slaves = set()
for iface_state in six.viewvalues(desired_state.interfaces):
for iface_state in desired_state.interfaces.values():
if iface_state.get('state') != 'absent':
link_aggregation = iface_state.get('link-aggregation')
if link_aggregation:
@ -149,7 +148,7 @@ def validate_routes(desired_state, current_state):
- Exist and be up (no down/absent)
- Have the relevant IPv4/6 stack enabled.
"""
for iface_name, routes in six.viewitems(desired_state.config_iface_routes):
for iface_name, routes in desired_state.config_iface_routes.items():
if not routes:
continue

View File

@ -26,7 +26,6 @@ import subprocess
import sys
import tempfile
from six.moves import input
import yaml
import libnmstate

View File

@ -11,4 +11,3 @@ jsonschema
PyGObject
PyYAML
setuptools
six

View File

@ -16,11 +16,10 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import io
import json
import subprocess
import six
from .compat import mock
from nmstatectl import nmstatectl
@ -90,7 +89,7 @@ def test_run_ctl_directly_show_empty():
@mock.patch.object(
nmstatectl.libnmstate, 'show', lambda: json.loads(LO_JSON_STATE)
)
@mock.patch('nmstatectl.nmstatectl.sys.stdout', new_callable=six.StringIO)
@mock.patch('nmstatectl.nmstatectl.sys.stdout', new_callable=io.StringIO)
def test_run_ctl_directly_show_only_empty(mock_stdout):
nmstatectl.main()
assert mock_stdout.getvalue() == EMPTY_YAML_STATE
@ -100,7 +99,7 @@ def test_run_ctl_directly_show_only_empty(mock_stdout):
@mock.patch.object(
nmstatectl.libnmstate, 'show', lambda: json.loads(LO_JSON_STATE)
)
@mock.patch('nmstatectl.nmstatectl.sys.stdout', new_callable=six.StringIO)
@mock.patch('nmstatectl.nmstatectl.sys.stdout', new_callable=io.StringIO)
def test_run_ctl_directly_show_only(mock_stdout):
nmstatectl.main()
assert mock_stdout.getvalue() == LO_YAML_STATE
@ -112,7 +111,7 @@ def test_run_ctl_directly_show_only(mock_stdout):
@mock.patch.object(
nmstatectl.libnmstate, 'show', lambda: json.loads(LO_JSON_STATE)
)
@mock.patch('nmstatectl.nmstatectl.sys.stdout', new_callable=six.StringIO)
@mock.patch('nmstatectl.nmstatectl.sys.stdout', new_callable=io.StringIO)
def test_run_ctl_directly_show_json_only_empty(mock_stdout):
nmstatectl.main()
assert mock_stdout.getvalue() == EMPTY_JSON_STATE
@ -122,7 +121,7 @@ def test_run_ctl_directly_show_json_only_empty(mock_stdout):
@mock.patch.object(
nmstatectl.libnmstate, 'show', lambda: json.loads(LO_JSON_STATE)
)
@mock.patch('nmstatectl.nmstatectl.sys.stdout', new_callable=six.StringIO)
@mock.patch('nmstatectl.nmstatectl.sys.stdout', new_callable=io.StringIO)
def test_run_ctl_directly_show_json_only(mock_stdout):
nmstatectl.main()
assert mock_stdout.getvalue() == LO_JSON_STATE

View File

@ -75,7 +75,7 @@ def diff_initial_state():
'before test run:\n {}\n'.format(
libnmstate.prettystate.format_desired_current_state_diff(
old_state, new_state
),
)
)
)

View File

@ -18,12 +18,11 @@
#
from contextlib import contextmanager
from functools import wraps
import subprocess
import threading
import time
import six
TIMEOUT = 10
@ -37,7 +36,7 @@ class IpMonitorResult(object):
def ip_monitor_assert_stable_link_up(dev, timeout=10):
def decorator(func):
@six.wraps(func)
@wraps(func)
def wrapper_ip_monitor(*args, **kwargs):
with ip_monitor('link', dev, timeout) as result:
func(*args, **kwargs)

View File

@ -26,7 +26,6 @@ except ImportError:
import copy
from operator import itemgetter
import six
import libnmstate
from libnmstate.schema import Interface
@ -236,7 +235,7 @@ def _dict_update(origin_data, to_merge_data):
updating the origin_data.
The function changes the origin_data in-place.
"""
for key, val in six.viewitems(to_merge_data):
for key, val in to_merge_data.items():
if isinstance(val, Mapping):
origin_data[key] = _dict_update(origin_data.get(key, {}), val)
else:
@ -278,15 +277,12 @@ def _is_ipv6_link_local(ip, prefix):
def _state_match(desire, current):
if isinstance(desire, Mapping):
return isinstance(current, Mapping) and all(
_state_match(val, current.get(key))
for key, val in six.viewitems(desire)
_state_match(val, current.get(key)) for key, val in desire.items()
)
elif isinstance(desire, Sequence) and not isinstance(
desire, six.string_types
):
elif isinstance(desire, Sequence) and not isinstance(desire, str):
return (
isinstance(current, Sequence)
and not isinstance(current, six.string_types)
and not isinstance(current, str)
and len(current) == len(desire)
and all(_state_match(d, c) for d, c in zip(desire, current))
)