Add SPDX header to python files with the 'py' extension in the test directory. Signed-off-by: Albert Esteve <aesteve@redhat.com>
293 lines
8.5 KiB
Python
293 lines
8.5 KiB
Python
# SPDX-FileCopyrightText: Red Hat, Inc.
|
|
# SPDX-License-Identifier: GPL-2.0-or-later
|
|
|
|
import grp
|
|
import json
|
|
import os
|
|
import pwd
|
|
import subprocess
|
|
|
|
import pytest
|
|
|
|
from contextlib import contextmanager
|
|
|
|
from ovirt_imageio._internal import config
|
|
from ovirt_imageio._internal import server
|
|
from ovirt_imageio._internal import sockutil
|
|
|
|
from . import http
|
|
|
|
DAEMON_CONFIG = """\
|
|
[daemon]
|
|
poll_interval = 0.1
|
|
buffer_size = 131072
|
|
run_dir = {run_dir}
|
|
drop_privileges = {drop_priv}
|
|
user_name = {user_name}
|
|
group_name = {group_name}
|
|
|
|
[tls]
|
|
key_file = test/pki/system/key.pem
|
|
cert_file = test/pki/system/cert.pem
|
|
ca_file = test/pki/system/ca.pem
|
|
|
|
[remote]
|
|
port = 0
|
|
|
|
[local]
|
|
socket =
|
|
|
|
[control]
|
|
transport = unix
|
|
socket = {run_dir}/sock
|
|
|
|
[logger_root]
|
|
level=DEBUG
|
|
|
|
[handler_logfile]
|
|
args=('{log_dir}/daemon.log',)
|
|
"""
|
|
|
|
requires_root = pytest.mark.skipif(os.geteuid() != 0, reason="Requires root")
|
|
requires_unprivileged = pytest.mark.skipif(
|
|
os.geteuid() == 0, reason="Requires unprivileged user")
|
|
|
|
|
|
@pytest.fixture
|
|
def tmp_dirs(tmpdir):
|
|
tmpdir.mkdir("run")
|
|
tmpdir.mkdir("log")
|
|
tmpdir.mkdir("conf").mkdir("conf.d")
|
|
|
|
|
|
def test_load_config(monkeypatch, tmpdir):
|
|
# Here we test full scenario, when config is loaded from multiple sources
|
|
# and test, that specified options were overwritten as expected.
|
|
# Install config overwrites the default settings, then vendor config should
|
|
# be loaded and overwrite log handler level and finally admin config should
|
|
# be loaded and overwrite setup of control service previously defined by
|
|
# install config.
|
|
|
|
install_config = """
|
|
[control]
|
|
transport = unix
|
|
socket = test/daemon.sock
|
|
|
|
[handler_logfile]
|
|
class = logging.StreamHandler
|
|
args = ()
|
|
kwargs = {}
|
|
"""
|
|
|
|
vendor_config = """
|
|
[handler_logfile]
|
|
level = ERROR
|
|
"""
|
|
|
|
admin_config = """
|
|
[control]
|
|
transport = tcp
|
|
port = 10000
|
|
"""
|
|
|
|
etc_dir = tmpdir.mkdir("etc")
|
|
etc_conf_d = etc_dir.mkdir("conf.d")
|
|
install_cfg = etc_conf_d.join("50-install.conf")
|
|
install_cfg.write(install_config)
|
|
admin_cfg = etc_conf_d.join("90-admin.conf")
|
|
admin_cfg.write(admin_config)
|
|
|
|
vendor_dir = tmpdir.mkdir("vendor")
|
|
vendor_cfg = vendor_dir.mkdir("conf.d").join("60-vendor.conf")
|
|
vendor_cfg.write(vendor_config)
|
|
|
|
monkeypatch.setattr(server, "VENDOR_CONF_DIR", str(vendor_dir))
|
|
cfg = server.load_config(str(etc_dir))
|
|
|
|
assert cfg.control.transport == "tcp"
|
|
assert cfg.control.port == 10000
|
|
assert cfg.handler_logfile.level == "ERROR"
|
|
assert cfg.handler_logfile.keyword__class == "logging.StreamHandler"
|
|
|
|
|
|
def test_load_config_missing():
|
|
# Fail fast if conf_dir does not exist.
|
|
with pytest.raises(ValueError):
|
|
server.load_config("/no/such/dir")
|
|
|
|
|
|
def test_load_config_no_conf_d(tmpdir):
|
|
# Fail fast if conf_dir does not contain a cond.d sub directory.
|
|
conf_dir = tmpdir.mkdir("user")
|
|
with pytest.raises(ValueError):
|
|
server.load_config(str(conf_dir))
|
|
|
|
|
|
def test_load_config_no_conf_files(tmpdir):
|
|
# Fail fast if conf_dir/cond.d/ does not contain any configurtion file.
|
|
conf_dir = tmpdir.mkdir("user")
|
|
conf_d = conf_dir.mkdir("cond.d")
|
|
|
|
# Create a disabled configuration file, it will be ignored.
|
|
disabled_cfg = conf_d.join("99-user.conf.disabled")
|
|
disabled_cfg.write("[control]\ntransport = tcp\n")
|
|
|
|
with pytest.raises(ValueError):
|
|
server.load_config(str(conf_dir))
|
|
|
|
|
|
def test_show_config():
|
|
cfg = config.load(["test/conf.d/daemon.conf"])
|
|
out = subprocess.check_output(
|
|
["./ovirt-imageio", "--conf-dir", "./test", "--show-config"])
|
|
assert json.loads(out) == config.to_dict(cfg)
|
|
|
|
|
|
@requires_unprivileged
|
|
def test_unprivileged_user(tmpdir, tmp_dirs):
|
|
with started_imageio(tmpdir) as p:
|
|
uid = os.getuid()
|
|
gid = os.getuid()
|
|
status = process_status(p.pid)
|
|
assert status["uids"] == {uid}
|
|
assert status["gids"] == {gid}
|
|
assert status["groups"] == user_groups(uid)
|
|
|
|
# Since we run as unprivileged users the daemon run as the same
|
|
# user and group and no ownership changes are made.
|
|
expected_user = pwd.getpwuid(os.getuid()).pw_name
|
|
expected_group = grp.getgrgid(os.getuid()).gr_name
|
|
assert_ownership(tmpdir, expected_user, expected_group)
|
|
|
|
|
|
@requires_root
|
|
def test_drop_privileges_disable(tmpdir, tmp_dirs):
|
|
with started_imageio(tmpdir, drop_privileges="false") as p:
|
|
# Run under root and privileges shouldn't be dropped and ownership of
|
|
# files shouldn't be changed.
|
|
status = process_status(p.pid)
|
|
assert status["uids"] == {0}
|
|
assert status["gids"] == {0}
|
|
|
|
# Asserting groups directly to root groups fails on Jenkins as the test
|
|
# probably run there under a mock user which inherits some groups.
|
|
# Therefore we assert that the groups are same as groups of the parent
|
|
# process.
|
|
parent_process_status = process_status(os.getpid())
|
|
assert status["groups"] == parent_process_status["groups"]
|
|
assert_ownership(tmpdir, "root", "root")
|
|
|
|
|
|
@requires_root
|
|
def test_drop_privileges(tmpdir, tmp_dirs):
|
|
with started_imageio(tmpdir) as p:
|
|
# Run under root but privileges should be dropped, daemon should run
|
|
# under nobody user and relevant files should be owned by this user.
|
|
status = process_status(p.pid)
|
|
pwnam = pwd.getpwnam("nobody")
|
|
assert status["uids"] == {pwnam.pw_uid}
|
|
assert status["gids"] == {pwnam.pw_gid}
|
|
assert status["groups"] == user_groups("nobody")
|
|
assert_ownership(tmpdir, "nobody", "nobody")
|
|
|
|
|
|
def prepare_config(tmpdir, drop_privileges="true"):
|
|
daemon_conf = DAEMON_CONFIG.format(
|
|
run_dir=os.path.join(tmpdir, "run"),
|
|
log_dir=os.path.join(tmpdir, "log"),
|
|
drop_priv=drop_privileges,
|
|
user_name="nobody",
|
|
group_name="nobody",
|
|
)
|
|
tmpdir.join("conf", "conf.d", "daemon.conf").write(daemon_conf)
|
|
|
|
|
|
@contextmanager
|
|
def started_imageio(tmpdir, drop_privileges="true"):
|
|
prepare_config(tmpdir, drop_privileges=drop_privileges)
|
|
|
|
conf_dir = tmpdir.join("conf")
|
|
|
|
cmd = ["./ovirt-imageio", "--conf-dir", str(conf_dir)]
|
|
proc = subprocess.Popen(cmd)
|
|
try:
|
|
socket = sockutil.UnixAddress(str(tmpdir.join("run", "sock")))
|
|
if not sockutil.wait_for_socket(socket, 10):
|
|
raise RuntimeError("Timeout waiting for {}".format(socket))
|
|
|
|
# Wait until server is listening - at this point it already dropped
|
|
# privileges.
|
|
if drop_privileges:
|
|
cfg = config.load(str(conf_dir.join("conf.d", "daemon.conf")))
|
|
with http.ControlClient(cfg) as c:
|
|
r = c.get("/tickets/no-such-ticket")
|
|
r.read()
|
|
assert r.status == 404
|
|
|
|
yield proc
|
|
finally:
|
|
proc.terminate()
|
|
proc.wait()
|
|
|
|
|
|
def user_groups(user):
|
|
"""
|
|
Return group ids for username (str) or user id (int)
|
|
"""
|
|
gids = subprocess.check_output(["id", "--groups", "-z", str(user)]).decode(
|
|
"utf-8").strip("\0").split("\0")
|
|
return {int(gid) for gid in gids}
|
|
|
|
|
|
def parse_ids(line):
|
|
_, values = line.split(":", 1)
|
|
return set(int(x) for x in values.strip().split())
|
|
|
|
|
|
def process_status(pid):
|
|
"""
|
|
Returns uid, gid and groups of the process.
|
|
Example for the the process with following status:
|
|
|
|
$ cat /proc/2769/status | egrep '^(Uid:|Gid:|Groups:)'
|
|
Uid: 993 993 993 993
|
|
Gid: 990 990 990 990
|
|
Groups: 990
|
|
|
|
the output is:
|
|
|
|
> process_status(2769)
|
|
{'uids': {993}, 'gids': {990}, 'groups': {990}}
|
|
"""
|
|
status_path = os.path.join("/proc", str(pid), "status")
|
|
with open(status_path, "r") as f:
|
|
status_lines = f.readlines()
|
|
|
|
for line in status_lines:
|
|
if line.startswith("Uid:"):
|
|
uids = parse_ids(line)
|
|
if line.startswith("Gid:"):
|
|
gids = parse_ids(line)
|
|
if line.startswith("Groups:"):
|
|
groups = parse_ids(line)
|
|
|
|
return {"uids": uids, "gids": gids, "groups": groups}
|
|
|
|
|
|
def assert_path_owner(path, expected_user, expected_group):
|
|
path_stat = os.stat(path)
|
|
uid = path_stat.st_uid
|
|
gid = path_stat.st_gid
|
|
user = pwd.getpwuid(uid).pw_name
|
|
group = grp.getgrgid(gid).gr_name
|
|
|
|
assert expected_user == user
|
|
assert expected_group == group
|
|
|
|
|
|
def assert_ownership(tmpdir, user, group):
|
|
run_dir = os.path.join(tmpdir, "run")
|
|
assert_path_owner(run_dir, user, group)
|
|
assert_path_owner(str(tmpdir.join("run", "sock")), user, group)
|
|
assert_path_owner(str(tmpdir.join("log", "daemon.log")), user, group)
|