2015-11-17 18:30:50 +01:00
#!/usr/bin/env python3
2017-11-18 17:32:46 +01:00
# SPDX-License-Identifier: LGPL-2.1+
2015-11-17 18:30:50 +01:00
#
# networkd integration test
# This uses temporary configuration in /run and temporary veth devices, and
# does not write anything on disk or change any system configuration;
# but it assumes (and checks at the beginning) that networkd is not currently
# running.
2016-04-26 12:16:43 +02:00
#
# This can be run on a normal installation, in QEMU, nspawn (with
# --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
# or LXC system containers. You need at least the "ip" tool from the iproute
# package; it is recommended to install dnsmasq too to get full test coverage.
#
2015-11-17 18:30:50 +01:00
# ATTENTION: This uses the *installed* networkd, not the one from the built
# source tree.
#
2018-06-12 17:15:23 +02:00
# © 2015 Canonical Ltd.
2015-11-17 18:30:50 +01:00
# Author: Martin Pitt <martin.pitt@ubuntu.com>
2016-11-03 16:27:55 -07:00
import errno
2015-11-17 18:30:50 +01:00
import os
2018-02-05 09:28:53 +03:00
import shutil
import socket
import subprocess
2015-11-17 18:30:50 +01:00
import sys
2018-02-05 09:28:53 +03:00
import tempfile
2015-11-17 18:30:50 +01:00
import time
import unittest
2016-11-03 16:27:55 -07:00
HAVE_DNSMASQ = shutil . which ( ' dnsmasq ' ) is not None
2019-02-27 23:15:31 +01:00
IS_CONTAINER = subprocess . call ( [ ' systemd-detect-virt ' , ' --quiet ' , ' --container ' ] ) == 0
2016-11-03 16:27:55 -07:00
NETWORK_UNITDIR = ' /run/systemd/network '
NETWORKD_WAIT_ONLINE = shutil . which ( ' systemd-networkd-wait-online ' ,
path = ' /usr/lib/systemd:/lib/systemd ' )
2015-11-17 18:30:50 +01:00
2016-06-30 15:44:22 +02:00
RESOLV_CONF = ' /run/systemd/resolve/resolv.conf '
2018-07-08 17:32:32 +02:00
tmpmounts = [ ]
running_units = [ ]
stopped_units = [ ]
2015-11-17 18:30:50 +01:00
2016-11-03 16:27:55 -07:00
def setUpModule ( ) :
2018-07-08 17:32:32 +02:00
global tmpmounts
2016-11-03 16:27:55 -07:00
""" Initialize the environment, and perform sanity checks on it. """
if NETWORKD_WAIT_ONLINE is None :
raise OSError ( errno . ENOENT , ' systemd-networkd-wait-online not found ' )
2018-07-08 17:32:32 +02:00
# Do not run any tests if the system is using networkd already and it's not virtualized
if ( subprocess . call ( [ ' systemctl ' , ' is-active ' , ' --quiet ' , ' systemd-networkd.service ' ] ) == 0 and
subprocess . call ( [ ' systemd-detect-virt ' , ' --quiet ' ] ) != 0 ) :
raise unittest . SkipTest ( ' not virtualized and networkd is already active ' )
2018-12-05 20:52:03 +01:00
2018-07-08 17:32:32 +02:00
# Ensure we don't mess with an existing networkd config
for u in [ ' systemd-networkd.socket ' , ' systemd-networkd ' , ' systemd-resolved ' ] :
if subprocess . call ( [ ' systemctl ' , ' is-active ' , ' --quiet ' , u ] ) == 0 :
subprocess . call ( [ ' systemctl ' , ' stop ' , u ] )
running_units . append ( u )
else :
stopped_units . append ( u )
2018-12-05 20:52:03 +01:00
# create static systemd-network user for networkd-test-router.service (it
# needs to do some stuff as root and can't start as user; but networkd
# still insists on the user)
2019-07-30 14:27:44 +02:00
if subprocess . call ( [ ' getent ' , ' passwd ' , ' systemd-network ' ] ) != 0 :
subprocess . call ( [ ' useradd ' , ' --system ' , ' --no-create-home ' , ' systemd-network ' ] )
2018-12-05 20:52:03 +01:00
2018-07-08 17:32:32 +02:00
for d in [ ' /etc/systemd/network ' , ' /run/systemd/network ' ,
' /run/systemd/netif ' , ' /run/systemd/resolve ' ] :
if os . path . isdir ( d ) :
subprocess . check_call ( [ " mount " , " -t " , " tmpfs " , " none " , d ] )
tmpmounts . append ( d )
if os . path . isdir ( ' /run/systemd/resolve ' ) :
os . chmod ( ' /run/systemd/resolve ' , 0o755 )
2018-09-19 10:04:33 +02:00
shutil . chown ( ' /run/systemd/resolve ' , ' systemd-resolve ' , ' systemd-resolve ' )
2018-12-05 20:52:03 +01:00
if os . path . isdir ( ' /run/systemd/netif ' ) :
os . chmod ( ' /run/systemd/netif ' , 0o755 )
shutil . chown ( ' /run/systemd/netif ' , ' systemd-network ' , ' systemd-network ' )
2016-11-03 16:27:55 -07:00
# Avoid "Failed to open /dev/tty" errors in containers.
os . environ [ ' SYSTEMD_LOG_TARGET ' ] = ' journal '
# Ensure the unit directory exists so tests can dump files into it.
os . makedirs ( NETWORK_UNITDIR , exist_ok = True )
2018-07-08 17:32:32 +02:00
def tearDownModule ( ) :
global tmpmounts
for d in tmpmounts :
subprocess . check_call ( [ " umount " , d ] )
for u in stopped_units :
subprocess . call ( [ " systemctl " , " stop " , u ] )
for u in running_units :
subprocess . call ( [ " systemctl " , " restart " , u ] )
2016-11-03 16:27:55 -07:00
class NetworkdTestingUtilities :
""" Provide a set of utility functions to facilitate networkd tests.
This class must be inherited along with unittest . TestCase to define
some required methods .
"""
2016-12-07 10:12:10 -08:00
def add_veth_pair ( self , veth , peer , veth_options = ( ) , peer_options = ( ) ) :
""" Add a veth interface pair, and queue them to be removed. """
subprocess . check_call ( [ ' ip ' , ' link ' , ' add ' , ' name ' , veth ] +
list ( veth_options ) +
[ ' type ' , ' veth ' , ' peer ' , ' name ' , peer ] +
list ( peer_options ) )
self . addCleanup ( subprocess . call , [ ' ip ' , ' link ' , ' del ' , ' dev ' , peer ] )
2018-12-05 20:53:51 +01:00
def write_config ( self , path , contents ) :
""" " Write a configuration file, and queue it to be removed. """
with open ( path , ' w ' ) as f :
f . write ( contents )
self . addCleanup ( os . remove , path )
2016-11-03 16:27:55 -07:00
def write_network ( self , unit_name , contents ) :
""" Write a network unit file, and queue it to be removed. """
2018-12-05 20:53:51 +01:00
self . write_config ( os . path . join ( NETWORK_UNITDIR , unit_name ) , contents )
2016-11-03 16:27:55 -07:00
def write_network_dropin ( self , unit_name , dropin_name , contents ) :
""" Write a network unit drop-in, and queue it to be removed. """
2018-02-05 09:28:53 +03:00
dropin_dir = os . path . join ( NETWORK_UNITDIR , " {} .d " . format ( unit_name ) )
dropin_path = os . path . join ( dropin_dir , " {} .conf " . format ( dropin_name ) )
2016-11-03 16:27:55 -07:00
os . makedirs ( dropin_dir , exist_ok = True )
2017-04-11 22:17:31 +01:00
self . addCleanup ( os . rmdir , dropin_dir )
2016-11-03 16:27:55 -07:00
with open ( dropin_path , ' w ' ) as dropin :
dropin . write ( contents )
self . addCleanup ( os . remove , dropin_path )
2017-04-11 22:17:31 +01:00
def read_attr ( self , link , attribute ) :
""" Read a link attributed from the sysfs. """
# Note we we don't want to check if interface `link' is managed, we
# want to evaluate link variable and pass the value of the link to
# assert_link_states e.g. eth0=managed.
self . assert_link_states ( * * { link : ' managed ' } )
with open ( os . path . join ( ' /sys/class/net ' , link , attribute ) ) as f :
return f . readline ( ) . strip ( )
2016-09-27 15:18:14 -07:00
def assert_link_states ( self , * * kwargs ) :
""" Match networkctl link states to the given ones.
Each keyword argument should be the name of a network interface
with its expected value of the " SETUP " column in output from
networkctl . The interfaces have five seconds to come online
before the check is performed . Every specified interface must
be present in the output , and any other interfaces found in the
output are ignored .
A special interface state " managed " is supported , which matches
any value in the " SETUP " column other than " unmanaged " .
"""
if not kwargs :
return
interfaces = set ( kwargs )
# Wait for the requested interfaces, but don't fail for them.
subprocess . call ( [ NETWORKD_WAIT_ONLINE , ' --timeout=5 ' ] +
2018-02-05 09:28:53 +03:00
[ ' --interface= {} ' . format ( iface ) for iface in kwargs ] )
2016-09-27 15:18:14 -07:00
# Validate each link state found in the networkctl output.
out = subprocess . check_output ( [ ' networkctl ' , ' --no-legend ' ] ) . rstrip ( )
for line in out . decode ( ' utf-8 ' ) . split ( ' \n ' ) :
fields = line . split ( )
if len ( fields ) > = 5 and fields [ 1 ] in kwargs :
iface = fields [ 1 ]
expected = kwargs [ iface ]
actual = fields [ - 1 ]
if ( actual != expected and
not ( expected == ' managed ' and actual != ' unmanaged ' ) ) :
2018-02-05 09:28:53 +03:00
self . fail ( " Link {} expects state {} , found {} " . format ( iface , expected , actual ) )
2016-09-27 15:18:14 -07:00
interfaces . remove ( iface )
# Ensure that all requested interfaces have been covered.
if interfaces :
2018-02-05 09:28:53 +03:00
self . fail ( " Missing links in status output: {} " . format ( interfaces ) )
2016-09-27 15:18:14 -07:00
2016-11-03 16:27:55 -07:00
2017-04-11 22:17:31 +01:00
class BridgeTest ( NetworkdTestingUtilities , unittest . TestCase ) :
""" Provide common methods for testing networkd against servers. """
def setUp ( self ) :
self . write_network ( ' port1.netdev ' , ''' \
[ NetDev ]
Name = port1
Kind = dummy
MACAddress = 12 : 34 : 56 : 78 : 9 a : bc ''' )
self . write_network ( ' port2.netdev ' , ''' \
[ NetDev ]
Name = port2
Kind = dummy
MACAddress = 12 : 34 : 56 : 78 : 9 a : bd ''' )
self . write_network ( ' mybridge.netdev ' , ''' \
[ NetDev ]
Name = mybridge
Kind = bridge ''' )
self . write_network ( ' port1.network ' , ''' \
[ Match ]
Name = port1
[ Network ]
Bridge = mybridge ''' )
self . write_network ( ' port2.network ' , ''' \
[ Match ]
Name = port2
[ Network ]
Bridge = mybridge ''' )
self . write_network ( ' mybridge.network ' , ''' \
[ Match ]
Name = mybridge
[ Network ]
DNS = 192.168 .250 .1
Address = 192.168 .250 .33 / 24
Gateway = 192.168 .250 .1 ''' )
2018-07-08 19:49:21 +02:00
subprocess . call ( [ ' systemctl ' , ' reset-failed ' , ' systemd-networkd ' , ' systemd-resolved ' ] )
2017-04-11 22:17:31 +01:00
subprocess . check_call ( [ ' systemctl ' , ' start ' , ' systemd-networkd ' ] )
def tearDown ( self ) :
subprocess . check_call ( [ ' systemctl ' , ' stop ' , ' systemd-networkd ' ] )
subprocess . check_call ( [ ' ip ' , ' link ' , ' del ' , ' mybridge ' ] )
subprocess . check_call ( [ ' ip ' , ' link ' , ' del ' , ' port1 ' ] )
subprocess . check_call ( [ ' ip ' , ' link ' , ' del ' , ' port2 ' ] )
def test_bridge_init ( self ) :
self . assert_link_states (
port1 = ' managed ' ,
port2 = ' managed ' ,
mybridge = ' managed ' )
def test_bridge_port_priority ( self ) :
self . assertEqual ( self . read_attr ( ' port1 ' , ' brport/priority ' ) , ' 32 ' )
self . write_network_dropin ( ' port1.network ' , ' priority ' , ''' \
[ Bridge ]
Priority = 28
''' )
subprocess . check_call ( [ ' systemctl ' , ' restart ' , ' systemd-networkd ' ] )
self . assertEqual ( self . read_attr ( ' port1 ' , ' brport/priority ' ) , ' 28 ' )
def test_bridge_port_priority_set_zero ( self ) :
""" It should be possible to set the bridge port priority to 0 """
self . assertEqual ( self . read_attr ( ' port2 ' , ' brport/priority ' ) , ' 32 ' )
self . write_network_dropin ( ' port2.network ' , ' priority ' , ''' \
[ Bridge ]
Priority = 0
''' )
subprocess . check_call ( [ ' systemctl ' , ' restart ' , ' systemd-networkd ' ] )
self . assertEqual ( self . read_attr ( ' port2 ' , ' brport/priority ' ) , ' 0 ' )
2018-04-29 09:32:22 +05:30
def test_bridge_port_property ( self ) :
""" Test the " [Bridge] " section keys """
self . assertEqual ( self . read_attr ( ' port2 ' , ' brport/priority ' ) , ' 32 ' )
self . write_network_dropin ( ' port2.network ' , ' property ' , ''' \
[ Bridge ]
UnicastFlood = true
HairPin = true
UseBPDU = true
FastLeave = true
AllowPortToBeRoot = true
Cost = 555
Priority = 23
''' )
subprocess . check_call ( [ ' systemctl ' , ' restart ' , ' systemd-networkd ' ] )
self . assertEqual ( self . read_attr ( ' port2 ' , ' brport/priority ' ) , ' 23 ' )
self . assertEqual ( self . read_attr ( ' port2 ' , ' brport/hairpin_mode ' ) , ' 1 ' )
self . assertEqual ( self . read_attr ( ' port2 ' , ' brport/path_cost ' ) , ' 555 ' )
self . assertEqual ( self . read_attr ( ' port2 ' , ' brport/multicast_fast_leave ' ) , ' 1 ' )
self . assertEqual ( self . read_attr ( ' port2 ' , ' brport/unicast_flood ' ) , ' 1 ' )
self . assertEqual ( self . read_attr ( ' port2 ' , ' brport/bpdu_guard ' ) , ' 1 ' )
self . assertEqual ( self . read_attr ( ' port2 ' , ' brport/root_block ' ) , ' 1 ' )
2016-11-03 16:27:55 -07:00
class ClientTestBase ( NetworkdTestingUtilities ) :
""" Provide common methods for testing networkd against servers. """
2016-11-28 12:35:49 +01:00
@classmethod
def setUpClass ( klass ) :
klass . orig_log_level = subprocess . check_output (
[ ' systemctl ' , ' show ' , ' --value ' , ' --property ' , ' LogLevel ' ] ,
universal_newlines = True ) . strip ( )
2019-04-03 08:19:08 +02:00
subprocess . check_call ( [ ' systemd-analyze ' , ' log-level ' , ' debug ' ] )
2016-11-28 12:35:49 +01:00
@classmethod
def tearDownClass ( klass ) :
2019-04-03 08:19:08 +02:00
subprocess . check_call ( [ ' systemd-analyze ' , ' log-level ' , klass . orig_log_level ] )
2016-11-28 12:35:49 +01:00
2015-11-17 18:30:50 +01:00
def setUp ( self ) :
self . iface = ' test_eth42 '
self . if_router = ' router_eth42 '
self . workdir_obj = tempfile . TemporaryDirectory ( )
self . workdir = self . workdir_obj . name
2016-11-03 16:27:55 -07:00
self . config = ' test_eth42.network '
2015-11-17 18:30:50 +01:00
# get current journal cursor
2016-11-28 12:35:49 +01:00
subprocess . check_output ( [ ' journalctl ' , ' --sync ' ] )
2015-11-17 18:30:50 +01:00
out = subprocess . check_output ( [ ' journalctl ' , ' -b ' , ' --quiet ' ,
' --no-pager ' , ' -n0 ' , ' --show-cursor ' ] ,
universal_newlines = True )
self . assertTrue ( out . startswith ( ' -- cursor: ' ) )
self . journal_cursor = out . split ( ) [ - 1 ]
2018-07-08 19:49:21 +02:00
subprocess . call ( [ ' systemctl ' , ' reset-failed ' , ' systemd-networkd ' , ' systemd-resolved ' ] )
2018-07-02 22:26:31 +02:00
2015-11-17 18:30:50 +01:00
def tearDown ( self ) :
self . shutdown_iface ( )
subprocess . call ( [ ' systemctl ' , ' stop ' , ' systemd-networkd ' ] )
2016-11-20 10:08:23 +01:00
subprocess . call ( [ ' ip ' , ' link ' , ' del ' , ' dummy0 ' ] ,
stderr = subprocess . DEVNULL )
2015-11-17 18:30:50 +01:00
def show_journal ( self , unit ) :
''' Show journal of given unit since start of the test '''
2018-02-05 09:28:53 +03:00
print ( ' ---- {} ---- ' . format ( unit ) )
2016-11-28 12:35:49 +01:00
subprocess . check_output ( [ ' journalctl ' , ' --sync ' ] )
2015-11-17 18:30:50 +01:00
sys . stdout . flush ( )
subprocess . call ( [ ' journalctl ' , ' -b ' , ' --no-pager ' , ' --quiet ' ,
' --cursor ' , self . journal_cursor , ' -u ' , unit ] )
def create_iface ( self , ipv6 = False ) :
''' Create test interface with DHCP server behind it '''
raise NotImplementedError ( ' must be implemented by a subclass ' )
def shutdown_iface ( self ) :
''' Remove test interface and stop DHCP server '''
raise NotImplementedError ( ' must be implemented by a subclass ' )
def print_server_log ( self ) :
''' Print DHCP server log for debugging failures '''
raise NotImplementedError ( ' must be implemented by a subclass ' )
2019-02-26 23:03:35 +01:00
def start_unit ( self , unit ) :
2017-02-10 03:30:44 +01:00
try :
2019-02-26 23:03:35 +01:00
subprocess . check_call ( [ ' systemctl ' , ' start ' , unit ] )
2017-02-10 03:30:44 +01:00
except subprocess . CalledProcessError :
2019-02-26 23:03:35 +01:00
self . show_journal ( unit )
2017-02-10 03:30:44 +01:00
raise
2019-02-26 23:03:35 +01:00
def do_test ( self , coldplug = True , ipv6 = False , extra_opts = ' ' ,
online_timeout = 10 , dhcp_mode = ' yes ' ) :
self . start_unit ( ' systemd-resolved ' )
2016-11-03 16:27:55 -07:00
self . write_network ( self . config , ''' \
2016-09-14 06:52:40 -04:00
[ Match ]
2018-02-05 09:28:53 +03:00
Name = { }
2015-11-17 18:30:50 +01:00
[ Network ]
2018-02-05 09:28:53 +03:00
DHCP = { }
{ } ''' .format(self.iface, dhcp_mode, extra_opts))
2015-11-17 18:30:50 +01:00
if coldplug :
# create interface first, then start networkd
self . create_iface ( ipv6 = ipv6 )
2019-02-26 23:03:35 +01:00
self . start_unit ( ' systemd-networkd ' )
2016-11-18 16:17:01 +01:00
elif coldplug is not None :
2015-11-17 18:30:50 +01:00
# start networkd first, then create interface
2019-02-26 23:03:35 +01:00
self . start_unit ( ' systemd-networkd ' )
2015-11-17 18:30:50 +01:00
self . create_iface ( ipv6 = ipv6 )
2016-11-18 16:17:01 +01:00
else :
# "None" means test sets up interface by itself
2019-02-26 23:03:35 +01:00
self . start_unit ( ' systemd-networkd ' )
2015-11-17 18:30:50 +01:00
try :
2016-11-03 16:27:55 -07:00
subprocess . check_call ( [ NETWORKD_WAIT_ONLINE , ' --interface ' ,
2015-11-17 18:30:50 +01:00
self . iface , ' --timeout= %i ' % online_timeout ] )
if ipv6 :
# check iface state and IP 6 address; FIXME: we need to wait a bit
# longer, as the iface is "configured" already with IPv4 *or*
# IPv6, but we want to wait for both
2016-12-01 17:30:31 -05:00
for _ in range ( 10 ) :
2015-11-17 18:30:50 +01:00
out = subprocess . check_output ( [ ' ip ' , ' a ' , ' show ' , ' dev ' , self . iface ] )
if b ' state UP ' in out and b ' inet6 2600 ' in out and b ' inet 192.168 ' in out :
break
time . sleep ( 1 )
else :
self . fail ( ' timed out waiting for IPv6 configuration ' )
self . assertRegex ( out , b ' inet6 2600::.* scope global .*dynamic ' )
self . assertRegex ( out , b ' inet6 fe80::.* scope link ' )
else :
# should have link-local address on IPv6 only
out = subprocess . check_output ( [ ' ip ' , ' -6 ' , ' a ' , ' show ' , ' dev ' , self . iface ] )
2016-12-01 18:29:54 -05:00
self . assertRegex ( out , br ' inet6 fe80::.* scope link ' )
2015-11-17 18:30:50 +01:00
self . assertNotIn ( b ' scope global ' , out )
# should have IPv4 address
out = subprocess . check_output ( [ ' ip ' , ' -4 ' , ' a ' , ' show ' , ' dev ' , self . iface ] )
self . assertIn ( b ' state UP ' , out )
2016-12-01 18:29:54 -05:00
self . assertRegex ( out , br ' inet 192.168.5. \ d+/.* scope global dynamic ' )
2015-11-17 18:30:50 +01:00
# check networkctl state
out = subprocess . check_output ( [ ' networkctl ' ] )
2018-02-05 09:28:53 +03:00
self . assertRegex ( out , ( r ' {} \ s+ether \ s+[a-z-]+ \ s+unmanaged ' . format ( self . if_router ) ) . encode ( ) )
self . assertRegex ( out , ( r ' {} \ s+ether \ s+routable \ s+configured ' . format ( self . iface ) ) . encode ( ) )
2015-11-17 18:30:50 +01:00
out = subprocess . check_output ( [ ' networkctl ' , ' status ' , self . iface ] )
2016-12-01 18:29:54 -05:00
self . assertRegex ( out , br ' Type: \ s+ether ' )
self . assertRegex ( out , br ' State: \ s+routable.*configured ' )
self . assertRegex ( out , br ' Address: \ s+192.168.5. \ d+ ' )
2015-11-17 18:30:50 +01:00
if ipv6 :
2016-12-01 18:29:54 -05:00
self . assertRegex ( out , br ' 2600:: ' )
2015-11-17 18:30:50 +01:00
else :
2016-12-01 18:29:54 -05:00
self . assertNotIn ( br ' 2600:: ' , out )
self . assertRegex ( out , br ' fe80:: ' )
self . assertRegex ( out , br ' Gateway: \ s+192.168.5.1 ' )
self . assertRegex ( out , br ' DNS: \ s+192.168.5.1 ' )
2015-11-17 18:30:50 +01:00
except ( AssertionError , subprocess . CalledProcessError ) :
# show networkd status, journal, and DHCP server log on failure
2016-11-03 16:27:55 -07:00
with open ( os . path . join ( NETWORK_UNITDIR , self . config ) ) as f :
2018-02-05 09:28:53 +03:00
print ( ' \n ---- {} ---- \n {} ' . format ( self . config , f . read ( ) ) )
2015-11-17 18:30:50 +01:00
print ( ' ---- interface status ---- ' )
sys . stdout . flush ( )
subprocess . call ( [ ' ip ' , ' a ' , ' show ' , ' dev ' , self . iface ] )
2018-02-05 09:28:53 +03:00
print ( ' ---- networkctl status {} ---- ' . format ( self . iface ) )
2015-11-17 18:30:50 +01:00
sys . stdout . flush ( )
2019-07-30 20:27:34 +02:00
rc = subprocess . call ( [ ' networkctl ' , ' status ' , self . iface ] )
if rc != 0 :
print ( " ' networkctl status ' exited with an unexpected code {} " . format ( rc ) )
2015-11-17 18:30:50 +01:00
self . show_journal ( ' systemd-networkd.service ' )
self . print_server_log ( )
raise
2016-06-30 15:44:22 +02:00
for timeout in range ( 50 ) :
with open ( RESOLV_CONF ) as f :
contents = f . read ( )
if ' nameserver 192.168.5.1 \n ' in contents :
break
time . sleep ( 0.1 )
else :
self . fail ( ' nameserver 192.168.5.1 not found in ' + RESOLV_CONF )
2015-11-17 18:30:50 +01:00
2016-11-18 16:17:01 +01:00
if coldplug is False :
2015-11-17 18:30:50 +01:00
# check post-down.d hook
self . shutdown_iface ( )
def test_coldplug_dhcp_yes_ip4 ( self ) :
# we have a 12s timeout on RA, so we need to wait longer
self . do_test ( coldplug = True , ipv6 = False , online_timeout = 15 )
def test_coldplug_dhcp_yes_ip4_no_ra ( self ) :
# with disabling RA explicitly things should be fast
self . do_test ( coldplug = True , ipv6 = False ,
2016-06-07 11:19:26 +02:00
extra_opts = ' IPv6AcceptRA=False ' )
2015-11-17 18:30:50 +01:00
def test_coldplug_dhcp_ip4_only ( self ) :
# we have a 12s timeout on RA, so we need to wait longer
self . do_test ( coldplug = True , ipv6 = False , dhcp_mode = ' ipv4 ' ,
online_timeout = 15 )
def test_coldplug_dhcp_ip4_only_no_ra ( self ) :
# with disabling RA explicitly things should be fast
self . do_test ( coldplug = True , ipv6 = False , dhcp_mode = ' ipv4 ' ,
2016-06-07 11:19:26 +02:00
extra_opts = ' IPv6AcceptRA=False ' )
2015-11-17 18:30:50 +01:00
def test_coldplug_dhcp_ip6 ( self ) :
self . do_test ( coldplug = True , ipv6 = True )
def test_hotplug_dhcp_ip4 ( self ) :
# With IPv4 only we have a 12s timeout on RA, so we need to wait longer
self . do_test ( coldplug = False , ipv6 = False , online_timeout = 15 )
def test_hotplug_dhcp_ip6 ( self ) :
self . do_test ( coldplug = False , ipv6 = True )
2016-06-28 18:18:27 +02:00
def test_route_only_dns ( self ) :
2016-11-03 16:27:55 -07:00
self . write_network ( ' myvpn.netdev ' , ''' \
2016-09-14 06:52:40 -04:00
[ NetDev ]
2016-06-28 18:18:27 +02:00
Name = dummy0
Kind = dummy
MACAddress = 12 : 34 : 56 : 78 : 9 a : bc ''' )
2016-11-03 16:27:55 -07:00
self . write_network ( ' myvpn.network ' , ''' \
2016-09-14 06:52:40 -04:00
[ Match ]
2016-06-28 18:18:27 +02:00
Name = dummy0
[ Network ]
2019-02-26 23:05:05 +01:00
Address = 192.168 .42 .100 / 24
2016-06-28 18:18:27 +02:00
DNS = 192.168 .42 .1
Domains = ~ company ''' )
2019-02-27 23:15:31 +01:00
try :
self . do_test ( coldplug = True , ipv6 = False ,
extra_opts = ' IPv6AcceptRouterAdvertisements=False ' )
except subprocess . CalledProcessError as e :
# networkd often fails to start in LXC: https://github.com/systemd/systemd/issues/11848
if IS_CONTAINER and e . cmd == [ ' systemctl ' , ' start ' , ' systemd-networkd ' ] :
raise unittest . SkipTest ( ' https://github.com/systemd/systemd/issues/11848 ' )
else :
raise
2016-06-28 18:18:27 +02:00
2016-06-30 15:44:22 +02:00
with open ( RESOLV_CONF ) as f :
contents = f . read ( )
2016-06-28 18:18:27 +02:00
# ~company is not a search domain, only a routing domain
self . assertNotRegex ( contents , ' search.*company ' )
2016-06-30 15:44:22 +02:00
# our global server should appear
self . assertIn ( ' nameserver 192.168.5.1 \n ' , contents )
2016-09-30 09:30:08 +02:00
# should not have domain-restricted server as global server
self . assertNotIn ( ' nameserver 192.168.42.1 \n ' , contents )
def test_route_only_dns_all_domains ( self ) :
2016-11-03 16:27:55 -07:00
self . write_network ( ' myvpn.netdev ' , ''' [NetDev]
2016-09-30 09:30:08 +02:00
Name = dummy0
Kind = dummy
MACAddress = 12 : 34 : 56 : 78 : 9 a : bc ''' )
2016-11-03 16:27:55 -07:00
self . write_network ( ' myvpn.network ' , ''' [Match]
2016-09-30 09:30:08 +02:00
Name = dummy0
[ Network ]
2019-02-26 23:05:05 +01:00
Address = 192.168 .42 .100 / 24
2016-09-30 09:30:08 +02:00
DNS = 192.168 .42 .1
Domains = ~ company ~ . ''' )
2019-02-27 23:15:31 +01:00
try :
self . do_test ( coldplug = True , ipv6 = False ,
extra_opts = ' IPv6AcceptRouterAdvertisements=False ' )
except subprocess . CalledProcessError as e :
# networkd often fails to start in LXC: https://github.com/systemd/systemd/issues/11848
if IS_CONTAINER and e . cmd == [ ' systemctl ' , ' start ' , ' systemd-networkd ' ] :
raise unittest . SkipTest ( ' https://github.com/systemd/systemd/issues/11848 ' )
else :
raise
2016-09-30 09:30:08 +02:00
with open ( RESOLV_CONF ) as f :
contents = f . read ( )
# ~company is not a search domain, only a routing domain
self . assertNotRegex ( contents , ' search.*company ' )
# our global server should appear
self . assertIn ( ' nameserver 192.168.5.1 \n ' , contents )
# should have company server as global server due to ~.
self . assertIn ( ' nameserver 192.168.42.1 \n ' , contents )
2016-06-28 18:18:27 +02:00
2015-11-17 18:30:50 +01:00
2016-11-03 16:27:55 -07:00
@unittest.skipUnless ( HAVE_DNSMASQ , ' dnsmasq not installed ' )
2015-11-17 18:30:50 +01:00
class DnsmasqClientTest ( ClientTestBase , unittest . TestCase ) :
''' Test networkd client against dnsmasq '''
def setUp ( self ) :
super ( ) . setUp ( )
self . dnsmasq = None
2016-11-18 16:17:01 +01:00
self . iface_mac = ' de:ad:be:ef:47:11 '
2015-11-17 18:30:50 +01:00
2016-09-30 09:30:08 +02:00
def create_iface ( self , ipv6 = False , dnsmasq_opts = None ) :
2015-11-17 18:30:50 +01:00
''' Create test interface with DHCP server behind it '''
# add veth pair
2016-11-18 16:17:01 +01:00
subprocess . check_call ( [ ' ip ' , ' link ' , ' add ' , ' name ' , self . iface ,
' address ' , self . iface_mac ,
' type ' , ' veth ' , ' peer ' , ' name ' , self . if_router ] )
2015-11-17 18:30:50 +01:00
# give our router an IP
subprocess . check_call ( [ ' ip ' , ' a ' , ' flush ' , ' dev ' , self . if_router ] )
subprocess . check_call ( [ ' ip ' , ' a ' , ' add ' , ' 192.168.5.1/24 ' , ' dev ' , self . if_router ] )
if ipv6 :
subprocess . check_call ( [ ' ip ' , ' a ' , ' add ' , ' 2600::1/64 ' , ' dev ' , self . if_router ] )
subprocess . check_call ( [ ' ip ' , ' link ' , ' set ' , self . if_router , ' up ' ] )
# add DHCP server
self . dnsmasq_log = os . path . join ( self . workdir , ' dnsmasq.log ' )
lease_file = os . path . join ( self . workdir , ' dnsmasq.leases ' )
if ipv6 :
extra_opts = [ ' --enable-ra ' , ' --dhcp-range=2600::10,2600::20 ' ]
else :
extra_opts = [ ]
2016-09-30 09:30:08 +02:00
if dnsmasq_opts :
extra_opts + = dnsmasq_opts
2015-11-17 18:30:50 +01:00
self . dnsmasq = subprocess . Popen (
[ ' dnsmasq ' , ' --keep-in-foreground ' , ' --log-queries ' ,
' --log-facility= ' + self . dnsmasq_log , ' --conf-file=/dev/null ' ,
' --dhcp-leasefile= ' + lease_file , ' --bind-interfaces ' ,
' --interface= ' + self . if_router , ' --except-interface=lo ' ,
' --dhcp-range=192.168.5.10,192.168.5.200 ' ] + extra_opts )
def shutdown_iface ( self ) :
''' Remove test interface and stop DHCP server '''
if self . if_router :
subprocess . check_call ( [ ' ip ' , ' link ' , ' del ' , ' dev ' , self . if_router ] )
self . if_router = None
if self . dnsmasq :
self . dnsmasq . kill ( )
self . dnsmasq . wait ( )
self . dnsmasq = None
def print_server_log ( self ) :
''' Print DHCP server log for debugging failures '''
with open ( self . dnsmasq_log ) as f :
2018-02-05 09:28:53 +03:00
sys . stdout . write ( ' \n \n ---- dnsmasq log ---- \n {} \n ------ \n \n ' . format ( f . read ( ) ) )
2015-11-17 18:30:50 +01:00
2016-09-30 09:30:08 +02:00
def test_resolved_domain_restricted_dns ( self ) :
''' resolved: domain-restricted DNS servers '''
2019-02-21 12:24:16 +01:00
# FIXME: resolvectl query fails with enabled DNSSEC against our dnsmasq
conf = ' /run/systemd/resolved.conf.d/test-disable-dnssec.conf '
os . makedirs ( os . path . dirname ( conf ) , exist_ok = True )
with open ( conf , ' w ' ) as f :
f . write ( ' [Resolve] \n DNSSEC=no \n ' )
self . addCleanup ( os . remove , conf )
2016-09-30 09:30:08 +02:00
# create interface for generic connections; this will map all DNS names
# to 192.168.42.1
self . create_iface ( dnsmasq_opts = [ ' --address=/#/192.168.42.1 ' ] )
2016-11-03 16:27:55 -07:00
self . write_network ( ' general.network ' , ''' \
2016-09-30 09:30:08 +02:00
[ Match ]
2018-02-05 09:28:53 +03:00
Name = { }
2016-09-30 09:30:08 +02:00
[ Network ]
DHCP = ipv4
2018-02-05 09:28:53 +03:00
IPv6AcceptRA = False ''' .format(self.iface))
2016-09-30 09:30:08 +02:00
# create second device/dnsmasq for a .company/.lab VPN interface
# static IPs for simplicity
2016-12-07 10:12:10 -08:00
self . add_veth_pair ( ' testvpnclient ' , ' testvpnrouter ' )
2016-09-30 09:30:08 +02:00
subprocess . check_call ( [ ' ip ' , ' a ' , ' flush ' , ' dev ' , ' testvpnrouter ' ] )
subprocess . check_call ( [ ' ip ' , ' a ' , ' add ' , ' 10.241.3.1/24 ' , ' dev ' , ' testvpnrouter ' ] )
subprocess . check_call ( [ ' ip ' , ' link ' , ' set ' , ' testvpnrouter ' , ' up ' ] )
vpn_dnsmasq_log = os . path . join ( self . workdir , ' dnsmasq-vpn.log ' )
vpn_dnsmasq = subprocess . Popen (
[ ' dnsmasq ' , ' --keep-in-foreground ' , ' --log-queries ' ,
' --log-facility= ' + vpn_dnsmasq_log , ' --conf-file=/dev/null ' ,
' --dhcp-leasefile=/dev/null ' , ' --bind-interfaces ' ,
' --interface=testvpnrouter ' , ' --except-interface=lo ' ,
' --address=/math.lab/10.241.3.3 ' , ' --address=/cantina.company/10.241.4.4 ' ] )
self . addCleanup ( vpn_dnsmasq . wait )
self . addCleanup ( vpn_dnsmasq . kill )
2016-11-03 16:27:55 -07:00
self . write_network ( ' vpn.network ' , ''' \
2016-09-30 09:30:08 +02:00
[ Match ]
Name = testvpnclient
[ Network ]
IPv6AcceptRA = False
Address = 10.241 .3 .2 / 24
DNS = 10.241 .3 .1
Domains = ~ company ~ lab ''' )
2019-02-26 23:03:35 +01:00
self . start_unit ( ' systemd-networkd ' )
2016-11-03 16:27:55 -07:00
subprocess . check_call ( [ NETWORKD_WAIT_ONLINE , ' --interface ' , self . iface ,
2016-09-30 09:30:08 +02:00
' --interface=testvpnclient ' , ' --timeout=20 ' ] )
# ensure we start fresh with every test
subprocess . check_call ( [ ' systemctl ' , ' restart ' , ' systemd-resolved ' ] )
# test vpnclient specific domains; these should *not* be answered by
# the general DNS
2018-10-23 00:29:03 +02:00
out = subprocess . check_output ( [ ' resolvectl ' , ' query ' , ' math.lab ' ] )
2016-09-30 09:30:08 +02:00
self . assertIn ( b ' math.lab: 10.241.3.3 ' , out )
2018-10-23 00:29:03 +02:00
out = subprocess . check_output ( [ ' resolvectl ' , ' query ' , ' kettle.cantina.company ' ] )
2016-09-30 09:30:08 +02:00
self . assertIn ( b ' kettle.cantina.company: 10.241.4.4 ' , out )
# test general domains
2018-10-23 00:29:03 +02:00
out = subprocess . check_output ( [ ' resolvectl ' , ' query ' , ' megasearch.net ' ] )
2016-09-30 09:30:08 +02:00
self . assertIn ( b ' megasearch.net: 192.168.42.1 ' , out )
with open ( self . dnsmasq_log ) as f :
general_log = f . read ( )
with open ( vpn_dnsmasq_log ) as f :
vpn_log = f . read ( )
# VPN domains should only be sent to VPN DNS
self . assertRegex ( vpn_log , ' query.*math.lab ' )
self . assertRegex ( vpn_log , ' query.*cantina.company ' )
2016-12-26 12:19:25 +01:00
self . assertNotIn ( ' .lab ' , general_log )
self . assertNotIn ( ' .company ' , general_log )
2016-09-30 09:30:08 +02:00
# general domains should not be sent to the VPN DNS
self . assertRegex ( general_log , ' query.*megasearch.net ' )
self . assertNotIn ( ' megasearch.net ' , vpn_log )
2016-12-22 07:58:02 +01:00
def test_resolved_etc_hosts ( self ) :
''' resolved queries to /etc/hosts '''
# FIXME: -t MX query fails with enabled DNSSEC (even when using
2019-02-21 12:26:44 +01:00
# the known negative trust anchor .internal instead of .example.com)
2016-12-22 07:58:02 +01:00
conf = ' /run/systemd/resolved.conf.d/test-disable-dnssec.conf '
os . makedirs ( os . path . dirname ( conf ) , exist_ok = True )
with open ( conf , ' w ' ) as f :
2018-12-03 22:27:19 +01:00
f . write ( ' [Resolve] \n DNSSEC=no \n LLMNR=no \n MulticastDNS=no \n ' )
2016-12-22 07:58:02 +01:00
self . addCleanup ( os . remove , conf )
2019-02-21 12:26:44 +01:00
# create /etc/hosts bind mount which resolves my.example.com for IPv4
2016-12-22 07:58:02 +01:00
hosts = os . path . join ( self . workdir , ' hosts ' )
with open ( hosts , ' w ' ) as f :
2019-02-21 12:26:44 +01:00
f . write ( ' 172.16.99.99 my.example.com \n ' )
2016-12-22 07:58:02 +01:00
subprocess . check_call ( [ ' mount ' , ' --bind ' , hosts , ' /etc/hosts ' ] )
self . addCleanup ( subprocess . call , [ ' umount ' , ' /etc/hosts ' ] )
subprocess . check_call ( [ ' systemctl ' , ' stop ' , ' systemd-resolved.service ' ] )
# note: different IPv4 address here, so that it's easy to tell apart
# what resolved the query
2019-02-21 12:26:44 +01:00
self . create_iface ( dnsmasq_opts = [ ' --host-record=my.example.com,172.16.99.1,2600::99:99 ' ,
' --host-record=other.example.com,172.16.0.42,2600::42 ' ,
' --mx-host=example.com,mail.example.com ' ] ,
2016-12-22 07:58:02 +01:00
ipv6 = True )
self . do_test ( coldplug = None , ipv6 = True )
try :
# family specific queries
2019-02-21 12:26:44 +01:00
out = subprocess . check_output ( [ ' resolvectl ' , ' query ' , ' -4 ' , ' my.example.com ' ] )
self . assertIn ( b ' my.example.com: 172.16.99.99 ' , out )
2016-12-22 07:58:02 +01:00
# we don't expect an IPv6 answer; if /etc/hosts has any IP address,
# it's considered a sufficient source
2019-02-21 12:26:44 +01:00
self . assertNotEqual ( subprocess . call ( [ ' resolvectl ' , ' query ' , ' -6 ' , ' my.example.com ' ] ) , 0 )
2016-12-22 07:58:02 +01:00
# "any family" query; IPv4 should come from /etc/hosts
2019-02-21 12:26:44 +01:00
out = subprocess . check_output ( [ ' resolvectl ' , ' query ' , ' my.example.com ' ] )
self . assertIn ( b ' my.example.com: 172.16.99.99 ' , out )
2016-12-22 07:58:02 +01:00
# IP → name lookup; again, takes the /etc/hosts one
2018-10-23 00:29:03 +02:00
out = subprocess . check_output ( [ ' resolvectl ' , ' query ' , ' 172.16.99.99 ' ] )
2019-02-21 12:26:44 +01:00
self . assertIn ( b ' 172.16.99.99: my.example.com ' , out )
2016-12-22 07:58:02 +01:00
# non-address RRs should fall back to DNS
2019-02-21 12:26:44 +01:00
out = subprocess . check_output ( [ ' resolvectl ' , ' query ' , ' --type=MX ' , ' example.com ' ] )
self . assertIn ( b ' example.com IN MX 1 mail.example.com ' , out )
2016-12-22 07:58:02 +01:00
# other domains query DNS
2019-02-21 12:26:44 +01:00
out = subprocess . check_output ( [ ' resolvectl ' , ' query ' , ' other.example.com ' ] )
2016-12-22 07:58:02 +01:00
self . assertIn ( b ' 172.16.0.42 ' , out )
2018-10-23 00:29:03 +02:00
out = subprocess . check_output ( [ ' resolvectl ' , ' query ' , ' 172.16.0.42 ' ] )
2019-02-21 12:26:44 +01:00
self . assertIn ( b ' 172.16.0.42: other.example.com ' , out )
2016-12-22 07:58:02 +01:00
except ( AssertionError , subprocess . CalledProcessError ) :
self . show_journal ( ' systemd-resolved.service ' )
self . print_server_log ( )
raise
2016-11-18 16:17:01 +01:00
def test_transient_hostname ( self ) :
''' networkd sets transient hostname from DHCP '''
2016-11-21 12:53:24 +01:00
orig_hostname = socket . gethostname ( )
self . addCleanup ( socket . sethostname , orig_hostname )
# temporarily move /etc/hostname away; restart hostnamed to pick it up
if os . path . exists ( ' /etc/hostname ' ) :
subprocess . check_call ( [ ' mount ' , ' --bind ' , ' /dev/null ' , ' /etc/hostname ' ] )
self . addCleanup ( subprocess . call , [ ' umount ' , ' /etc/hostname ' ] )
subprocess . check_call ( [ ' systemctl ' , ' stop ' , ' systemd-hostnamed.service ' ] )
2018-12-05 21:59:38 +01:00
self . addCleanup ( subprocess . call , [ ' systemctl ' , ' stop ' , ' systemd-hostnamed.service ' ] )
2016-11-21 12:53:24 +01:00
2018-02-05 09:28:53 +03:00
self . create_iface ( dnsmasq_opts = [ ' --dhcp-host= {} ,192.168.5.210,testgreen ' . format ( self . iface_mac ) ] )
2016-11-18 16:17:01 +01:00
self . do_test ( coldplug = None , extra_opts = ' IPv6AcceptRA=False ' , dhcp_mode = ' ipv4 ' )
2016-11-28 12:35:49 +01:00
try :
# should have received the fixed IP above
out = subprocess . check_output ( [ ' ip ' , ' -4 ' , ' a ' , ' show ' , ' dev ' , self . iface ] )
self . assertRegex ( out , b ' inet 192.168.5.210/24 .* scope global dynamic ' )
2016-11-30 08:02:49 +01:00
# should have set transient hostname in hostnamed; this is
# sometimes a bit lagging (issue #4753), so retry a few times
for retry in range ( 1 , 6 ) :
out = subprocess . check_output ( [ ' hostnamectl ' ] )
if b ' testgreen ' in out :
break
time . sleep ( 5 )
sys . stdout . write ( ' [retry %i ] ' % retry )
sys . stdout . flush ( )
else :
2018-02-05 09:28:53 +03:00
self . fail ( ' Transient hostname not found in hostnamectl: \n {} ' . format ( out . decode ( ) ) )
2016-11-28 12:35:49 +01:00
# and also applied to the system
self . assertEqual ( socket . gethostname ( ) , ' testgreen ' )
except AssertionError :
self . show_journal ( ' systemd-networkd.service ' )
self . show_journal ( ' systemd-hostnamed.service ' )
self . print_server_log ( )
raise
2016-11-21 12:53:24 +01:00
def test_transient_hostname_with_static ( self ) :
''' transient hostname is not applied if static hostname exists '''
orig_hostname = socket . gethostname ( )
self . addCleanup ( socket . sethostname , orig_hostname )
2018-12-05 22:00:42 +01:00
2016-11-21 12:53:24 +01:00
if not os . path . exists ( ' /etc/hostname ' ) :
2018-12-05 22:00:42 +01:00
self . write_config ( ' /etc/hostname ' , " foobarqux " )
else :
self . write_config ( ' /run/hostname.tmp ' , " foobarqux " )
subprocess . check_call ( [ ' mount ' , ' --bind ' , ' /run/hostname.tmp ' , ' /etc/hostname ' ] )
self . addCleanup ( subprocess . call , [ ' umount ' , ' /etc/hostname ' ] )
socket . sethostname ( " foobarqux " ) ;
2016-11-21 12:53:24 +01:00
subprocess . check_call ( [ ' systemctl ' , ' stop ' , ' systemd-hostnamed.service ' ] )
2018-12-05 21:59:38 +01:00
self . addCleanup ( subprocess . call , [ ' systemctl ' , ' stop ' , ' systemd-hostnamed.service ' ] )
2016-11-21 12:53:24 +01:00
2018-02-05 09:28:53 +03:00
self . create_iface ( dnsmasq_opts = [ ' --dhcp-host= {} ,192.168.5.210,testgreen ' . format ( self . iface_mac ) ] )
2016-11-21 12:53:24 +01:00
self . do_test ( coldplug = None , extra_opts = ' IPv6AcceptRA=False ' , dhcp_mode = ' ipv4 ' )
2016-11-28 12:35:49 +01:00
try :
# should have received the fixed IP above
out = subprocess . check_output ( [ ' ip ' , ' -4 ' , ' a ' , ' show ' , ' dev ' , self . iface ] )
self . assertRegex ( out , b ' inet 192.168.5.210/24 .* scope global dynamic ' )
# static hostname wins over transient one, thus *not* applied
2018-12-05 22:00:42 +01:00
self . assertEqual ( socket . gethostname ( ) , " foobarqux " )
2016-11-28 12:35:49 +01:00
except AssertionError :
self . show_journal ( ' systemd-networkd.service ' )
self . show_journal ( ' systemd-hostnamed.service ' )
self . print_server_log ( )
raise
2016-11-18 16:17:01 +01:00
2015-11-17 18:30:50 +01:00
class NetworkdClientTest ( ClientTestBase , unittest . TestCase ) :
''' Test networkd client against networkd server '''
def setUp ( self ) :
super ( ) . setUp ( )
self . dnsmasq = None
2016-11-22 08:05:18 +01:00
def create_iface ( self , ipv6 = False , dhcpserver_opts = None ) :
2015-11-17 18:30:50 +01:00
''' Create test interface with DHCP server behind it '''
# run "router-side" networkd in own mount namespace to shield it from
# "client-side" configuration and networkd
( fd , script ) = tempfile . mkstemp ( prefix = ' networkd-router.sh ' )
self . addCleanup ( os . remove , script )
with os . fdopen ( fd , ' w+ ' ) as f :
2016-09-14 06:52:40 -04:00
f . write ( ''' \
2017-12-24 06:53:20 +00:00
#!/bin/sh
set - eu
2015-11-17 18:30:50 +01:00
mkdir - p / run / systemd / network
mkdir - p / run / systemd / netif
mount - t tmpfs none / run / systemd / network
mount - t tmpfs none / run / systemd / netif
[ ! - e / run / dbus ] | | mount - t tmpfs none / run / dbus
# create router/client veth pair
cat << EOF > / run / systemd / network / test . netdev
[ NetDev ]
Name = % ( ifr ) s
Kind = veth
[ Peer ]
Name = % ( ifc ) s
EOF
cat << EOF > / run / systemd / network / test . network
[ Match ]
Name = % ( ifr ) s
[ Network ]
Address = 192.168 .5 .1 / 24
% ( addr6 ) s
DHCPServer = yes
[ DHCPServer ]
PoolOffset = 10
PoolSize = 50
DNS = 192.168 .5 .1
2016-11-22 08:05:18 +01:00
% ( dhopts ) s
2015-11-17 18:30:50 +01:00
EOF
# run networkd as in systemd-networkd.service
2017-08-27 01:48:23 +09:00
exec $ ( systemctl cat systemd - networkd . service | sed - n ' /^ExecStart=/ { s/^.*=//; s/^[@+-]//; s/^!*//; p} ' )
2016-11-22 08:05:18 +01:00
''' % { ' ifr ' : self.if_router, ' ifc ' : self.iface, ' addr6 ' : ipv6 and ' Address=2600::1/64 ' or ' ' ,
' dhopts ' : dhcpserver_opts or ' ' } )
2015-11-17 18:30:50 +01:00
os . fchmod ( fd , 0o755 )
subprocess . check_call ( [ ' systemd-run ' , ' --unit=networkd-test-router.service ' ,
' -p ' , ' InaccessibleDirectories=-/etc/systemd/network ' ,
' -p ' , ' InaccessibleDirectories=-/run/systemd/network ' ,
' -p ' , ' InaccessibleDirectories=-/run/systemd/netif ' ,
' --service-type=notify ' , script ] )
# wait until devices got created
2016-12-01 17:30:31 -05:00
for _ in range ( 50 ) :
2015-11-17 18:30:50 +01:00
out = subprocess . check_output ( [ ' ip ' , ' a ' , ' show ' , ' dev ' , self . if_router ] )
if b ' state UP ' in out and b ' scope global ' in out :
break
time . sleep ( 0.1 )
def shutdown_iface ( self ) :
''' Remove test interface and stop DHCP server '''
if self . if_router :
subprocess . check_call ( [ ' systemctl ' , ' stop ' , ' networkd-test-router.service ' ] )
# ensure failed transient unit does not stay around
subprocess . call ( [ ' systemctl ' , ' reset-failed ' , ' networkd-test-router.service ' ] )
subprocess . call ( [ ' ip ' , ' link ' , ' del ' , ' dev ' , self . if_router ] )
self . if_router = None
def print_server_log ( self ) :
''' Print DHCP server log for debugging failures '''
self . show_journal ( ' networkd-test-router.service ' )
@unittest.skip ( ' networkd does not have DHCPv6 server support ' )
def test_hotplug_dhcp_ip6 ( self ) :
pass
@unittest.skip ( ' networkd does not have DHCPv6 server support ' )
def test_coldplug_dhcp_ip6 ( self ) :
pass
2016-06-03 11:15:44 +02:00
def test_search_domains ( self ) :
# we don't use this interface for this test
self . if_router = None
2016-11-03 16:27:55 -07:00
self . write_network ( ' test.netdev ' , ''' \
2016-09-14 06:52:40 -04:00
[ NetDev ]
2016-06-03 11:15:44 +02:00
Name = dummy0
Kind = dummy
MACAddress = 12 : 34 : 56 : 78 : 9 a : bc ''' )
2016-11-03 16:27:55 -07:00
self . write_network ( ' test.network ' , ''' \
2016-09-14 06:52:40 -04:00
[ Match ]
2016-06-03 11:15:44 +02:00
Name = dummy0
[ Network ]
2019-02-26 23:05:05 +01:00
Address = 192.168 .42 .100 / 24
2016-06-03 11:15:44 +02:00
DNS = 192.168 .42 .1
Domains = one two three four five six seven eight nine ten ''' )
2019-02-26 23:03:35 +01:00
self . start_unit ( ' systemd-networkd ' )
2016-06-03 11:15:44 +02:00
2016-06-30 15:44:22 +02:00
for timeout in range ( 50 ) :
with open ( RESOLV_CONF ) as f :
contents = f . read ( )
if ' one ' in contents :
break
time . sleep ( 0.1 )
self . assertRegex ( contents , ' search .*one two three four ' )
self . assertNotIn ( ' seven \n ' , contents )
self . assertIn ( ' # Too many search domains configured, remaining ones ignored. \n ' , contents )
2016-06-03 11:15:44 +02:00
def test_search_domains_too_long ( self ) :
# we don't use this interface for this test
self . if_router = None
name_prefix = ' a ' * 60
2016-11-03 16:27:55 -07:00
self . write_network ( ' test.netdev ' , ''' \
2016-09-14 06:52:40 -04:00
[ NetDev ]
2016-06-03 11:15:44 +02:00
Name = dummy0
Kind = dummy
MACAddress = 12 : 34 : 56 : 78 : 9 a : bc ''' )
2016-11-03 16:27:55 -07:00
self . write_network ( ' test.network ' , ''' \
2016-09-14 06:52:40 -04:00
[ Match ]
2016-06-03 11:15:44 +02:00
Name = dummy0
[ Network ]
2019-02-26 23:05:05 +01:00
Address = 192.168 .42 .100 / 24
2016-06-03 11:15:44 +02:00
DNS = 192.168 .42 .1
2016-09-14 06:52:40 -04:00
Domains = { p } 0 { p } 1 { p } 2 { p } 3 { p } 4 ''' .format(p=name_prefix))
2016-06-03 11:15:44 +02:00
2019-02-26 23:03:35 +01:00
self . start_unit ( ' systemd-networkd ' )
2016-06-03 11:15:44 +02:00
2016-06-30 15:44:22 +02:00
for timeout in range ( 50 ) :
with open ( RESOLV_CONF ) as f :
contents = f . read ( )
if ' one ' in contents :
break
time . sleep ( 0.1 )
2016-09-14 06:52:40 -04:00
self . assertRegex ( contents , ' search .* {p} 0 {p} 1 {p} 2 ' . format ( p = name_prefix ) )
2016-06-30 15:44:22 +02:00
self . assertIn ( ' # Total length of all search domains is too long, remaining ones ignored. ' , contents )
2016-06-03 11:15:44 +02:00
2016-07-09 16:55:26 +02:00
def test_dropin ( self ) :
# we don't use this interface for this test
self . if_router = None
2016-11-03 16:27:55 -07:00
self . write_network ( ' test.netdev ' , ''' \
2016-07-09 16:55:26 +02:00
[ NetDev ]
Name = dummy0
Kind = dummy
MACAddress = 12 : 34 : 56 : 78 : 9 a : bc ''' )
2016-11-03 16:27:55 -07:00
self . write_network ( ' test.network ' , ''' \
2016-07-09 16:55:26 +02:00
[ Match ]
Name = dummy0
[ Network ]
2019-02-26 23:05:05 +01:00
Address = 192.168 .42 .100 / 24
2016-07-09 16:55:26 +02:00
DNS = 192.168 .42 .1 ''' )
2016-11-03 16:27:55 -07:00
self . write_network_dropin ( ' test.network ' , ' dns ' , ''' \
2016-07-09 16:55:26 +02:00
[ Network ]
DNS = 127.0 .0 .1 ''' )
2019-02-26 23:03:35 +01:00
self . start_unit ( ' systemd-resolved ' )
self . start_unit ( ' systemd-networkd ' )
2016-07-09 16:55:26 +02:00
for timeout in range ( 50 ) :
with open ( RESOLV_CONF ) as f :
contents = f . read ( )
2019-02-21 12:34:23 +01:00
if ' 127.0.0.1 ' in contents and ' 192.168.42.1 ' in contents :
2016-07-09 16:55:26 +02:00
break
time . sleep ( 0.1 )
self . assertIn ( ' nameserver 192.168.42.1 \n ' , contents )
self . assertIn ( ' nameserver 127.0.0.1 \n ' , contents )
2016-11-22 08:05:18 +01:00
def test_dhcp_timezone ( self ) :
''' networkd sets time zone from DHCP '''
def get_tz ( ) :
out = subprocess . check_output ( [ ' busctl ' , ' get-property ' , ' org.freedesktop.timedate1 ' ,
' /org/freedesktop/timedate1 ' , ' org.freedesktop.timedate1 ' , ' Timezone ' ] )
assert out . startswith ( b ' s " ' )
out = out . strip ( )
assert out . endswith ( b ' " ' )
return out [ 3 : - 1 ] . decode ( )
orig_timezone = get_tz ( )
self . addCleanup ( subprocess . call , [ ' timedatectl ' , ' set-timezone ' , orig_timezone ] )
self . create_iface ( dhcpserver_opts = ' EmitTimezone=yes \n Timezone=Pacific/Honolulu ' )
self . do_test ( coldplug = None , extra_opts = ' IPv6AcceptRA=false \n [DHCP] \n UseTimezone=true ' , dhcp_mode = ' ipv4 ' )
# should have applied the received timezone
try :
self . assertEqual ( get_tz ( ) , ' Pacific/Honolulu ' )
except AssertionError :
self . show_journal ( ' systemd-networkd.service ' )
self . show_journal ( ' systemd-hostnamed.service ' )
raise
2016-12-07 10:12:10 -08:00
class MatchClientTest ( unittest . TestCase , NetworkdTestingUtilities ) :
""" Test [Match] sections in .network files.
Be aware that matching the test host ' s interfaces will wipe their
configuration , so as a precaution , all network files should have a
restrictive [ Match ] section to only ever interfere with the
temporary veth interfaces created here .
"""
def tearDown ( self ) :
""" Stop networkd. """
subprocess . call ( [ ' systemctl ' , ' stop ' , ' systemd-networkd ' ] )
def test_basic_matching ( self ) :
""" Verify the Name= line works throughout this class. """
self . add_veth_pair ( ' test_if1 ' , ' fake_if2 ' )
self . write_network ( ' test.network ' , " [Match] \n Name=test_* \n [Network] " )
subprocess . check_call ( [ ' systemctl ' , ' start ' , ' systemd-networkd ' ] )
self . assert_link_states ( test_if1 = ' managed ' , fake_if2 = ' unmanaged ' )
def test_inverted_matching ( self ) :
""" Verify that a ' ! ' -prefixed value inverts the match. """
# Use a MAC address as the interfaces' common matching attribute
# to avoid depending on udev, to support testing in containers.
mac = ' 00:01:02:03:98:99 '
self . add_veth_pair ( ' test_veth ' , ' test_peer ' ,
[ ' addr ' , mac ] , [ ' addr ' , mac ] )
self . write_network ( ' no-veth.network ' , """ \
[ Match ]
2018-02-05 09:28:53 +03:00
MACAddress = { }
2016-12-07 10:12:10 -08:00
Name = ! nonexistent * peer *
2018-02-05 09:28:53 +03:00
[ Network ] """ .format(mac))
2016-12-07 10:12:10 -08:00
subprocess . check_call ( [ ' systemctl ' , ' start ' , ' systemd-networkd ' ] )
self . assert_link_states ( test_veth = ' managed ' , test_peer = ' unmanaged ' )
2016-09-27 15:18:14 -07:00
class UnmanagedClientTest ( unittest . TestCase , NetworkdTestingUtilities ) :
""" Test if networkd manages the correct interfaces. """
def setUp ( self ) :
""" Write .network files to match the named veth devices. """
# Define the veth+peer pairs to be created.
# Their pairing doesn't actually matter, only their names do.
self . veths = {
' m1def ' : ' m0unm ' ,
' m1man ' : ' m1unm ' ,
}
# Define the contents of .network files to be read in order.
self . configs = (
" [Match] \n Name=m1def \n " ,
" [Match] \n Name=m1unm \n [Link] \n Unmanaged=yes \n " ,
" [Match] \n Name=m1* \n [Link] \n Unmanaged=no \n " ,
)
# Write out the .network files to be cleaned up automatically.
for i , config in enumerate ( self . configs ) :
self . write_network ( " %02d -test.network " % i , config )
def tearDown ( self ) :
""" Stop networkd. """
subprocess . call ( [ ' systemctl ' , ' stop ' , ' systemd-networkd ' ] )
def create_iface ( self ) :
""" Create temporary veth pairs for interface matching. """
for veth , peer in self . veths . items ( ) :
2016-12-07 10:12:10 -08:00
self . add_veth_pair ( veth , peer )
2016-09-27 15:18:14 -07:00
def test_unmanaged_setting ( self ) :
""" Verify link states with Unmanaged= settings, hot-plug. """
subprocess . check_call ( [ ' systemctl ' , ' start ' , ' systemd-networkd ' ] )
self . create_iface ( )
self . assert_link_states ( m1def = ' managed ' ,
m1man = ' managed ' ,
m1unm = ' unmanaged ' ,
m0unm = ' unmanaged ' )
def test_unmanaged_setting_coldplug ( self ) :
""" Verify link states with Unmanaged= settings, cold-plug. """
self . create_iface ( )
subprocess . check_call ( [ ' systemctl ' , ' start ' , ' systemd-networkd ' ] )
self . assert_link_states ( m1def = ' managed ' ,
m1man = ' managed ' ,
m1unm = ' unmanaged ' ,
m0unm = ' unmanaged ' )
def test_catchall_config ( self ) :
""" Verify link states with a catch-all config, hot-plug. """
# Don't actually catch ALL interfaces. It messes up the host.
self . write_network ( ' all.network ' , " [Match] \n Name=m[01]??? \n " )
subprocess . check_call ( [ ' systemctl ' , ' start ' , ' systemd-networkd ' ] )
self . create_iface ( )
self . assert_link_states ( m1def = ' managed ' ,
m1man = ' managed ' ,
m1unm = ' unmanaged ' ,
m0unm = ' managed ' )
def test_catchall_config_coldplug ( self ) :
""" Verify link states with a catch-all config, cold-plug. """
# Don't actually catch ALL interfaces. It messes up the host.
self . write_network ( ' all.network ' , " [Match] \n Name=m[01]??? \n " )
self . create_iface ( )
subprocess . check_call ( [ ' systemctl ' , ' start ' , ' systemd-networkd ' ] )
self . assert_link_states ( m1def = ' managed ' ,
m1man = ' managed ' ,
m1unm = ' unmanaged ' ,
m0unm = ' managed ' )
2015-11-17 18:30:50 +01:00
if __name__ == ' __main__ ' :
unittest . main ( testRunner = unittest . TextTestRunner ( stream = sys . stdout ,
verbosity = 2 ) )