2022-10-06 13:33:45 +02:00
# SPDX-FileCopyrightText: Red Hat, Inc.
# SPDX-License-Identifier: GPL-2.0-or-later
2017-12-17 14:44:32 +02:00
2017-12-16 02:07:28 +02:00
import errno
2017-12-17 14:44:32 +02:00
import os
2020-02-20 14:17:59 +01:00
import subprocess
2017-12-17 14:44:32 +02:00
from contextlib import closing
import pytest
2020-05-10 09:40:03 +03:00
from ovirt_imageio . _internal import ioutil
from ovirt_imageio . _internal import util
2017-12-17 14:44:32 +02:00
BLOCKSIZE = 4096
requires_root = pytest . mark . skipif ( os . geteuid ( ) != 0 , reason = " Requires root " )
@pytest.fixture
def loop_device ( tmpdir ) :
backing_file = str ( tmpdir . join ( " backing_file " ) )
2018-12-03 17:27:02 +02:00
with util . open ( backing_file , " w " ) as f :
2018-11-18 18:23:12 +02:00
buf = util . aligned_buffer ( BLOCKSIZE * 3 )
2017-12-17 14:44:32 +02:00
with closing ( buf ) :
buf [ : ] = b " x " * BLOCKSIZE * 3
f . write ( buf )
out = subprocess . check_output (
[ " losetup " , " --find " , backing_file , " --show " ] )
try :
loop = out . strip ( ) . decode ( " ascii " )
yield loop
finally :
subprocess . check_call ( [ " losetup " , " --detach " , loop ] )
@requires_root
def test_zeroout_start ( loop_device ) :
2018-12-03 17:27:02 +02:00
with util . open ( loop_device , " r+ " ) as f :
2017-12-17 14:44:32 +02:00
ioutil . blkzeroout ( f . fileno ( ) , 0 , BLOCKSIZE )
2018-12-03 17:27:02 +02:00
with util . open ( loop_device , " r " ) as f :
2018-11-18 18:23:12 +02:00
buf = util . aligned_buffer ( BLOCKSIZE * 3 )
2017-12-17 14:44:32 +02:00
with closing ( buf ) :
f . readinto ( buf )
assert buf [ : BLOCKSIZE ] == b " \0 " * BLOCKSIZE
assert buf [ BLOCKSIZE : ] == b " x " * BLOCKSIZE * 2
@requires_root
def test_zeroout_middle ( loop_device ) :
2018-12-03 17:27:02 +02:00
with util . open ( loop_device , " r+ " ) as f :
2017-12-17 14:44:32 +02:00
ioutil . blkzeroout ( f . fileno ( ) , BLOCKSIZE , BLOCKSIZE )
2018-12-03 17:27:02 +02:00
with util . open ( loop_device , " r " ) as f :
2018-11-18 18:23:12 +02:00
buf = util . aligned_buffer ( BLOCKSIZE * 3 )
2017-12-17 14:44:32 +02:00
with closing ( buf ) :
f . readinto ( buf )
assert buf [ : BLOCKSIZE ] == b " x " * BLOCKSIZE
assert buf [ BLOCKSIZE : - BLOCKSIZE ] == b " \0 " * BLOCKSIZE
assert buf [ - BLOCKSIZE : ] == b " x " * BLOCKSIZE
@requires_root
def test_zeroout_end ( loop_device ) :
2018-12-03 17:27:02 +02:00
with util . open ( loop_device , " r+ " ) as f :
2017-12-17 14:44:32 +02:00
ioutil . blkzeroout ( f . fileno ( ) , BLOCKSIZE * 2 , BLOCKSIZE )
2018-12-03 17:27:02 +02:00
with util . open ( loop_device , " r " ) as f :
2018-11-18 18:23:12 +02:00
buf = util . aligned_buffer ( BLOCKSIZE * 3 )
2017-12-17 14:44:32 +02:00
with closing ( buf ) :
f . readinto ( buf )
assert buf [ : - BLOCKSIZE ] == b " x " * BLOCKSIZE * 2
assert buf [ - BLOCKSIZE : ] == b " \0 " * BLOCKSIZE
2019-07-13 23:12:27 +03:00
@requires_root
def test_blksszget_512 ( loop_device ) :
with util . open ( loop_device , " r+ " ) as f :
assert ioutil . blksszget ( f . fileno ( ) ) == 512
@requires_root
def test_blksszget_bad_fd ( loop_device ) :
with pytest . raises ( OSError ) :
ioutil . blksszget ( - 1 )
@requires_root
def test_blksszget_not_block_device ( loop_device , tmpfile ) :
with open ( tmpfile ) as f :
with pytest . raises ( OSError ) :
ioutil . blksszget ( f . fileno ( ) )
2017-12-17 14:44:32 +02:00
# Empty zero buffer
@pytest.mark.parametrize ( " buf " , [
2020-03-18 22:01:12 +01:00
pytest . param ( " " , id = " unicode " ) ,
2017-12-17 14:44:32 +02:00
pytest . param ( b " " , id = " bytes " ) ,
pytest . param ( bytearray ( ) , id = " bytearray " ) ,
] )
def test_is_zero_empty ( buf ) :
assert ioutil . is_zero ( buf )
@pytest.mark.parametrize ( " buf " , [
pytest . param ( b " " , id = " bytes " ) ,
pytest . param ( bytearray ( ) , id = " bytearray " ) ,
] )
def test_is_zero_empty_memoryview ( buf ) :
assert ioutil . is_zero ( memoryview ( buf ) )
# Non-empty zero buffer
@pytest.mark.parametrize ( " buf " , [
2020-03-18 22:01:12 +01:00
pytest . param ( " \u0000 " * 512 , id = " unicode " ) ,
2017-12-17 14:44:32 +02:00
pytest . param ( b " \0 " * 512 , id = " bytes " ) ,
pytest . param ( bytearray ( 512 ) , id = " bytearray " ) ,
] )
def test_is_zero ( buf ) :
assert ioutil . is_zero ( buf )
@pytest.mark.parametrize ( " buf " , [
pytest . param ( b " \0 " * 512 , id = " bytes " ) ,
pytest . param ( bytearray ( 512 ) , id = " bytearray " ) ,
] )
def test_is_zero_memoryview ( buf ) :
assert ioutil . is_zero ( memoryview ( buf ) )
# Non-zero buffer with non-zero in first 16 bytes
@pytest.mark.parametrize ( " buf " , [
2020-03-18 22:01:12 +01:00
pytest . param ( " \u0000 " * 15 + " x " , id = " unicode " ) ,
2017-12-17 14:44:32 +02:00
pytest . param ( b " \0 " * 15 + b " x " , id = " bytes " ) ,
pytest . param ( bytearray ( b " \0 " * 15 + b " x " ) , id = " bytearray " ) ,
] )
def test_is_not_zero_head ( buf ) :
assert not ioutil . is_zero ( buf )
@pytest.mark.parametrize ( " buf " , [
pytest . param ( b " \0 " * 15 + b " x " , id = " bytes " ) ,
pytest . param ( bytearray ( b " \0 " * 15 + b " x " ) , id = " bytearray " ) ,
] )
def test_is_not_zero_head_memoryview ( buf ) :
assert not ioutil . is_zero ( memoryview ( buf ) )
# Non-zero buffer
@pytest.mark.parametrize ( " buf " , [
2020-03-18 22:01:12 +01:00
pytest . param ( " \u0000 " * 511 + " x " , id = " unicode " ) ,
2017-12-17 14:44:32 +02:00
pytest . param ( b " \0 " * 511 + b " x " , id = " bytes " ) ,
pytest . param ( bytearray ( b " \0 " * 511 + b " x " ) , id = " bytearray " ) ,
] )
def test_is_not_zero ( buf ) :
assert not ioutil . is_zero ( buf )
@pytest.mark.parametrize ( " buf " , [
pytest . param ( b " \0 " * 511 + b " x " , id = " bytes " ) ,
pytest . param ( bytearray ( b " \0 " * 511 + b " x " ) , id = " bytearray " ) ,
] )
def test_is_not_zero_memoryview ( buf ) :
assert not ioutil . is_zero ( memoryview ( buf ) )
# Checking mmap
@pytest.fixture
def aligned_buffer ( ) :
2018-11-18 18:23:12 +02:00
buf = util . aligned_buffer ( BLOCKSIZE )
2017-12-17 14:44:32 +02:00
with closing ( buf ) :
yield buf
def test_is_zero_mmap ( aligned_buffer ) :
assert ioutil . is_zero ( aligned_buffer )
def test_is_not_zero_mmap_head ( aligned_buffer ) :
aligned_buffer [ 15 : 16 ] = b " x "
assert not ioutil . is_zero ( aligned_buffer )
def test_is_not_zero_mmap ( aligned_buffer ) :
aligned_buffer [ - 1 : ] = b " x "
assert not ioutil . is_zero ( aligned_buffer )
2017-12-16 02:07:28 +02:00
# fallocate
fallocate_mode = pytest . mark . parametrize ( " mode " , [
# Zero byte range, allocating space - for preallocated images.
# Not supported on NFS 4.2.
ioutil . FALLOC_FL_ZERO_RANGE ,
# Zero byte range, daallocating space - for sparse images.
# Supported on NFS 4.2.
ioutil . FALLOC_FL_PUNCH_HOLE | ioutil . FALLOC_FL_KEEP_SIZE ,
] )
@fallocate_mode
def test_fallocate_zero_start ( tmpdir , mode ) :
path = str ( tmpdir . join ( " file " ) )
with open ( path , " wb " ) as f :
f . write ( b " x " * BLOCKSIZE * 3 )
2018-11-18 18:23:12 +02:00
buf = util . aligned_buffer ( BLOCKSIZE * 3 )
2018-12-03 17:27:02 +02:00
with closing ( buf ) , util . open ( path , " r+ " ) as f :
2017-12-16 02:07:28 +02:00
try_fallocate ( f . fileno ( ) , mode , 0 , BLOCKSIZE )
n = f . readinto ( buf )
assert n == BLOCKSIZE * 3
assert buf [ : BLOCKSIZE ] == b " \0 " * BLOCKSIZE
assert buf [ BLOCKSIZE : ] == b " x " * BLOCKSIZE * 2
assert f . readinto ( buf ) == 0
@fallocate_mode
def test_fallocate_zero_middle ( tmpdir , mode ) :
path = str ( tmpdir . join ( " file " ) )
with open ( path , " wb " ) as f :
f . write ( b " x " * BLOCKSIZE * 3 )
2018-11-18 18:23:12 +02:00
buf = util . aligned_buffer ( BLOCKSIZE * 3 )
2018-12-03 17:27:02 +02:00
with closing ( buf ) , util . open ( path , " r+ " ) as f :
2017-12-16 02:07:28 +02:00
try_fallocate ( f . fileno ( ) , mode , BLOCKSIZE , BLOCKSIZE )
n = f . readinto ( buf )
assert n == BLOCKSIZE * 3
assert buf [ : BLOCKSIZE ] == b " x " * BLOCKSIZE
assert buf [ BLOCKSIZE : - BLOCKSIZE ] == b " \0 " * BLOCKSIZE
assert buf [ - BLOCKSIZE : ] == b " x " * BLOCKSIZE
assert f . readinto ( buf ) == 0
@fallocate_mode
def test_fallocate_zero_end ( tmpdir , mode ) :
path = str ( tmpdir . join ( " file " ) )
with open ( path , " wb " ) as f :
f . write ( b " x " * BLOCKSIZE * 3 )
2018-11-18 18:23:12 +02:00
buf = util . aligned_buffer ( BLOCKSIZE * 3 )
2018-12-03 17:27:02 +02:00
with closing ( buf ) , util . open ( path , " r+ " ) as f :
2017-12-16 02:07:28 +02:00
try_fallocate ( f . fileno ( ) , mode , BLOCKSIZE * 2 , BLOCKSIZE )
n = f . readinto ( buf )
assert n == BLOCKSIZE * 3
assert buf [ : - BLOCKSIZE ] == b " x " * BLOCKSIZE * 2
assert buf [ - BLOCKSIZE : ] == b " \0 " * BLOCKSIZE
assert f . readinto ( buf ) == 0
def test_fallocate_zero_after_end ( tmpdir ) :
path = str ( tmpdir . join ( " file " ) )
with open ( path , " wb " ) as f :
f . write ( b " x " * BLOCKSIZE * 3 )
2018-11-18 18:23:12 +02:00
buf = util . aligned_buffer ( BLOCKSIZE * 4 )
2018-12-03 17:27:02 +02:00
with closing ( buf ) , util . open ( path , " r+ " ) as f :
2017-12-16 02:07:28 +02:00
# Will allocate more space that will return zeros when read.
mode = ioutil . FALLOC_FL_ZERO_RANGE
try_fallocate ( f . fileno ( ) , mode , BLOCKSIZE * 3 , BLOCKSIZE )
n = f . readinto ( buf )
assert n == BLOCKSIZE * 4
assert buf [ : - BLOCKSIZE ] == b " x " * BLOCKSIZE * 3
assert buf [ - BLOCKSIZE : ] == b " \0 " * BLOCKSIZE
assert f . readinto ( buf ) == 0
def test_fallocate_punch_hole_after_end ( tmpdir ) :
path = str ( tmpdir . join ( " file " ) )
with open ( path , " wb " ) as f :
f . write ( b " x " * BLOCKSIZE * 3 )
2018-11-18 18:23:12 +02:00
buf = util . aligned_buffer ( BLOCKSIZE * 3 )
2018-12-03 17:27:02 +02:00
with closing ( buf ) , util . open ( path , " r+ " ) as f :
2017-12-16 02:07:28 +02:00
# This does not change file contents or size.
mode = ioutil . FALLOC_FL_PUNCH_HOLE | ioutil . FALLOC_FL_KEEP_SIZE
try_fallocate ( f . fileno ( ) , mode , BLOCKSIZE * 3 , BLOCKSIZE )
n = f . readinto ( buf )
assert n == BLOCKSIZE * 3
assert buf [ : ] == b " x " * BLOCKSIZE * 3
assert f . readinto ( buf ) == 0
def try_fallocate ( fd , mode , offset , count ) :
try :
ioutil . fallocate ( fd , mode , offset , count )
except EnvironmentError as e :
if e . errno != errno . EOPNOTSUPP :
raise
pytest . skip ( " fallocate(mode= %r ) not supported " % mode )