ovirt-imageio/test/server_test.py
Albert Esteve fc6e58d9e6 reuse: addheader test/*.py
Add SPDX header to python files with
the 'py' extension in the test directory.

Signed-off-by: Albert Esteve <aesteve@redhat.com>
2022-10-18 13:04:20 +02:00

293 lines
8.5 KiB
Python

# SPDX-FileCopyrightText: Red Hat, Inc.
# SPDX-License-Identifier: GPL-2.0-or-later
import grp
import json
import os
import pwd
import subprocess
import pytest
from contextlib import contextmanager
from ovirt_imageio._internal import config
from ovirt_imageio._internal import server
from ovirt_imageio._internal import sockutil
from . import http
# Template of the daemon configuration file written by prepare_config().
# The {run_dir}, {log_dir}, {drop_priv}, {user_name} and {group_name}
# placeholders are substituted with str.format() before the file is written.
# The control service is configured to use a unix socket at {run_dir}/sock,
# and the log file handler writes to {log_dir}/daemon.log.
DAEMON_CONFIG = """\
[daemon]
poll_interval = 0.1
buffer_size = 131072
run_dir = {run_dir}
drop_privileges = {drop_priv}
user_name = {user_name}
group_name = {group_name}
[tls]
key_file = test/pki/system/key.pem
cert_file = test/pki/system/cert.pem
ca_file = test/pki/system/ca.pem
[remote]
port = 0
[local]
socket =
[control]
transport = unix
socket = {run_dir}/sock
[logger_root]
level=DEBUG
[handler_logfile]
args=('{log_dir}/daemon.log',)
"""
# Skip markers keyed on the effective user id of the test process:
# requires_root skips the test unless running as root (euid 0),
# requires_unprivileged skips the test when running as root.
requires_root = pytest.mark.skipif(os.geteuid() != 0, reason="Requires root")
requires_unprivileged = pytest.mark.skipif(
os.geteuid() == 0, reason="Requires unprivileged user")
@pytest.fixture
def tmp_dirs(tmpdir):
    """
    Create the directory layout used by the daemon tests under tmpdir:
    "run", "log" and "conf/conf.d".
    """
    for name in ("run", "log"):
        tmpdir.mkdir(name)
    conf_dir = tmpdir.mkdir("conf")
    conf_dir.mkdir("conf.d")
def test_load_config(monkeypatch, tmpdir):
    # Full scenario: configuration is loaded from several sources and the
    # options must be overwritten in the expected way. The install config
    # overwrites the defaults, the vendor config then overwrites the log
    # handler level, and finally the admin config overwrites the control
    # service setup previously defined by the install config.
    install_text = """
[control]
transport = unix
socket = test/daemon.sock
[handler_logfile]
class = logging.StreamHandler
args = ()
kwargs = {}
"""
    vendor_text = """
[handler_logfile]
level = ERROR
"""
    admin_text = """
[control]
transport = tcp
port = 10000
"""

    # Install and admin drop-ins share the same etc/conf.d directory.
    conf_root = tmpdir.mkdir("etc")
    drop_in_dir = conf_root.mkdir("conf.d")
    drop_in_dir.join("50-install.conf").write(install_text)
    drop_in_dir.join("90-admin.conf").write(admin_text)

    # The vendor drop-in lives in its own tree; point the server at it.
    vendor_root = tmpdir.mkdir("vendor")
    vendor_root.mkdir("conf.d").join("60-vendor.conf").write(vendor_text)
    monkeypatch.setattr(server, "VENDOR_CONF_DIR", str(vendor_root))

    loaded = server.load_config(str(conf_root))

    # Admin config wins for the control service.
    assert loaded.control.transport == "tcp"
    assert loaded.control.port == 10000
    # Vendor config sets the level, install config sets the handler class.
    assert loaded.handler_logfile.level == "ERROR"
    assert loaded.handler_logfile.keyword__class == "logging.StreamHandler"
def test_load_config_missing():
    # Loading must fail right away when conf_dir does not exist.
    missing_dir = "/no/such/dir"
    with pytest.raises(ValueError):
        server.load_config(missing_dir)
def test_load_config_no_conf_d(tmpdir):
    # Loading must fail right away when conf_dir exists but has no conf.d
    # sub directory.
    bare_dir = str(tmpdir.mkdir("user"))
    with pytest.raises(ValueError):
        server.load_config(bare_dir)
def test_load_config_no_conf_files(tmpdir):
    # Fail fast if conf_dir/conf.d/ does not contain any configuration file.
    conf_dir = tmpdir.mkdir("user")
    # The sub directory must really be named "conf.d"; with any other name
    # this test would only repeat the "no conf.d sub directory" case.
    conf_d = conf_dir.mkdir("conf.d")
    # Create a disabled configuration file; the loader is expected to ignore
    # it, so conf.d still counts as having no configuration files.
    disabled_cfg = conf_d.join("99-user.conf.disabled")
    disabled_cfg.write("[control]\ntransport = tcp\n")
    with pytest.raises(ValueError):
        server.load_config(str(conf_dir))
def test_show_config():
    # The JSON printed by --show-config must match the configuration loaded
    # directly from the same daemon.conf file.
    expected = config.to_dict(config.load(["test/conf.d/daemon.conf"]))
    cmd = ["./ovirt-imageio", "--conf-dir", "./test", "--show-config"]
    shown = json.loads(subprocess.check_output(cmd))
    assert shown == expected
@requires_unprivileged
def test_unprivileged_user(tmpdir, tmp_dirs):
    # When started by an unprivileged user, the daemon must keep running as
    # the same user and group, and the ownership of its files must not change.
    with started_imageio(tmpdir) as p:
        uid = os.getuid()
        # Use the real group id of this process. The user id is only equal to
        # the group id on systems that create a private group per user.
        gid = os.getgid()

        status = process_status(p.pid)
        assert status["uids"] == {uid}
        assert status["gids"] == {gid}
        assert status["groups"] == user_groups(uid)

        # Since we run as an unprivileged user the daemon runs as the same
        # user and group and no ownership changes are made.
        expected_user = pwd.getpwuid(uid).pw_name
        expected_group = grp.getgrgid(gid).gr_name
        assert_ownership(tmpdir, expected_user, expected_group)
@requires_root
def test_drop_privileges_disable(tmpdir, tmp_dirs):
    with started_imageio(tmpdir, drop_privileges="false") as daemon:
        # Started as root with dropping privileges disabled: the daemon must
        # stay root and must not change ownership of its files.
        daemon_status = process_status(daemon.pid)
        root_ids = {0}
        assert daemon_status["uids"] == root_ids
        assert daemon_status["gids"] == root_ids

        # Comparing the groups directly with root's groups fails on Jenkins,
        # where the tests probably run under a mock user that inherits some
        # groups. Compare with the groups of the parent process (this test
        # process) instead.
        own_status = process_status(os.getpid())
        assert daemon_status["groups"] == own_status["groups"]

        assert_ownership(tmpdir, "root", "root")
@requires_root
def test_drop_privileges(tmpdir, tmp_dirs):
    with started_imageio(tmpdir) as daemon:
        # Started as root with dropping privileges enabled: the daemon must
        # run as the "nobody" user and the relevant files must be owned by
        # that user.
        daemon_status = process_status(daemon.pid)
        nobody = pwd.getpwnam("nobody")

        assert daemon_status["uids"] == {nobody.pw_uid}
        assert daemon_status["gids"] == {nobody.pw_gid}
        assert daemon_status["groups"] == user_groups("nobody")

        assert_ownership(tmpdir, "nobody", "nobody")
def prepare_config(tmpdir, drop_privileges="true"):
    """
    Render DAEMON_CONFIG for tmpdir and write it to
    tmpdir/conf/conf.d/daemon.conf.

    drop_privileges is inserted verbatim as the value of the
    drop_privileges option. The user and group names are always "nobody".
    """
    values = {
        "run_dir": os.path.join(tmpdir, "run"),
        "log_dir": os.path.join(tmpdir, "log"),
        "drop_priv": drop_privileges,
        "user_name": "nobody",
        "group_name": "nobody",
    }
    conf_file = tmpdir.join("conf", "conf.d", "daemon.conf")
    conf_file.write(DAEMON_CONFIG.format(**values))
@contextmanager
def started_imageio(tmpdir, drop_privileges="true"):
    """
    Context manager running the ./ovirt-imageio daemon with a configuration
    generated under tmpdir.

    Writes the configuration with prepare_config(), starts the daemon, waits
    for the control socket at tmpdir/run/sock and yields the
    subprocess.Popen object. On exit the process is terminated and waited
    for, even if startup or the with body failed.

    Raises RuntimeError if waiting for the control socket times out.
    """
    prepare_config(tmpdir, drop_privileges=drop_privileges)
    conf_dir = tmpdir.join("conf")
    daemon = subprocess.Popen(
        ["./ovirt-imageio", "--conf-dir", str(conf_dir)])
    try:
        control_sock = sockutil.UnixAddress(str(tmpdir.join("run", "sock")))
        ready = sockutil.wait_for_socket(control_sock, 10)
        if not ready:
            raise RuntimeError(
                "Timeout waiting for {}".format(control_sock))

        # Wait until the server is listening - at this point it has already
        # dropped privileges.
        # NOTE(review): drop_privileges is a "true"/"false" string, so this
        # condition holds for any non-empty value, including "false".
        if drop_privileges:
            daemon_conf = conf_dir.join("conf.d", "daemon.conf")
            cfg = config.load(str(daemon_conf))
            with http.ControlClient(cfg) as client:
                res = client.get("/tickets/no-such-ticket")
                res.read()
                assert res.status == 404

        yield daemon
    finally:
        daemon.terminate()
        daemon.wait()
def user_groups(user):
    """
    Return the set of group ids reported by "id --groups -z" for user,
    given either as a user name (str) or as a user id (int).
    """
    cmd = ["id", "--groups", "-z", str(user)]
    output = subprocess.check_output(cmd).decode("utf-8")
    # With -z the group ids are separated by NUL characters; drop NULs at
    # both ends so splitting does not produce empty fields.
    fields = output.strip("\0").split("\0")
    return {int(field) for field in fields}
def parse_ids(line):
    """
    Return the set of integer ids found after the first ":" in line.

    The ids are separated by whitespace, for example
    "Uid:\\t993\\t993\\t993\\t993\\n" gives {993}.
    """
    _label, raw_ids = line.split(":", 1)
    return {int(token) for token in raw_ids.split()}
def process_status(pid):
    """
    Return the user ids, group ids and supplementary groups of a process,
    read from /proc/<pid>/status.

    Example for a process with the following status:

        $ cat /proc/2769/status | egrep '^(Uid:|Gid:|Groups:)'
        Uid: 993 993 993 993
        Gid: 990 990 990 990
        Groups: 990

    the result is:

        > process_status(2769)
        {'uids': {993}, 'gids': {990}, 'groups': {990}}
    """
    with open(os.path.join("/proc", str(pid), "status"), "r") as status_file:
        for entry in status_file:
            # Each of the three interesting lines has a distinct prefix.
            if entry.startswith("Uid:"):
                uids = parse_ids(entry)
            elif entry.startswith("Gid:"):
                gids = parse_ids(entry)
            elif entry.startswith("Groups:"):
                groups = parse_ids(entry)
    return {"uids": uids, "gids": gids, "groups": groups}
def assert_path_owner(path, expected_user, expected_group):
    """
    Assert that path is owned by the user named expected_user and the group
    named expected_group.

    The owner ids come from os.stat() and are translated to names using the
    pwd and grp databases.
    """
    info = os.stat(path)
    owner = pwd.getpwuid(info.st_uid).pw_name
    owning_group = grp.getgrgid(info.st_gid).gr_name
    assert expected_user == owner
    assert expected_group == owning_group
def assert_ownership(tmpdir, user, group):
    """
    Assert that the run directory, the control socket (run/sock) and the
    daemon log file (log/daemon.log) under tmpdir are owned by user and
    group.
    """
    owned_paths = (
        os.path.join(tmpdir, "run"),
        str(tmpdir.join("run", "sock")),
        str(tmpdir.join("log", "daemon.log")),
    )
    for path in owned_paths:
        assert_path_owner(path, user, group)