ovirt-imageio/test/ioutil_test.py
Albert Esteve fc6e58d9e6 reuse: addheader test/*.py
Add SPDX header to python files with
the 'py' extension in the test directory.

Signed-off-by: Albert Esteve <aesteve@redhat.com>
2022-10-18 13:04:20 +02:00

301 lines
8.4 KiB
Python

# SPDX-FileCopyrightText: Red Hat, Inc.
# SPDX-License-Identifier: GPL-2.0-or-later
import errno
import os
import subprocess
from contextlib import closing
import pytest
from ovirt_imageio._internal import ioutil
from ovirt_imageio._internal import util
BLOCKSIZE = 4096
requires_root = pytest.mark.skipif(os.geteuid() != 0, reason="Requires root")
@pytest.fixture
def loop_device(tmpdir):
backing_file = str(tmpdir.join("backing_file"))
with util.open(backing_file, "w") as f:
buf = util.aligned_buffer(BLOCKSIZE * 3)
with closing(buf):
buf[:] = b"x" * BLOCKSIZE * 3
f.write(buf)
out = subprocess.check_output(
["losetup", "--find", backing_file, "--show"])
try:
loop = out.strip().decode("ascii")
yield loop
finally:
subprocess.check_call(["losetup", "--detach", loop])
@requires_root
def test_zeroout_start(loop_device):
with util.open(loop_device, "r+") as f:
ioutil.blkzeroout(f.fileno(), 0, BLOCKSIZE)
with util.open(loop_device, "r") as f:
buf = util.aligned_buffer(BLOCKSIZE * 3)
with closing(buf):
f.readinto(buf)
assert buf[:BLOCKSIZE] == b"\0" * BLOCKSIZE
assert buf[BLOCKSIZE:] == b"x" * BLOCKSIZE * 2
@requires_root
def test_zeroout_middle(loop_device):
with util.open(loop_device, "r+") as f:
ioutil.blkzeroout(f.fileno(), BLOCKSIZE, BLOCKSIZE)
with util.open(loop_device, "r") as f:
buf = util.aligned_buffer(BLOCKSIZE * 3)
with closing(buf):
f.readinto(buf)
assert buf[:BLOCKSIZE] == b"x" * BLOCKSIZE
assert buf[BLOCKSIZE:-BLOCKSIZE] == b"\0" * BLOCKSIZE
assert buf[-BLOCKSIZE:] == b"x" * BLOCKSIZE
@requires_root
def test_zeroout_end(loop_device):
with util.open(loop_device, "r+") as f:
ioutil.blkzeroout(f.fileno(), BLOCKSIZE * 2, BLOCKSIZE)
with util.open(loop_device, "r") as f:
buf = util.aligned_buffer(BLOCKSIZE * 3)
with closing(buf):
f.readinto(buf)
assert buf[:-BLOCKSIZE] == b"x" * BLOCKSIZE * 2
assert buf[-BLOCKSIZE:] == b"\0" * BLOCKSIZE
@requires_root
def test_blksszget_512(loop_device):
with util.open(loop_device, "r+") as f:
assert ioutil.blksszget(f.fileno()) == 512
@requires_root
def test_blksszget_bad_fd(loop_device):
with pytest.raises(OSError):
ioutil.blksszget(-1)
@requires_root
def test_blksszget_not_block_device(loop_device, tmpfile):
with open(tmpfile) as f:
with pytest.raises(OSError):
ioutil.blksszget(f.fileno())
# Empty zero buffer
@pytest.mark.parametrize("buf", [
pytest.param("", id="unicode"),
pytest.param(b"", id="bytes"),
pytest.param(bytearray(), id="bytearray"),
])
def test_is_zero_empty(buf):
assert ioutil.is_zero(buf)
@pytest.mark.parametrize("buf", [
pytest.param(b"", id="bytes"),
pytest.param(bytearray(), id="bytearray"),
])
def test_is_zero_empty_memoryview(buf):
assert ioutil.is_zero(memoryview(buf))
# Non-empty zero buffer
@pytest.mark.parametrize("buf", [
pytest.param("\u0000" * 512, id="unicode"),
pytest.param(b"\0" * 512, id="bytes"),
pytest.param(bytearray(512), id="bytearray"),
])
def test_is_zero(buf):
assert ioutil.is_zero(buf)
@pytest.mark.parametrize("buf", [
pytest.param(b"\0" * 512, id="bytes"),
pytest.param(bytearray(512), id="bytearray"),
])
def test_is_zero_memoryview(buf):
assert ioutil.is_zero(memoryview(buf))
# Non-zero buffer with non-zero in first 16 bytes
@pytest.mark.parametrize("buf", [
pytest.param("\u0000" * 15 + "x", id="unicode"),
pytest.param(b"\0" * 15 + b"x", id="bytes"),
pytest.param(bytearray(b"\0" * 15 + b"x"), id="bytearray"),
])
def test_is_not_zero_head(buf):
assert not ioutil.is_zero(buf)
@pytest.mark.parametrize("buf", [
pytest.param(b"\0" * 15 + b"x", id="bytes"),
pytest.param(bytearray(b"\0" * 15 + b"x"), id="bytearray"),
])
def test_is_not_zero_head_memoryview(buf):
assert not ioutil.is_zero(memoryview(buf))
# Non-zero buffer
@pytest.mark.parametrize("buf", [
pytest.param("\u0000" * 511 + "x", id="unicode"),
pytest.param(b"\0" * 511 + b"x", id="bytes"),
pytest.param(bytearray(b"\0" * 511 + b"x"), id="bytearray"),
])
def test_is_not_zero(buf):
assert not ioutil.is_zero(buf)
@pytest.mark.parametrize("buf", [
pytest.param(b"\0" * 511 + b"x", id="bytes"),
pytest.param(bytearray(b"\0" * 511 + b"x"), id="bytearray"),
])
def test_is_not_zero_memoryview(buf):
assert not ioutil.is_zero(memoryview(buf))
# Checking mmap
@pytest.fixture
def aligned_buffer():
buf = util.aligned_buffer(BLOCKSIZE)
with closing(buf):
yield buf
def test_is_zero_mmap(aligned_buffer):
assert ioutil.is_zero(aligned_buffer)
def test_is_not_zero_mmap_head(aligned_buffer):
aligned_buffer[15:16] = b"x"
assert not ioutil.is_zero(aligned_buffer)
def test_is_not_zero_mmap(aligned_buffer):
aligned_buffer[-1:] = b"x"
assert not ioutil.is_zero(aligned_buffer)
# fallocate
fallocate_mode = pytest.mark.parametrize("mode", [
# Zero byte range, allocating space - for preallocated images.
# Not supported on NFS 4.2.
ioutil.FALLOC_FL_ZERO_RANGE,
# Zero byte range, daallocating space - for sparse images.
# Supported on NFS 4.2.
ioutil.FALLOC_FL_PUNCH_HOLE | ioutil.FALLOC_FL_KEEP_SIZE,
])
@fallocate_mode
def test_fallocate_zero_start(tmpdir, mode):
path = str(tmpdir.join("file"))
with open(path, "wb") as f:
f.write(b"x" * BLOCKSIZE * 3)
buf = util.aligned_buffer(BLOCKSIZE * 3)
with closing(buf), util.open(path, "r+") as f:
try_fallocate(f.fileno(), mode, 0, BLOCKSIZE)
n = f.readinto(buf)
assert n == BLOCKSIZE * 3
assert buf[:BLOCKSIZE] == b"\0" * BLOCKSIZE
assert buf[BLOCKSIZE:] == b"x" * BLOCKSIZE * 2
assert f.readinto(buf) == 0
@fallocate_mode
def test_fallocate_zero_middle(tmpdir, mode):
path = str(tmpdir.join("file"))
with open(path, "wb") as f:
f.write(b"x" * BLOCKSIZE * 3)
buf = util.aligned_buffer(BLOCKSIZE * 3)
with closing(buf), util.open(path, "r+") as f:
try_fallocate(f.fileno(), mode, BLOCKSIZE, BLOCKSIZE)
n = f.readinto(buf)
assert n == BLOCKSIZE * 3
assert buf[:BLOCKSIZE] == b"x" * BLOCKSIZE
assert buf[BLOCKSIZE:-BLOCKSIZE] == b"\0" * BLOCKSIZE
assert buf[-BLOCKSIZE:] == b"x" * BLOCKSIZE
assert f.readinto(buf) == 0
@fallocate_mode
def test_fallocate_zero_end(tmpdir, mode):
path = str(tmpdir.join("file"))
with open(path, "wb") as f:
f.write(b"x" * BLOCKSIZE * 3)
buf = util.aligned_buffer(BLOCKSIZE * 3)
with closing(buf), util.open(path, "r+") as f:
try_fallocate(f.fileno(), mode, BLOCKSIZE * 2, BLOCKSIZE)
n = f.readinto(buf)
assert n == BLOCKSIZE * 3
assert buf[:-BLOCKSIZE] == b"x" * BLOCKSIZE * 2
assert buf[-BLOCKSIZE:] == b"\0" * BLOCKSIZE
assert f.readinto(buf) == 0
def test_fallocate_zero_after_end(tmpdir):
path = str(tmpdir.join("file"))
with open(path, "wb") as f:
f.write(b"x" * BLOCKSIZE * 3)
buf = util.aligned_buffer(BLOCKSIZE * 4)
with closing(buf), util.open(path, "r+") as f:
# Will allocate more space that will return zeros when read.
mode = ioutil.FALLOC_FL_ZERO_RANGE
try_fallocate(f.fileno(), mode, BLOCKSIZE * 3, BLOCKSIZE)
n = f.readinto(buf)
assert n == BLOCKSIZE * 4
assert buf[:-BLOCKSIZE] == b"x" * BLOCKSIZE * 3
assert buf[-BLOCKSIZE:] == b"\0" * BLOCKSIZE
assert f.readinto(buf) == 0
def test_fallocate_punch_hole_after_end(tmpdir):
path = str(tmpdir.join("file"))
with open(path, "wb") as f:
f.write(b"x" * BLOCKSIZE * 3)
buf = util.aligned_buffer(BLOCKSIZE * 3)
with closing(buf), util.open(path, "r+") as f:
# This does not change file contents or size.
mode = ioutil.FALLOC_FL_PUNCH_HOLE | ioutil.FALLOC_FL_KEEP_SIZE
try_fallocate(f.fileno(), mode, BLOCKSIZE * 3, BLOCKSIZE)
n = f.readinto(buf)
assert n == BLOCKSIZE * 3
assert buf[:] == b"x" * BLOCKSIZE * 3
assert f.readinto(buf) == 0
def try_fallocate(fd, mode, offset, count):
try:
ioutil.fallocate(fd, mode, offset, count)
except EnvironmentError as e:
if e.errno != errno.EOPNOTSUPP:
raise
pytest.skip("fallocate(mode=%r) not supported" % mode)