nm, linux bridge, vlan filtering: implement port vlan for linux bridges

This patch implements trunk ports and access ports as per the
port vlan schema - which was introduced in [0] - for the NM
provider.

[0] - #440

Signed-off-by: Miguel Duarte Barroso <mdbarroso@redhat.com>
This commit is contained in:
Miguel Duarte Barroso 2020-03-05 10:17:12 +01:00 committed by Gris Ge
parent fee1d5fd79
commit 321c483158
2 changed files with 434 additions and 0 deletions

View File

@ -0,0 +1,208 @@
#
# Copyright (c) 2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
from libnmstate.nm import nmclient
from libnmstate.schema import LinuxBridge as LB
class PortVlanFilter:
def __init__(self):
self._trunk_tags = []
self._tag = None
self._is_native = None
self._port_mode = None
def create_configuration(self, trunk_tags, tag, is_native_vlan=False):
"""
Fill the PortVlanFilter object with data whose format is tied to the
API.
:param trunk_tags: list of schema.LinuxBridge.Port.Vlan.TrunkTags
objects.
:param tag: the access tag for access ports, the native vlan ID for
trunk ports
:param is_native_vlan: boolean attribute indicating if the trunk port
has a native vlan.
"""
self._trunk_tags = trunk_tags
self._tag = tag
self._is_native = is_native_vlan
self._port_mode = (
LB.Port.Vlan.Mode.TRUNK if trunk_tags else LB.Port.Vlan.Mode.ACCESS
)
@property
def trunk_tags(self):
return self._trunk_tags
@property
def tag(self):
return self._tag
@property
def is_native(self):
return self._is_native
@property
def port_mode(self):
return self._port_mode
def to_nm(self):
"""
Generate a list of NM.BridgeVlan objects from the encapsulated
PortVlanFilter data
"""
port_vlan_config = []
if self._port_mode == LB.Port.Vlan.Mode.TRUNK:
port_vlan_config += map(
PortVlanFilter._generate_vlan_trunk_port_config,
self._trunk_tags,
)
if self._is_native and self._tag:
port_vlan_config.append(
PortVlanFilter._generate_vlan_access_port_config(self._tag)
)
elif self._port_mode == LB.Port.Vlan.Mode.ACCESS and self._tag:
port_vlan_config.append(
PortVlanFilter._generate_vlan_access_port_config(self._tag)
)
return port_vlan_config
def to_dict(self):
"""
Get the port vlan filtering configuration in dict format - e.g. in yaml
format:
- name: eth1
vlan:
type: trunk
trunk-tags:
- id: 101
- id-range:
min: 200
max: 4095
tag: 100
enable-native: true
"""
port_vlan_state = {
LB.Port.Vlan.MODE: self._port_mode,
LB.Port.Vlan.TRUNK_TAGS: self._trunk_tags,
}
if self._tag:
port_vlan_state[LB.Port.Vlan.TAG] = self._tag
if self._port_mode == LB.Port.Vlan.Mode.TRUNK:
port_vlan_state[LB.Port.Vlan.ENABLE_NATIVE] = self._is_native
return port_vlan_state
def import_from_bridge_settings(self, nm_bridge_vlans):
"""
Instantiates a PortVlanFilter object from a list of NM.BridgeVlan
objects.
"""
self._is_native = False
trunk_tags = []
is_access_port = PortVlanFilter._is_access_port(nm_bridge_vlans)
for nm_bridge_vlan in nm_bridge_vlans:
vlan_min, vlan_max = PortVlanFilter.get_vlan_tag_range(
nm_bridge_vlan
)
if is_access_port:
self._tag = vlan_min
elif nm_bridge_vlan.is_pvid() and nm_bridge_vlan.is_untagged():
# an NM.BridgeVlan has a range and can be PVID and/or untagged
# according to NM's model, PVID / untagged apply to the 'max'
# part of the range
self._tag = vlan_max
self._is_native = True
else:
trunk_tags.append(
PortVlanFilter._translate_nm_bridge_vlan_to_trunk_tags(
vlan_min, vlan_max
)
)
self._trunk_tags = trunk_tags
self._port_mode = (
LB.Port.Vlan.Mode.TRUNK if trunk_tags else LB.Port.Vlan.Mode.ACCESS
)
@staticmethod
def get_vlan_tag_range(nm_bridge_vlan):
"""
Extract the vlan tags from the NM.BridgeVlan object.
A single NM.BridgeVlan object can have a range of vlan tags, or a
single one.
When a NM.BridgeVlan holds a single tag, the min_range and max_range
returned will have the same vlan tag.
:return: min_range, max_range
"""
port_vlan_tags = nm_bridge_vlan.to_str().split()
if "-" in port_vlan_tags[0]:
vlan_min, vlan_max = port_vlan_tags[0].split("-")
return int(vlan_min), int(vlan_max)
else:
tag = int(port_vlan_tags[0])
return tag, tag
@staticmethod
def _is_access_port(nm_bridge_vlan_ports):
return (
len(nm_bridge_vlan_ports) == 1
and nm_bridge_vlan_ports[0].is_pvid()
and nm_bridge_vlan_ports[0].is_untagged()
)
@staticmethod
def _translate_nm_bridge_vlan_to_trunk_tags(min_vlan, max_vlan):
if max_vlan != min_vlan:
port_data = {
LB.Port.Vlan.TrunkTags.ID_RANGE: {
LB.Port.Vlan.TrunkTags.MIN_RANGE: min_vlan,
LB.Port.Vlan.TrunkTags.MAX_RANGE: max_vlan,
}
}
else:
port_data = {LB.Port.Vlan.TrunkTags.ID: min_vlan}
return port_data
@staticmethod
def _generate_vlan_trunk_port_config(trunk_port):
min_range = max_range = trunk_port.get(LB.Port.Vlan.TrunkTags.ID)
if min_range is None:
ranged_vlan_tags = trunk_port.get(LB.Port.Vlan.TrunkTags.ID_RANGE)
min_range = ranged_vlan_tags[LB.Port.Vlan.TrunkTags.MIN_RANGE]
max_range = ranged_vlan_tags[LB.Port.Vlan.TrunkTags.MAX_RANGE]
port_vlan = nmclient.NM.BridgeVlan.new(min_range, max_range)
port_vlan.set_untagged(False)
port_vlan.set_pvid(False)
return port_vlan
@staticmethod
def _generate_vlan_access_port_config(vlan_tag):
port_vlan = nmclient.NM.BridgeVlan.new(vlan_tag, vlan_tag)
port_vlan.set_untagged(True)
port_vlan.set_pvid(True)
return port_vlan

View File

@ -0,0 +1,226 @@
#
# Copyright (c) 2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import pytest
from unittest import mock
from libnmstate import nm
from libnmstate.nm.bridge_port_vlan import PortVlanFilter
from libnmstate.schema import LinuxBridge as LB
ACCESS_TAG = 4000
@pytest.fixture
def NM_mock():
with mock.patch.object(nm.user.nmclient, "NM") as m:
yield m
def _get_port_vlan_ranged_trunks(min_vlan, max_vlan):
return {
LB.Port.Vlan.TrunkTags.ID_RANGE: {
LB.Port.Vlan.TrunkTags.MIN_RANGE: min_vlan,
LB.Port.Vlan.TrunkTags.MAX_RANGE: max_vlan,
}
}
@pytest.mark.parametrize(
"trunk_tags",
[
[],
[{LB.Port.Vlan.TrunkTags.ID: 100}],
[{LB.Port.Vlan.TrunkTags.ID: 100}, {LB.Port.Vlan.TrunkTags.ID: 101}],
[_get_port_vlan_ranged_trunks(100, 199)],
[
_get_port_vlan_ranged_trunks(100, 199),
_get_port_vlan_ranged_trunks(500, 1000),
],
[
{LB.Port.Vlan.TrunkTags.ID: 100},
_get_port_vlan_ranged_trunks(500, 1000),
],
],
ids=[
"no-trunk-tags",
"single-trunk-tag",
"two-single-trunk-tags",
"tag-ranges",
"multiple-tag-ranges",
"mixed-single-tag-and-tag-ranges",
],
)
@pytest.mark.parametrize(
"is_native_vlan", [False, True], ids=["not-native-vlan", "native-vlan"]
)
def test_generate_nm_vlan_filtering_config(trunk_tags, is_native_vlan):
port_vlan_filter = PortVlanFilter()
port_vlan_filter.create_configuration(
trunk_tags, ACCESS_TAG if is_native_vlan else None, is_native_vlan
)
nm_port_bridge_vlans = port_vlan_filter.to_nm()
expected_trunk_tag_length = len(trunk_tags) + (1 if is_native_vlan else 0)
assert expected_trunk_tag_length == len(nm_port_bridge_vlans)
assert is_native_vlan == any(
port_bridge_vlan.is_pvid() and port_bridge_vlan.is_untagged()
for port_bridge_vlan in nm_port_bridge_vlans
)
desired_trunk_tags = _port_vlan_trunk_tags_to_tag_list(
trunk_tags, ACCESS_TAG, is_native_vlan
)
_assert_port_settings_vlan_conf(nm_port_bridge_vlans, desired_trunk_tags)
@pytest.mark.parametrize(
"trunk_tags",
[
[],
[(100, 100)],
[(100, 100), (200, 200)],
[(100, 200)],
[(100, 100), (200, 300)],
[(100, 200), (300, 400)],
],
ids=[
"access-port",
"single-trunk-tag",
"two-trunk-tags",
"one-tag-range",
"mixed-tag-and-range",
"multiple-ranges",
],
)
@pytest.mark.parametrize(
"is_native_vlan", [False, True], ids=["not-native-vlan", "native-vlan"]
)
def test_bridge_port_vlan_to_dict(NM_mock, trunk_tags, is_native_vlan):
port_vlans = _generate_port_vlan_mocks(
trunk_tags, is_native_vlan, ACCESS_TAG
)
vlan_config = PortVlanFilter()
vlan_config.import_from_bridge_settings(port_vlans)
assert vlan_config.tag == (
ACCESS_TAG if (not trunk_tags or is_native_vlan) else None
)
assert len(vlan_config.trunk_tags) == len(trunk_tags)
assert vlan_config.is_native == (is_native_vlan and len(trunk_tags) > 0)
assert vlan_config.to_dict() == _get_vlan_config_dict(
trunk_tags, ACCESS_TAG, is_native_vlan
)
def _port_vlan_trunk_tags_to_tag_list(trunk_tags, access_tag, enable_vlan):
tag_list = []
for trunk_tag in trunk_tags:
single_tag = trunk_tag.get(LB.Port.Vlan.TrunkTags.ID)
range_tags = trunk_tag.get(LB.Port.Vlan.TrunkTags.ID_RANGE, {})
if single_tag:
tag_list.append(single_tag)
else:
tag_list += range(
range_tags.get(LB.Port.Vlan.TrunkTags.MIN_RANGE),
range_tags.get(LB.Port.Vlan.TrunkTags.MAX_RANGE) + 1,
)
if enable_vlan:
tag_list.append(access_tag)
return tag_list
def _assert_port_settings_vlan_conf(port_vlans, desired_vlans):
expected_vlans = set(desired_vlans)
for port_vlan in port_vlans:
min_vlan, max_vlan = PortVlanFilter.get_vlan_tag_range(port_vlan)
for i in range(min_vlan, max_vlan + 1):
expected_vlans.remove(i)
assert not expected_vlans, "vlans: {} were not configured".format(
expected_vlans
)
def _generate_port_vlan_mocks(trunk_tags, is_native_vlan, access_tag):
port_vlans = []
for trunk_tag in trunk_tags:
min_tag_range, max_tag_range = (trunk_tag[i] for i in range(2))
trunk_tag_vlan_config = _generate_bridge_vlan_mock(
min_tag_range, max_tag_range, pvid=False, untagged=False
)
port_vlans.append(trunk_tag_vlan_config)
if is_native_vlan or not trunk_tags:
port_vlans.append(
_generate_bridge_vlan_mock(
access_tag, access_tag, pvid=True, untagged=True
)
)
return port_vlans
def _generate_bridge_vlan_mock(
min_tag_range, max_tag_range, pvid=False, untagged=False
):
bridge_vlan = mock.MagicMock()
bridge_vlan.is_untagged.return_value = untagged
bridge_vlan.is_pvid.return_value = pvid
bridge_vlan.to_str.return_value = "{}{}".format(
min_tag_range,
"-{}".format(max_tag_range) if max_tag_range != min_tag_range else "",
)
return bridge_vlan
def _get_vlan_config_dict(trunk_tags, access_tag, enable_native_vlan):
vlan_config_dict = {
LB.Port.Vlan.MODE: _get_port_vlan_mode(trunk_tags),
LB.Port.Vlan.TRUNK_TAGS: _tag_list_to_trunk_tags(trunk_tags),
}
if trunk_tags:
if enable_native_vlan:
vlan_config_dict[LB.Port.Vlan.TAG] = access_tag
vlan_config_dict[LB.Port.Vlan.ENABLE_NATIVE] = enable_native_vlan
else:
vlan_config_dict[LB.Port.Vlan.TAG] = access_tag
return vlan_config_dict
def _tag_list_to_trunk_tags(tag_config_list):
trunk_tags = []
for tag_config in tag_config_list:
min_range, max_range = tag_config
if min_range == max_range:
trunk_tags.append({LB.Port.Vlan.TrunkTags.ID: min_range})
else:
trunk_tags.append(
{
LB.Port.Vlan.TrunkTags.ID_RANGE: {
LB.Port.Vlan.TrunkTags.MIN_RANGE: min_range,
LB.Port.Vlan.TrunkTags.MAX_RANGE: max_range,
}
}
)
return trunk_tags
def _get_port_vlan_mode(trunk_tags):
if not trunk_tags:
return LB.Port.Vlan.Mode.ACCESS
else:
return LB.Port.Vlan.Mode.TRUNK