2017-05-01 03:26:56 +03:00
#!/usr/bin/env python3
2020-11-09 07:23:58 +03:00
# SPDX-License-Identifier: LGPL-2.1-or-later
2016-04-28 15:24:53 +03:00
#
2015-01-20 18:08:05 +03:00
# systemd-sysv-generator integration test
#
2018-06-12 18:15:23 +03:00
# © 2015 Canonical Ltd.
2015-01-20 18:08:05 +03:00
# Author: Martin Pitt <martin.pitt@ubuntu.com>
2018-02-05 09:28:53 +03:00
import collections
2015-01-20 18:08:05 +03:00
import os
2018-02-05 09:28:53 +03:00
import shutil
2015-01-20 18:08:05 +03:00
import subprocess
2018-02-05 09:28:53 +03:00
import sys
2015-01-20 18:08:05 +03:00
import tempfile
2018-02-05 09:28:53 +03:00
import unittest
2017-09-26 12:59:08 +03:00
from configparser import RawConfigParser
2018-02-05 09:28:53 +03:00
from glob import glob
2015-01-20 18:08:05 +03:00
2017-09-26 14:34:55 +03:00
# Path of the systemd-sysv-generator binary under test; it is used as argv[0]
# of the subprocess started by SysvGeneratorTest.run_generator().
# NOTE(review): the padding spaces inside the quotes look like an extraction
# artifact -- confirm the literal is meant to be './systemd-sysv-generator'.
sysv_generator = ' ./systemd-sysv-generator '
2015-01-20 18:08:05 +03:00
2016-01-20 17:16:32 +03:00
class MultiDict(collections.OrderedDict):
    '''Ordered dict that accumulates list values instead of replacing them.

    Assigning a list to a key that is already present extends the stored
    value in place; every other assignment behaves like a normal dict store.
    It is passed to RawConfigParser as dict_type so that repeated option
    lines in a unit file are kept rather than overwritten.
    '''

    def __setitem__(self, key, value):
        appending = isinstance(value, list) and key in self
        if not appending:
            # first occurrence of the key, or a non-list value: plain store
            super().__setitem__(key, value)
            return
        self[key].extend(value)
2015-01-20 18:08:05 +03:00
class SysvGeneratorTest(unittest.TestCase):
    '''Integration tests for systemd-sysv-generator.

    Each test builds a throw-away tree of SysV init scripts (and optionally
    rcN.d links and native units), runs the generator against it and checks
    the unit files and symlinks written to the output directory.
    '''

    def setUp(self):
        '''Create the scratch tree: init.d/, systemd/ and output/ below a temp dir.'''
        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
        self.init_d_dir = os.path.join(self.workdir, 'init.d')
        os.mkdir(self.init_d_dir)
        # the rcN.d directories are created directly below the work dir
        self.rcnd_dir = self.workdir
        self.unit_dir = os.path.join(self.workdir, 'systemd')
        os.mkdir(self.unit_dir)
        self.out_dir = os.path.join(self.workdir, 'output')
        os.mkdir(self.out_dir)

    def tearDown(self):
        '''Remove the scratch tree created by setUp().'''
        shutil.rmtree(self.workdir)

    #
    # Helper methods
    #

    def run_generator(self, expect_error=False):
        '''Run sysv-generator.

        Fail if stderr contains any "Fail", unless expect_error is True.
        The generator must exit with status 0 in either case.

        Return (stderr, filename -> ConfigParser) pair with output to stderr and
        parsed generated units. Symlinked *.service entries (aliases) are not
        parsed.
        '''
        env = os.environ.copy()
        # We might debug log about errors that aren't actually fatal so let's bump the log level to info to
        # prevent those logs from interfering with the test.
        env['SYSTEMD_LOG_LEVEL'] = 'info'
        env['SYSTEMD_LOG_TARGET'] = 'console'
        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
        env['SYSTEMD_UNIT_PATH'] = self.unit_dir

        # stdout is captured only to keep it out of the test report
        gen = subprocess.run(
            [sysv_generator, 'ignored', 'ignored', self.out_dir],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, env=env)
        err = gen.stderr
        if not expect_error:
            self.assertNotIn('Fail', err, err)
        self.assertEqual(gen.returncode, 0, err)

        results = {}
        for service in glob(os.path.join(self.out_dir, '*.service')):
            if os.path.islink(service):
                continue

            # strict=False is needed to parse multiple lines with the same
            # key; MultiDict then accumulates their values
            cp = RawConfigParser(dict_type=MultiDict, strict=False)
            cp.optionxform = str  # don't lower-case option names
            with open(service) as f:
                cp.read_file(f)
            results[os.path.basename(service)] = cp

        return (err, results)

    def add_sysv(self, fname, keys, enable=False, prio=1):
        '''Create a SysV init script with the given keys in the LSB header

        There are sensible default values for all fields. A key whose value
        is None is left out of the header. The passed-in dict is not modified.

        If enable is True, links will be created in the rcN.d dirs. In that
        case, the priority can be given with "prio" (default to 1).

        Return path of generated script.
        '''
        # work on a copy so that the defaults do not leak into the caller's dict
        keys = dict(keys)
        name_without_sh = fname[:-3] if fname.endswith('.sh') else fname
        keys.setdefault('Provides', name_without_sh)
        keys.setdefault('Required-Start', '$local_fs')
        keys.setdefault('Required-Stop', keys['Required-Start'])
        keys.setdefault('Default-Start', '2 3 4 5')
        keys.setdefault('Default-Stop', '0 1 6')
        keys.setdefault('Short-Description', 'test {} service'.format(name_without_sh))
        keys.setdefault('Description', 'long description for test {} service'.format(name_without_sh))

        script = os.path.join(self.init_d_dir, fname)
        with open(script, 'w') as f:
            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
            for k, v in keys.items():
                if v is not None:
                    f.write('#{:>20} {}\n'.format(k + ':', v))
            f.write('### END INIT INFO\ncode --goes here\n')
        os.chmod(script, 0o755)

        if enable:
            def make_link(prefix, runlevel):
                d = os.path.join(self.rcnd_dir, 'rc{}.d'.format(runlevel))
                os.makedirs(d, exist_ok=True)
                os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))

            for rl in keys['Default-Start'].split():
                make_link('S{:02d}'.format(prio), rl)
            # stop links get the mirrored priority
            for rl in keys['Default-Stop'].split():
                make_link('K{:02d}'.format(99 - prio), rl)

        return script

    def assert_enabled(self, unit, targets):
        '''assert that a unit is enabled in precisely the given targets

        Only the 'multi-user' and 'graphical' targets are checked.
        '''
        all_targets = ['multi-user', 'graphical']

        for target in all_targets:
            link = os.path.join(self.out_dir, '{}.target.wants'.format(target), unit)
            if target in targets:
                # should be enabled; check for the link first so that a
                # missing one is reported as a failure, not as an OSError
                self.assertTrue(os.path.islink(link),
                                '{} is not a symlink'.format(link))
                unit_file = os.readlink(link)
                # os.path.exists() will fail on a dangling symlink
                self.assertTrue(os.path.exists(link),
                                '{} is a dangling symlink'.format(link))
                self.assertEqual(os.path.basename(unit_file), unit)
            else:
                self.assertFalse(os.path.exists(link),
                                 '{} unexpectedly exists'.format(link))

    #
    # test cases
    #

    def test_nothing(self):
        '''no input files'''
        results = self.run_generator()[1]
        self.assertEqual(results, {})
        self.assertEqual(os.listdir(self.out_dir), [])

    def test_simple_disabled(self):
        '''simple service without dependencies, disabled'''
        self.add_sysv('foo', {}, enable=False)
        err, results = self.run_generator()
        self.assertEqual(len(results), 1)

        # no enablement links or other stuff
        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])

        s = results['foo.service']
        self.assertEqual(s.sections(), ['Unit', 'Service'])
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
        # $local_fs does not need translation, don't expect any dependency
        # fields here
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})

        self.assertEqual(s.get('Service', 'Type'), 'forking')
        init_script = os.path.join(self.init_d_dir, 'foo')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '{} start'.format(init_script))
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '{} stop'.format(init_script))

        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_all(self):
        '''simple service without dependencies, enabled in all runlevels'''
        self.add_sysv('foo', {}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assertNotIn('Overwriting', err)

    def test_simple_escaped(self):
        '''simple service without dependencies, that requires escaping the name'''
        self.add_sysv('foo+', {})
        self.add_sysv('foo-admin', {})
        err, results = self.run_generator()
        self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_some(self):
        '''simple service without dependencies, enabled in some runlevels'''
        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user'])

    def test_lsb_macro_dep_single(self):
        '''single LSB macro dependency: $network'''
        self.add_sysv('foo', {'Required-Start': '$network'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After', 'Wants'})
        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')

    def test_lsb_macro_dep_multi(self):
        '''multiple LSB macro dependencies'''
        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After'})
        self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])

    def test_lsb_deps(self):
        '''LSB header dependencies to other services'''
        # also give symlink priorities here; they should be ignored
        self.add_sysv('foo', {'Required-Start': 'must1 must2',
                              'Should-Start': 'may1 ne_may2'},
                      enable=True, prio=40)
        self.add_sysv('must1', {}, enable=True, prio=10)
        self.add_sysv('must2', {}, enable=True, prio=15)
        self.add_sysv('may1', {}, enable=True, prio=20)
        # do not create ne_may2
        err, results = self.run_generator()
        self.assertEqual(sorted(results),
                         ['foo.service', 'may1.service', 'must1.service', 'must2.service'])

        # foo should depend on all of them
        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
                         ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])

        # other services should not depend on each other
        self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
        self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
        self.assertFalse(results['may1.service'].has_option('Unit', 'After'))

    def test_symlink_prio_deps(self):
        '''script without LSB headers use rcN.d priority'''
        # create two init.d scripts without LSB header and enable them with
        # startup priorities
        for prio, name in [(10, 'provider'), (15, 'consumer')]:
            with open(os.path.join(self.init_d_dir, name), 'w') as f:
                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
                os.fchmod(f.fileno(), 0o755)

            d = os.path.join(self.rcnd_dir, 'rc2.d')
            if not os.path.isdir(d):
                os.mkdir(d)
            os.symlink('../init.d/' + name, os.path.join(d, 'S{:>2}{}'.format(prio, name)))

        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
                         'provider.service')

    def test_multiple_provides(self):
        '''multiple Provides: names'''
        self.add_sysv('foo', {'Provides': 'foo bar baz'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(set(results['foo.service'].options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})
        # should create symlinks for the alternative names
        for f in ['bar.service', 'baz.service']:
            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
                             'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_provides_escaped(self):
        '''a script that Provides: a name that requires escaping'''
        self.add_sysv('foo', {'Provides': 'foo foo+'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
                         'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_same_provides_in_multiple_scripts(self):
        '''multiple init.d scripts provide the same name'''
        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # should create symlink for the alternative name for either unit
        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
                      ['foo.service', 'bar.service'])

    def test_provide_other_script(self):
        '''init.d scripts provides the name of another init.d script'''
        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # we do expect an overwrite here, bar.service should overwrite the
        # alias link from foo.service
        self.assertIn('Overwriting', err)

    def test_nonexecutable_script(self):
        '''ignores non-executable init.d script'''
        os.chmod(self.add_sysv('foo', {}), 0o644)
        err, results = self.run_generator()
        self.assertEqual(results, {})

    def test_sh_suffix(self):
        '''init.d script with .sh suffix'''
        self.add_sysv('foo.sh', {}, enable=True)
        err, results = self.run_generator()
        s = results['foo.service']

        self.assertEqual(s.sections(), ['Unit', 'Service'])
        # should not have a .sh
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')

        # calls correct script with .sh
        init_script = os.path.join(self.init_d_dir, 'foo.sh')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '{} start'.format(init_script))
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '{} stop'.format(init_script))

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_sh_suffix_with_provides(self):
        '''init.d script with .sh suffix and Provides:'''
        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
        err, results = self.run_generator()
        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
                         'LSB: test foo service')

        # should create symlink for the alternative name
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
                         'foo.service')

    def test_hidden_files(self):
        '''init.d script with hidden file suffix'''
        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.dpkg-new')
        shutil.copy(script, script + '.dpkg-dist')
        shutil.copy(script, script + '.swp')
        shutil.copy(script, script + '.rpmsave')

        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_backup_file(self):
        '''init.d script with backup file'''
        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.bak')
        shutil.copy(script, script + '.old')
        shutil.copy(script, script + '.tmp')
        shutil.copy(script, script + '.new')

        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service'])

        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assert_enabled('foo.bak.service', [])
        self.assert_enabled('foo.old.service', [])

    def test_existing_native_unit(self):
        '''existing native unit'''
        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
            f.write('[Unit]\n')

        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), [])
        # no enablement or alias links, as native unit is disabled
        self.assertEqual(os.listdir(self.out_dir), [])
2015-01-20 18:08:05 +03:00
if __name__ == '__main__':
    # Run every test in this file, reporting verbosely on stdout.
    verbose_runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=verbose_runner)