veth: Show veth interface as type: veth

Previously, in order to simplify the code workflow, we are showing both
veth and normal ethernet as `type: ethernet`. This has break kubernetes
use case where they need it to identify the veth interfaces.

Changed to show veth interface as `type: veth`. When its peer is in
another network namespace, hide the `veth` section.

Expand the integration test cases to cover these use cases:

 * `test_eth_with_veth_conf`
   Desire state is desiring ethernet interface with veth configuration.
   Raise InvalidArgument error (NmstateValueError exception in python)

 * `test_add_veth_with_ethernet_peer`
   Desire state contains both veth and veth peer interface. But veth
   peer interface is set as `type: ethernet`.

 * `test_add_veth_with_veth_peer_in_desire`
   Desire state contains both veth and veth peer interface. Both veth
   and peer interfaces are set as `type: veth` with `veth` configure
   pointing to each other.

 * `test_modify_veth_peer`
   Change veth peer. The old peer should be removed.

 * `test_veth_without_peer_fails`
   When adding new veth(not exist) without veth peer defined in veth
   configure, raise InvalidArgument error (NmstateValueError exception
   in python).

 * `test_change_veth_with_veth_type_without_veth_conf`
   With pre-exist veth, changing other configure with `type: veth` and
   no veth configure.

 * `test_change_veth_with_eth_type_without_veth_conf`
   With pre-exist veth, changing other configure with `type: ethernet` and
   no veth configure.

 * `test_veth_with_ignored_peer`
   With pre-exist veth and its peer marked as ignored (unmanaged by NM).
   Applying desire state with `type: veth` and veth configuration should
   not fail. And the veth configuration should be ignored.

 * `test_veth_with_ignored_peer_changed_to_new_peer`
   With pre-exist veth and its peer marked as ignored (unmanaged by NM).
   Nmstate will raise InvalidArgument error (NmstateValueError exception
   in python) when user try to change its peer. Also have unit test case
   for this.

Signed-off-by: Gris Ge <fge@redhat.com>
This commit is contained in:
Gris Ge 2022-09-02 20:00:33 +08:00 committed by Fernando Fernández Mancera
parent da3d2301b2
commit e94a54fa76
15 changed files with 549 additions and 353 deletions

View File

@ -201,6 +201,7 @@ function prepare_network_environment {
exec_cmd "ip link set ${peer} up"
exec_cmd "ip link set ${device} up"
exec_cmd "nmcli device set ${device} managed yes"
exec_cmd "nmcli device set ${peer} managed no"
fi
done
set -e

View File

@ -353,6 +353,8 @@ impl Interface {
Self::Ethernet(iface) => {
let mut new_iface = EthernetInterface::new();
new_iface.base = iface.base.clone_name_type_only();
// Do not use veth interface type when clone internally
new_iface.base.iface_type = InterfaceType::Ethernet;
Self::Ethernet(new_iface)
}
Self::Vlan(iface) => {
@ -665,9 +667,7 @@ impl Interface {
pub(crate) fn pre_edit_cleanup(&mut self) -> Result<(), NmstateError> {
self.base_iface_mut().pre_edit_cleanup()?;
if let Interface::Ethernet(iface) = self {
if iface.veth.is_some() {
iface.base.iface_type = InterfaceType::Veth;
}
iface.pre_edit_cleanup()?;
}
if let Interface::OvsInterface(iface) = self {
iface.pre_edit_cleanup()?;

View File

@ -45,11 +45,32 @@ impl EthernetInterface {
}
}
pub(crate) fn pre_edit_cleanup(&mut self) -> Result<(), NmstateError> {
if self.base.iface_type != InterfaceType::Veth && self.veth.is_some() {
let e = NmstateError::new(
ErrorKind::InvalidArgument,
format!(
"Interface {} is holding veth configuration \
with `type: ethernet`. Please change to `type: veth`",
self.base.name.as_str()
),
);
log::error!("{}", e);
Err(e)
} else {
Ok(())
}
}
pub(crate) fn pre_verify_cleanup(&mut self) {
if let Some(eth_conf) = self.ethernet.as_mut() {
eth_conf.pre_verify_cleanup()
}
self.base.iface_type = InterfaceType::Ethernet;
if self.base.iface_type == InterfaceType::Ethernet {
self.veth = None;
} else {
self.base.iface_type = InterfaceType::Ethernet;
}
}
pub fn new() -> Self {
@ -76,20 +97,6 @@ impl EthernetInterface {
}
Ok(())
}
// veth config is ignored unless iface type is veth
pub(crate) fn veth_sanitize(&mut self) {
if self.base.iface_type == InterfaceType::Ethernet
&& self.veth.is_some()
{
log::warn!(
"Veth configuration is ignored, please set interface type \
to InterfaceType::Veth (veth) to change veth \
configuration"
);
self.veth = None;
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@ -226,6 +233,16 @@ pub(crate) fn handle_veth_peer_changes(
}
}
for iface in chg_ifaces.kernel_ifaces.values_mut() {
if iface.iface_type() == InterfaceType::Veth {
if let Interface::Ethernet(eth_iface) = iface {
if eth_iface.veth.is_none() {
eth_iface.base.iface_type = InterfaceType::Ethernet;
}
}
}
}
let mut del_peers: Vec<&str> = Vec::new();
for iface in del_ifaces
.kernel_ifaces

View File

@ -47,20 +47,9 @@ impl<'de> Deserialize<'de> for Interfaces {
D: Deserializer<'de>,
{
let mut ret = Self::new();
let ifaces =
<Vec<Interface> as Deserialize>::deserialize(deserializer)?;
for mut iface in ifaces {
// Unless user place veth configure in ethernet interface,
// it means user just applying the return of
// NetworkState.retrieve(). If user would like to change
// veth configuration, it should use veth interface
// type.
if iface.iface_type() == InterfaceType::Ethernet {
if let Interface::Ethernet(ref mut eth_iface) = iface {
eth_iface.veth_sanitize();
}
}
ret.push(iface)
for iface in <Vec<Interface> as Deserialize>::deserialize(deserializer)?
{
ret.push(iface);
}
Ok(ret)
}
@ -300,9 +289,72 @@ impl Interfaces {
iface.remove_port(ignore_port);
}
}
if iface.iface_type() == InterfaceType::Veth {
if let Interface::Ethernet(eth_iface) = iface {
if let Some(veth_conf) = eth_iface.veth.as_ref() {
if kernel_iface_names.contains(veth_conf.peer.as_str())
{
log::info!(
"Veth interface {} is holding ignored peer {}",
eth_iface.base.name,
veth_conf.peer.as_str()
);
eth_iface.veth = None;
eth_iface.base.iface_type = InterfaceType::Ethernet;
}
}
}
}
}
}
// Not allowing changing veth peer away from ignored peer unless previous
// peer changed from ignore to managed
pub(crate) fn pre_ignore_check(
&self,
current: &Self,
ignored_kernel_iface_names: &[String],
) -> Result<(), NmstateError> {
for iface in self
.kernel_ifaces
.values()
.filter(|i| i.iface_type() == InterfaceType::Veth)
{
if let (
Interface::Ethernet(des_iface),
Some(Interface::Ethernet(cur_iface)),
) = (iface, current.get_iface(iface.name(), InterfaceType::Veth))
{
if let (Some(des_peer), Some(cur_peer)) = (
des_iface.veth.as_ref().map(|v| v.peer.as_str()),
cur_iface.veth.as_ref().map(|v| v.peer.as_str()),
) {
if des_peer != cur_peer
&& ignored_kernel_iface_names
.contains(&cur_peer.to_string())
{
let e = NmstateError::new(
ErrorKind::InvalidArgument,
format!(
"Veth interface {} is currently holding \
peer {} which is marked as ignored. \
Hence not allowing changing its peer \
to {}. Please remove this veth pair \
before changing veth peer",
iface.name(),
cur_peer,
des_peer
),
);
log::error!("{}", e);
return Err(e);
}
}
}
}
Ok(())
}
pub(crate) fn gen_state_for_apply(
&mut self,
current: &Self,
@ -342,7 +394,9 @@ impl Interfaces {
match current.get_iface(iface.name(), iface.iface_type()) {
Some(cur_iface) => {
let mut chg_iface = iface.clone();
chg_iface.set_iface_type(cur_iface.iface_type());
if cur_iface.iface_type() == InterfaceType::Unknown {
chg_iface.set_iface_type(cur_iface.iface_type());
}
chg_iface.pre_edit_cleanup()?;
info!(
"Changing interface {} with type {}, \
@ -654,15 +708,17 @@ fn gen_ifaces_to_del(
let mut del_ifaces = Vec::new();
let cur_ifaces = cur_ifaces.to_vec();
for cur_iface in cur_ifaces {
// Internally we use ethernet type for veth, hence we should
// change desire before searching.
let del_iface_type = match del_iface.iface_type() {
InterfaceType::Veth => InterfaceType::Ethernet,
t => t,
};
let cur_iface_type = match cur_iface.iface_type() {
InterfaceType::Veth => InterfaceType::Ethernet,
t => t,
};
if cur_iface.name() == del_iface.name()
&& (del_iface_type == InterfaceType::Unknown
|| del_iface_type == cur_iface.iface_type())
|| del_iface_type == cur_iface_type)
{
let mut tmp_iface = del_iface.clone();
tmp_iface.base_iface_mut().iface_type = cur_iface.iface_type();

View File

@ -254,6 +254,11 @@ impl NetworkState {
)
}
desire_state_to_apply.interfaces.pre_ignore_check(
&cur_net_state.interfaces,
&ignored_kernel_ifaces,
)?;
desire_state_to_apply.interfaces.remove_ignored_ifaces(
&ignored_kernel_ifaces,
&ignored_user_ifaces,

View File

@ -47,7 +47,6 @@ fn net_state_to_nispor(
}
np_ifaces.push(nmstate_iface_to_np(iface, np_iface_type)?);
} else if iface.is_absent() {
println!("del {:?} {:?}", iface.name(), iface.iface_type());
let mut iface_conf = nispor::IfaceConf::default();
iface_conf.name = iface.name().to_string();
iface_conf.iface_type =

View File

@ -30,8 +30,7 @@ pub(crate) fn nispor_retrieve(
let np_state = nispor::NetState::retrieve().map_err(np_error_to_nmstate)?;
for (_, np_iface) in np_state.ifaces.iter() {
let mut base_iface =
np_iface_to_base_iface(np_iface, running_config_only);
let base_iface = np_iface_to_base_iface(np_iface, running_config_only);
// The `ovs-system` is reserved for OVS kernel datapath
if np_iface.name == "ovs-system" {
continue;
@ -60,7 +59,6 @@ pub(crate) fn nispor_retrieve(
np_ethernet_to_nmstate(np_iface, base_iface),
),
InterfaceType::Veth => {
base_iface.iface_type = InterfaceType::Ethernet;
Interface::Ethernet(np_veth_to_nmstate(np_iface, base_iface))
}
InterfaceType::Vlan => {

View File

@ -7,8 +7,7 @@ pub(crate) fn np_veth_to_nmstate(
let veth_conf = np_iface.veth.as_ref().and_then(|np_veth_info| {
if np_veth_info.peer.as_str().parse::<u32>().is_ok() {
// If veth peer is interface index, it means its veth peer is in
// another network namespace, we should treat this interface
// as ethernet
// another network namespace, we hide the veth section
None
} else {
Some(VethConfig {

View File

@ -152,8 +152,15 @@ pub(crate) fn iface_to_nm_connections(
if let Some(cur_iface) =
cur_net_state.get_kernel_iface_with_route(iface.name())
{
// Do no try to persistent veth config of current interface
let mut iface = cur_iface;
if let Interface::Ethernet(eth_iface) = &mut iface {
eth_iface.veth = None;
eth_iface.base.iface_type = InterfaceType::Ethernet;
}
return iface_to_nm_connections(
&cur_iface,
&iface,
ctrl_iface,
exist_nm_conns,
nm_ac_uuids,
@ -345,9 +352,28 @@ pub(crate) fn create_index_for_nm_conns_by_name_type(
let mut ret: HashMap<(&str, &str), Vec<&NmConnection>> = HashMap::new();
for nm_conn in nm_conns {
if let Some(iface_name) = nm_conn.iface_name() {
if let Some(mut nm_iface_type) = nm_conn.iface_type() {
if let Some(nm_iface_type) = nm_conn.iface_type() {
if nm_iface_type == NM_SETTING_VETH_SETTING_NAME {
nm_iface_type = NM_SETTING_WIRED_SETTING_NAME;
match ret.entry((iface_name, NM_SETTING_WIRED_SETTING_NAME))
{
Entry::Occupied(o) => {
o.into_mut().push(nm_conn);
}
Entry::Vacant(v) => {
v.insert(vec![nm_conn]);
}
};
}
if nm_iface_type == NM_SETTING_WIRED_SETTING_NAME {
match ret.entry((iface_name, NM_SETTING_VETH_SETTING_NAME))
{
Entry::Occupied(o) => {
o.into_mut().push(nm_conn);
}
Entry::Vacant(v) => {
v.insert(vec![nm_conn]);
}
};
}
match ret.entry((iface_name, nm_iface_type)) {
Entry::Occupied(o) => {
@ -468,13 +494,13 @@ pub(crate) fn gen_nm_conn_setting(
} else {
NmApi::uuid_gen()
});
if new_nm_conn_set.iface_type.is_none() {
// The `get_exist_profile()` already confirmed the existing
// profile has correct `iface_type`. We should not override it.
// The use case is, existing connection is veth, but user desire
// ethernet interface, we should use existing interface type.
new_nm_conn_set.iface_type =
Some(iface_type_to_nm(&iface.iface_type())?);
new_nm_conn_set.iface_type =
Some(iface_type_to_nm(&iface.iface_type())?);
if let Interface::Ethernet(eth_iface) = iface {
if eth_iface.veth.is_some() {
new_nm_conn_set.iface_type =
Some(NM_SETTING_VETH_SETTING_NAME.to_string());
}
}
new_nm_conn_set
};

View File

@ -27,8 +27,6 @@ pub(crate) fn get_exist_profile<'a>(
let mut found_nm_conns: Vec<&NmConnection> = Vec::new();
for exist_nm_conn in exist_nm_conns {
let nm_iface_type = if let Ok(t) = iface_type_to_nm(iface_type) {
// The iface_type will never be veth as top level code
// `pre_edit_clean()` has confirmed so.
t
} else {
continue;

View File

@ -223,7 +223,6 @@ fn nm_conn_to_base_iface(
base_iface.prop_list = vec![
"name",
"state",
"iface_type",
"ipv4",
"ipv6",
"ieee8021x",
@ -233,6 +232,11 @@ fn nm_conn_to_base_iface(
];
base_iface.state = InterfaceState::Up;
base_iface.iface_type = nm_dev_iface_type_to_nmstate(nm_dev);
if base_iface.iface_type.is_userspace() {
// Only override iface type for user space. For other interface,
// we trust nispor to set the correct interface type.
base_iface.prop_list.push("iface_type");
}
base_iface.ipv4 = ipv4;
base_iface.ipv6 = ipv6;
base_iface.wait_ip =
@ -424,7 +428,7 @@ fn nm_dev_to_nm_iface(nm_dev: &NmDevice) -> Option<Interface> {
} else {
base_iface.name = nm_dev.name.clone();
}
base_iface.prop_list = vec!["name", "iface_type", "state"];
base_iface.prop_list = vec!["name", "state"];
match nm_dev.state {
NmDeviceState::Unmanaged => {
if !nm_dev.real {
@ -437,7 +441,7 @@ fn nm_dev_to_nm_iface(nm_dev: &NmDevice) -> Option<Interface> {
_ => base_iface.state = InterfaceState::Up,
}
base_iface.iface_type = nm_dev_iface_type_to_nmstate(nm_dev);
Some(match &base_iface.iface_type {
let mut iface = match &base_iface.iface_type {
InterfaceType::Ethernet => Interface::Ethernet({
let mut iface = EthernetInterface::new();
iface.base = base_iface;
@ -522,5 +526,11 @@ fn nm_dev_to_nm_iface(nm_dev: &NmDevice) -> Option<Interface> {
iface.base = base_iface;
iface
}),
})
};
if iface.iface_type().is_userspace() {
// Only override iface type for user space. For other interface,
// we trust nispor to set the correct interface type.
iface.base_iface_mut().prop_list.push("iface_type");
}
Some(iface)
}

View File

@ -1,4 +1,4 @@
use crate::EthernetInterface;
use crate::{ErrorKind, EthernetInterface, Interfaces};
#[test]
fn test_ethernet_stringlized_attributes() {
@ -38,3 +38,41 @@ ethernet:
assert_eq!(vf_conf.vlan_id, Some(102));
assert_eq!(vf_conf.qos, Some(103));
}
#[test]
fn test_veth_change_peer_away_from_ignored_peer() {
let desired: Interfaces = serde_yaml::from_str(
r#"---
- name: veth1
type: veth
state: up
veth:
peer: newpeer
"#,
)
.unwrap();
let current: Interfaces = serde_yaml::from_str(
r#"---
- name: veth1
type: veth
state: up
veth:
peer: veth1peer
- name: veth1peer
type: veth
state: ignore
veth:
peer: veth1
"#,
)
.unwrap();
let ignored_kernel_ifaces = vec!["veth1peer".to_string()];
let result = desired.pre_ignore_check(&current, &ignored_kernel_ifaces);
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(e.kind(), ErrorKind::InvalidArgument);
}
}

View File

@ -20,8 +20,11 @@
import pytest
import libnmstate
from libnmstate.error import NmstateValueError
from libnmstate.schema import Interface
from libnmstate.schema import InterfaceState
from libnmstate.schema import InterfaceType
from libnmstate.schema import Veth
from ..testlib.env import nm_major_minor_version
from ..testlib import cmdlib
@ -30,6 +33,7 @@ from ..testlib.veth import veth_interface
VETH1 = "veth1"
VETH1PEER = "veth1peer"
VETH1PEER2 = "veth1ep"
@pytest.mark.skipif(
@ -46,3 +50,67 @@ def test_remove_peer_connection():
cmdlib.exec_cmd(f"nmcli connection show {VETH1PEER}".split())[0]
!= 0
)
@pytest.fixture
def veth1_with_ignored_peer():
cmdlib.exec_cmd(
f"ip link add {VETH1} type veth peer {VETH1PEER}".split(), check=True
)
cmdlib.exec_cmd(f"ip link set {VETH1} up".split(), check=True)
cmdlib.exec_cmd(f"ip link set {VETH1PEER} up".split(), check=True)
cmdlib.exec_cmd(f"nmcli d set {VETH1} managed true".split(), check=True)
cmdlib.exec_cmd(
f"nmcli d set {VETH1PEER} managed false".split(), check=True
)
yield
cmdlib.exec_cmd(f"nmcli c del {VETH1}".split())
cmdlib.exec_cmd(f"nmcli c del {VETH1PEER}".split())
cmdlib.exec_cmd(f"ip link del {VETH1}".split())
@pytest.mark.skipif(
nm_major_minor_version() <= 1.28,
reason="Modifying veth interfaces is not supported on NetworkManager.",
)
def test_veth_with_ignored_peer(veth1_with_ignored_peer):
desired_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.UP,
Veth.CONFIG_SUBTREE: {
Veth.PEER: VETH1PEER,
},
}
]
}
libnmstate.apply(desired_state)
assert (
"unmanaged"
in cmdlib.exec_cmd(
f"nmcli -g GENERAL.STATE d show {VETH1PEER}".split()
)[1]
)
@pytest.mark.skipif(
nm_major_minor_version() <= 1.28,
reason="Modifying veth interfaces is not supported on NetworkManager.",
)
def test_veth_with_ignored_peer_changed_to_new_peer(veth1_with_ignored_peer):
desired_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.UP,
Veth.CONFIG_SUBTREE: {
Veth.PEER: VETH1PEER2,
},
}
]
}
with pytest.raises(NmstateValueError):
libnmstate.apply(desired_state)

View File

@ -53,7 +53,7 @@ def remove_veth_pair(nic, peer_ns):
@contextmanager
def veth_interface(ifname, peer):
def veth_interface(ifname, peer, kernel_mode=False):
d_state = {
Interface.KEY: [
{
@ -67,7 +67,7 @@ def veth_interface(ifname, peer):
]
}
try:
libnmstate.apply(d_state)
libnmstate.apply(d_state, kernel_only=kernel_mode)
yield d_state
finally:
d_state[Interface.KEY][0][Interface.STATE] = InterfaceState.ABSENT
@ -78,4 +78,4 @@ def veth_interface(ifname, peer):
Interface.STATE: InterfaceState.ABSENT,
}
)
libnmstate.apply(d_state)
libnmstate.apply(d_state, kernel_only=kernel_mode, verify_change=False)

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2021 Red Hat, Inc.
# Copyright (c) 2021-2022 Red Hat, Inc.
#
# This file is part of nmstate
#
@ -34,6 +34,7 @@ from libnmstate.schema import VLAN
from .testlib import assertlib
from .testlib import statelib
from .testlib.veth import veth_interface
from .testlib.env import nm_major_minor_version
@ -48,7 +49,7 @@ VETH1_VLAN = "veth1.0"
reason="Modifying veth interfaces is supported on NetworkManager.",
)
@pytest.mark.tier1
def test_add_veth_not_supported():
def test_add_veth_not_supported(self):
desired_state = {
Interface.KEY: [
{
@ -68,208 +69,288 @@ def test_add_veth_not_supported():
nm_major_minor_version() <= 1.28,
reason="Modifying veth interfaces is not supported on NetworkManager.",
)
@pytest.mark.tier1
def test_add_veth_with_ethernet_peer():
d_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: Veth.TYPE,
Interface.STATE: InterfaceState.UP,
Veth.CONFIG_SUBTREE: {
Veth.PEER: VETH1PEER,
class TestVeth:
def test_eth_with_veth_conf(self, eth1_up):
d_state = {
Interface.KEY: [
{
Interface.NAME: "eth1",
Interface.TYPE: InterfaceType.ETHERNET,
Interface.STATE: InterfaceState.UP,
Veth.CONFIG_SUBTREE: {
Veth.PEER: VETH1PEER,
},
},
},
{
Interface.NAME: VETH1PEER,
Interface.TYPE: InterfaceType.ETHERNET,
Interface.STATE: InterfaceState.UP,
},
]
}
try:
{
Interface.NAME: VETH1PEER,
Interface.TYPE: InterfaceType.ETHERNET,
Interface.STATE: InterfaceState.UP,
},
]
}
with pytest.raises(NmstateValueError):
libnmstate.apply(d_state)
@pytest.mark.tier1
def test_add_veth_with_ethernet_peer(self):
d_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: Veth.TYPE,
Interface.STATE: InterfaceState.UP,
Veth.CONFIG_SUBTREE: {
Veth.PEER: VETH1PEER,
},
},
{
Interface.NAME: VETH1PEER,
Interface.TYPE: InterfaceType.ETHERNET,
Interface.STATE: InterfaceState.UP,
},
]
}
try:
libnmstate.apply(d_state)
assertlib.assert_state_match(d_state)
finally:
d_state[Interface.KEY][0][Interface.STATE] = InterfaceState.ABSENT
d_state[Interface.KEY][1][Interface.STATE] = InterfaceState.ABSENT
libnmstate.apply(d_state)
@pytest.mark.tier1
def test_add_with_peer_not_mentioned_in_desire(self):
with veth_interface(VETH1, VETH1PEER) as desired_state:
assertlib.assert_state(desired_state)
assertlib.assert_absent(VETH1)
assertlib.assert_absent(VETH1PEER)
@pytest.mark.tier1
def test_add_and_remove_veth_kernel_mode(self):
with veth_interface(
VETH1, VETH1PEER, kernel_mode=True
) as desired_state:
assertlib.assert_state(desired_state)
assertlib.assert_absent(VETH1)
assertlib.assert_absent(VETH1PEER)
@pytest.mark.tier1
def test_add_veth_with_veth_peer_in_desire(self):
with veth_interface_both_up(VETH1, VETH1PEER):
c_state = statelib.show_only(
(
VETH1,
VETH1PEER,
)
)
assert (
c_state[Interface.KEY][0][Interface.STATE] == InterfaceState.UP
)
assert (
c_state[Interface.KEY][1][Interface.STATE] == InterfaceState.UP
)
assertlib.assert_absent(VETH1)
assertlib.assert_absent(VETH1PEER)
@pytest.mark.tier1
def test_add_veth_as_bridge_port(self):
with veth_interface(VETH1, VETH1PEER):
with bridges_with_port() as desired_state:
assertlib.assert_state_match(desired_state)
@pytest.mark.tier1
def test_modify_veth_peer(self):
with veth_interface(VETH1, VETH1PEER) as d_state:
d_state[Interface.KEY][0][Veth.CONFIG_SUBTREE][
Veth.PEER
] = VETH2PEER
libnmstate.apply(d_state)
c_state = statelib.show_only(
(
VETH1,
VETH2PEER,
)
)
assert (
c_state[Interface.KEY][0][Veth.CONFIG_SUBTREE][Veth.PEER]
== VETH2PEER
)
assert c_state[Interface.KEY][1][Interface.NAME] == VETH2PEER
@pytest.mark.tier1
def test_veth_as_vlan_base_iface(self):
d_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.UP,
Veth.CONFIG_SUBTREE: {
Veth.PEER: VETH1PEER,
},
},
{
Interface.NAME: VETH1_VLAN,
Interface.TYPE: InterfaceType.VLAN,
Interface.STATE: InterfaceState.UP,
VLAN.CONFIG_SUBTREE: {
VLAN.BASE_IFACE: VETH1,
VLAN.ID: 0,
},
},
]
}
libnmstate.apply(d_state)
assertlib.assert_state_match(d_state)
finally:
c_state = statelib.show_only(
(
VETH1,
VETH1_VLAN,
)
)
assert c_state[Interface.KEY][0][Interface.STATE] == InterfaceState.UP
assert c_state[Interface.KEY][1][Interface.STATE] == InterfaceState.UP
d_state[Interface.KEY][0][Interface.STATE] = InterfaceState.ABSENT
d_state[Interface.KEY][1][Interface.STATE] = InterfaceState.ABSENT
libnmstate.apply(d_state)
@pytest.mark.tier1
def test_veth_enable_and_disable_accept_all_mac_addresses(self):
with veth_interface(VETH1, VETH1PEER) as d_state:
d_state[Interface.KEY][0][
Interface.ACCEPT_ALL_MAC_ADDRESSES
] = True
libnmstate.apply(d_state)
assertlib.assert_state(d_state)
@pytest.mark.skipif(
nm_major_minor_version() <= 1.28,
reason="Modifying veth interfaces is not supported on NetworkManager.",
)
@pytest.mark.tier1
def test_add_and_remove_veth():
with veth_interface(VETH1, VETH1PEER) as desired_state:
assertlib.assert_state(desired_state)
d_state[Interface.KEY][0][
Interface.ACCEPT_ALL_MAC_ADDRESSES
] = False
libnmstate.apply(d_state)
assertlib.assert_state(d_state)
assertlib.assert_absent(VETH1)
assertlib.assert_absent(VETH1PEER)
assertlib.assert_absent(VETH1)
assertlib.assert_absent(VETH1PEER)
@pytest.mark.tier1
def test_veth_without_peer_fails(self):
d_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.UP,
}
]
}
@pytest.mark.tier1
def test_add_and_remove_veth_kernel_mode():
with veth_interface(VETH1, VETH1PEER, kernel_mode=True) as desired_state:
assertlib.assert_state(desired_state)
with pytest.raises(NmstateValueError):
libnmstate.apply(d_state)
assertlib.assert_absent(VETH1)
assertlib.assert_absent(VETH1PEER)
@pytest.mark.skipif(
nm_major_minor_version() <= 1.28,
reason="Modifying veth interfaces is not supported on NetworkManager.",
)
@pytest.mark.tier1
def test_add_veth_both_up():
with veth_interface(VETH1, VETH1PEER):
c_state = statelib.show_only(
(
VETH1,
VETH1PEER,
)
)
assert c_state[Interface.KEY][0][Interface.STATE] == InterfaceState.UP
assert c_state[Interface.KEY][1][Interface.STATE] == InterfaceState.UP
assertlib.assert_absent(VETH1)
assertlib.assert_absent(VETH1PEER)
@pytest.mark.skipif(
nm_major_minor_version() <= 1.28,
reason="Modifying veth interfaces is not supported on NetworkManager.",
)
@pytest.mark.tier1
def test_add_veth_as_bridge_port():
with veth_interface(VETH1, VETH1PEER):
with bridges_with_port() as desired_state:
def test_new_veth_with_ipv6_only(self):
desired_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.UP,
Veth.CONFIG_SUBTREE: {
Veth.PEER: VETH1PEER,
},
Interface.IPV6: {
InterfaceIPv6.ENABLED: True,
InterfaceIPv6.DHCP: False,
InterfaceIPv6.AUTOCONF: False,
InterfaceIPv6.ADDRESS: [
{
InterfaceIPv6.ADDRESS_IP: "2001:db8:1::1",
InterfaceIPv6.ADDRESS_PREFIX_LENGTH: 64,
}
],
},
}
]
}
try:
libnmstate.apply(desired_state)
assertlib.assert_state_match(desired_state)
@pytest.mark.skipif(
nm_major_minor_version() <= 1.28,
reason="Modifying veth interfaces is not supported on NetworkManager.",
)
@pytest.mark.tier1
def test_add_veth_and_bring_both_up():
with veth_interface_both_up(VETH1, VETH1PEER):
c_state = statelib.show_only(
(
VETH1,
VETH1PEER,
)
)
assert c_state[Interface.KEY][0][Interface.STATE] == InterfaceState.UP
assert c_state[Interface.KEY][1][Interface.STATE] == InterfaceState.UP
@pytest.mark.skipif(
nm_major_minor_version() <= 1.28,
reason="Modifying veth interfaces is not supported on NetworkManager.",
)
@pytest.mark.tier1
def test_modify_veth_peer():
with veth_interface(VETH1, VETH1PEER) as d_state:
d_state[Interface.KEY][0][Veth.CONFIG_SUBTREE][Veth.PEER] = VETH2PEER
libnmstate.apply(d_state)
c_state = statelib.show_only(
(
VETH1,
VETH2PEER,
)
)
assert (
c_state[Interface.KEY][0][Veth.CONFIG_SUBTREE][Veth.PEER]
== VETH2PEER
)
assert c_state[Interface.KEY][1][Interface.NAME] == VETH2PEER
@pytest.mark.skipif(
nm_major_minor_version() <= 1.28,
reason="Modifying veth interfaces is not supported on NetworkManager.",
)
@pytest.mark.tier1
def test_veth_as_vlan_base_iface():
d_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.UP,
Veth.CONFIG_SUBTREE: {
Veth.PEER: VETH1PEER,
finally:
libnmstate.apply(
{
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.ABSENT,
},
{
Interface.NAME: VETH1PEER,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.ABSENT,
},
]
},
},
{
Interface.NAME: VETH1_VLAN,
Interface.TYPE: InterfaceType.VLAN,
Interface.STATE: InterfaceState.UP,
VLAN.CONFIG_SUBTREE: {
VLAN.BASE_IFACE: VETH1,
VLAN.ID: 0,
},
},
]
}
libnmstate.apply(d_state)
verify_change=False,
)
c_state = statelib.show_only(
(
VETH1,
VETH1_VLAN,
)
)
assert c_state[Interface.KEY][0][Interface.STATE] == InterfaceState.UP
assert c_state[Interface.KEY][1][Interface.STATE] == InterfaceState.UP
def test_veth_invalid_mtu_smaller_than_min(self, eth1_up):
with pytest.raises(NmstateValueError):
libnmstate.apply(
{
Interface.KEY: [
{
Interface.NAME: "eth1",
Interface.TYPE: InterfaceType.VETH,
Interface.MTU: 32,
},
]
}
)
d_state[Interface.KEY][0][Interface.STATE] = InterfaceState.ABSENT
d_state[Interface.KEY][1][Interface.STATE] = InterfaceState.ABSENT
libnmstate.apply(d_state)
def test_veth_invalid_mtu_bigger_than_max(self, eth1_up):
with pytest.raises(NmstateValueError):
libnmstate.apply(
{
Interface.KEY: [
{
Interface.NAME: "eth1",
Interface.TYPE: InterfaceType.VETH,
Interface.MTU: 1500000,
},
]
}
)
def test_change_veth_with_veth_type_without_veth_conf(self, veth1_up):
desired_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.UP,
}
]
}
libnmstate.apply(desired_state)
assertlib.assert_state_match(desired_state)
@pytest.mark.tier1
@pytest.mark.skipif(
nm_major_minor_version() < 1.31,
reason="Modifying accept-all-mac-addresses is not supported on NM.",
)
@pytest.mark.tier1
def test_veth_enable_and_disable_accept_all_mac_addresses():
with veth_interface(VETH1, VETH1PEER) as d_state:
d_state[Interface.KEY][0][Interface.ACCEPT_ALL_MAC_ADDRESSES] = True
libnmstate.apply(d_state)
assertlib.assert_state(d_state)
d_state[Interface.KEY][0][Interface.ACCEPT_ALL_MAC_ADDRESSES] = False
libnmstate.apply(d_state)
assertlib.assert_state(d_state)
assertlib.assert_absent(VETH1)
assertlib.assert_absent(VETH1PEER)
@pytest.mark.skipif(
nm_major_minor_version() <= 1.28,
reason="Modifying veth interfaces is not supported on NetworkManager.",
)
@pytest.mark.tier1
def test_veth_without_peer_fails():
d_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.UP,
}
]
}
with pytest.raises(NmstateValueError):
libnmstate.apply(d_state)
def test_change_veth_with_eth_type_without_veth_conf(self, veth1_up):
desired_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.ETHERNET,
Interface.STATE: InterfaceState.UP,
}
]
}
libnmstate.apply(desired_state)
assertlib.assert_state_match(desired_state)
@contextmanager
@ -340,107 +421,7 @@ def veth_interface_both_up(ifname, peer):
libnmstate.apply(d_state)
@contextmanager
def veth_interface(ifname, peer, kernel_mode=False):
d_state = {
Interface.KEY: [
{
Interface.NAME: ifname,
Interface.TYPE: Veth.TYPE,
Interface.STATE: InterfaceState.UP,
Veth.CONFIG_SUBTREE: {
Veth.PEER: peer,
},
}
]
}
try:
libnmstate.apply(d_state, kernel_only=kernel_mode)
yield d_state
finally:
d_state[Interface.KEY][0][Interface.STATE] = InterfaceState.ABSENT
d_state[Interface.KEY].append(
{
Interface.NAME: VETH1PEER,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.ABSENT,
}
)
libnmstate.apply(d_state, kernel_only=kernel_mode)
def test_new_veth_with_ipv6_only():
desired_state = {
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.UP,
Veth.CONFIG_SUBTREE: {
Veth.PEER: VETH1PEER,
},
Interface.IPV6: {
InterfaceIPv6.ENABLED: True,
InterfaceIPv6.DHCP: False,
InterfaceIPv6.AUTOCONF: False,
InterfaceIPv6.ADDRESS: [
{
InterfaceIPv6.ADDRESS_IP: "2001:db8:1::1",
InterfaceIPv6.ADDRESS_PREFIX_LENGTH: 64,
}
],
},
}
]
}
try:
libnmstate.apply(desired_state)
assertlib.assert_state_match(desired_state)
finally:
libnmstate.apply(
{
Interface.KEY: [
{
Interface.NAME: VETH1,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.ABSENT,
},
{
Interface.NAME: VETH1PEER,
Interface.TYPE: InterfaceType.VETH,
Interface.STATE: InterfaceState.ABSENT,
},
]
},
verify_change=False,
)
def test_veth_invalid_mtu_smaller_than_min(eth1_up):
with pytest.raises(NmstateValueError):
libnmstate.apply(
{
Interface.KEY: [
{
Interface.NAME: "eth1",
Interface.TYPE: InterfaceType.VETH,
Interface.MTU: 32,
},
]
}
)
def test_veth_invalid_mtu_bigger_than_max(eth1_up):
with pytest.raises(NmstateValueError):
libnmstate.apply(
{
Interface.KEY: [
{
Interface.NAME: "eth1",
Interface.TYPE: InterfaceType.VETH,
Interface.MTU: 1500000,
},
]
}
)
@pytest.fixture
def veth1_up():
with veth_interface(VETH1, VETH1PEER):
yield