1
0
mirror of https://github.com/systemd/systemd.git synced 2025-03-11 20:58:27 +03:00

test-network: move get_dbus_dhcp_client_state() and friends to global

This commit is contained in:
Yu Watanabe 2023-10-06 14:10:18 +09:00 committed by Luca Boccassi
parent 8aa59287c0
commit e081ffc114

View File

@ -607,6 +607,47 @@ def stop_isc_dhcpd():
stop_by_pid_file(isc_dhcpd_pid_file)
rm_f(isc_dhcpd_lease_file)
def get_dbus_link_path(link):
out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
'/org/freedesktop/network1', 'org.freedesktop.network1.Manager',
'GetLinkByName', 's', link])
assert out.startswith(b'io ')
out = out.strip()
assert out.endswith(b'"')
out = out.decode()
return out[:-1].split('"')[1]
def get_dhcp_client_state(link, family):
link_path = get_dbus_link_path(link)
out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.network1',
link_path, f'org.freedesktop.network1.DHCPv{family}Client', 'State'])
assert out.startswith(b's "')
out = out.strip()
assert out.endswith(b'"')
return out[3:-1].decode()
def get_dhcp4_client_state(link):
return get_dhcp_client_state(link, '4')
def get_dhcp6_client_state(link):
return get_dhcp_client_state(link, '6')
def get_link_description(link):
link_path = get_dbus_link_path(link)
out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
link_path, 'org.freedesktop.network1.Link', 'Describe'])
assert out.startswith(b's "')
out = out.strip()
assert out.endswith(b'"')
json_raw = out[2:].decode()
check_json(json_raw)
description = json.loads(json_raw) # Convert from escaped sequences to json
check_json(description)
return json.loads(description) # Now parse the json
def start_radvd(*additional_options, config_file):
config_file_path = os.path.join(networkd_ci_temp_dir, 'radvd', config_file)
command = (
@ -5116,41 +5157,20 @@ class NetworkdDHCPClientTests(unittest.TestCase, Utilities):
self.assertNotIn('rapid-commit', output)
def test_dhcp_client_ipv6_dbus_status(self):
def get_dbus_dhcp6_client_state(IF):
out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
'/org/freedesktop/network1', 'org.freedesktop.network1.Manager',
'GetLinkByName', 's', IF])
assert out.startswith(b'io ')
out = out.strip()
assert out.endswith(b'"')
out = out.decode()
linkPath = out[:-1].split('"')[1]
print(f"Found {IF} link path: {linkPath}")
out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.network1',
linkPath, 'org.freedesktop.network1.DHCPv6Client', 'State'])
assert out.startswith(b's "')
out = out.strip()
assert out.endswith(b'"')
return out[3:-1].decode()
copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client-ipv6-only.network')
start_networkd()
self.wait_online(['veth-peer:carrier'])
# Note that at this point the DHCPv6 client has not been started because no RA (with managed
# bit set) has yet been received and the configuration does not include WithoutRA=true
state = get_dbus_dhcp6_client_state('veth99')
state = get_dhcp6_client_state('veth99')
print(f"State = {state}")
self.assertEqual(state, 'stopped')
start_dnsmasq()
self.wait_online(['veth99:routable', 'veth-peer:routable'])
state = get_dbus_dhcp6_client_state('veth99')
state = get_dhcp6_client_state('veth99')
print(f"State = {state}")
self.assertEqual(state, 'bound')
@ -5380,32 +5400,11 @@ class NetworkdDHCPClientTests(unittest.TestCase, Utilities):
self.teardown_nftset('addr4', 'network4', 'ifindex')
def test_dhcp_client_ipv4_dbus_status(self):
def get_dbus_dhcp4_client_state(IF):
out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
'/org/freedesktop/network1', 'org.freedesktop.network1.Manager',
'GetLinkByName', 's', IF])
assert out.startswith(b'io ')
out = out.strip()
assert out.endswith(b'"')
out = out.decode()
linkPath = out[:-1].split('"')[1]
print(f"Found {IF} link path: {linkPath}")
out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.network1',
linkPath, 'org.freedesktop.network1.DHCPv4Client', 'State'])
assert out.startswith(b's "')
out = out.strip()
assert out.endswith(b'"')
return out[3:-1].decode()
copy_network_unit('25-veth.netdev', '25-dhcp-server-veth-peer.network', '25-dhcp-client-ipv4-only.network')
start_networkd()
self.wait_online(['veth-peer:carrier'])
state = get_dbus_dhcp4_client_state('veth99')
state = get_dhcp4_client_state('veth99')
print(f"State = {state}")
self.assertEqual(state, 'rebooting')
@ -5416,7 +5415,7 @@ class NetworkdDHCPClientTests(unittest.TestCase, Utilities):
self.wait_online(['veth99:routable', 'veth-peer:routable'])
self.wait_address('veth99', r'inet 192.168.5.11[0-9]*/24', ipv='-4')
state = get_dbus_dhcp4_client_state('veth99')
state = get_dhcp4_client_state('veth99')
print(f"State = {state}")
self.assertEqual(state, 'bound')
@ -5868,30 +5867,8 @@ class NetworkdDHCPPDTests(unittest.TestCase, Utilities):
tear_down_common()
def test_dhcp6pd(self):
def get_dbus_dhcp6_prefix(IF):
# busctl call org.freedesktop.network1 /org/freedesktop/network1 org.freedesktop.network1.Manager GetLinkByName s IF
out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
'/org/freedesktop/network1', 'org.freedesktop.network1.Manager',
'GetLinkByName', 's', IF])
assert out.startswith(b'io ')
out = out.strip()
assert out.endswith(b'"')
out = out.decode()
linkPath = out[:-1].split('"')[1]
print(f"Found {IF} link path: {linkPath}")
out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
linkPath, 'org.freedesktop.network1.Link', 'Describe'])
assert out.startswith(b's "')
out = out.strip()
assert out.endswith(b'"')
json_raw = out[2:].decode()
check_json(json_raw)
description = json.loads(json_raw) # Convert from escaped sequences to json
check_json(description)
description = json.loads(description) # Now parse the json
def get_dhcp6_prefix(link):
description = get_link_description(link)
self.assertIn('DHCPv6Client', description.keys())
self.assertIn('Prefixes', description['DHCPv6Client'])
@ -5919,7 +5896,7 @@ class NetworkdDHCPPDTests(unittest.TestCase, Utilities):
'veth97:routable', 'veth97-peer:routable', 'veth98:routable', 'veth98-peer:routable'])
# Check DBus assigned prefix information to veth99
prefixInfo = get_dbus_dhcp6_prefix('veth99')
prefixInfo = get_dhcp6_prefix('veth99')
self.assertEqual(len(prefixInfo), 1)
prefixInfo = prefixInfo[0]
@ -6302,29 +6279,8 @@ class NetworkdDHCPPDTests(unittest.TestCase, Utilities):
self.assertIn(f'via ::10.0.0.1 dev {tunnel_name}', output)
def test_dhcp4_6rd(self):
def get_dbus_dhcp_6rd_prefix(IF):
out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
'/org/freedesktop/network1', 'org.freedesktop.network1.Manager',
'GetLinkByName', 's', IF])
assert out.startswith(b'io ')
out = out.strip()
assert out.endswith(b'"')
out = out.decode()
linkPath = out[:-1].split('"')[1]
print(f"Found {IF} link path: {linkPath}")
out = subprocess.check_output(['busctl', 'call', 'org.freedesktop.network1',
linkPath, 'org.freedesktop.network1.Link', 'Describe'])
assert out.startswith(b's "')
out = out.strip()
assert out.endswith(b'"')
json_raw = out[2:].decode()
check_json(json_raw)
description = json.loads(json_raw) # Convert from escaped sequences to json
check_json(description)
description = json.loads(description) # Now parse the json
def get_dhcp_6rd_prefix(link):
description = get_link_description(link)
self.assertIn('DHCPv4Client', description.keys())
self.assertIn('6rdPrefix', description['DHCPv4Client'].keys())
@ -6360,7 +6316,7 @@ class NetworkdDHCPPDTests(unittest.TestCase, Utilities):
'veth97:routable', 'veth97-peer:routable', 'veth98:routable', 'veth98-peer:routable'])
# Check the DBus interface for assigned prefix information
prefixInfo = get_dbus_dhcp_6rd_prefix('veth99')
prefixInfo = get_dhcp_6rd_prefix('veth99')
self.assertEqual(prefixInfo['Prefix'], [32,1,13,184,0,0,0,0,0,0,0,0,0,0,0,0]) # 2001:db8::
self.assertEqual(prefixInfo['PrefixLength'], 32)