chunk stream: tests: add regression tests for payload chunker

Regression tests to cover suggested and forced boundaries as well as
chunk injection.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
This commit is contained in:
Christian Ebner 2024-05-14 11:57:47 +02:00 committed by Fabian Grünbichler
parent e11ee319ce
commit 589f510e7d

View File

@ -228,3 +228,120 @@ where
}
}
}
#[cfg(test)]
mod test {
use futures::stream::StreamExt;
use super::*;
struct DummyInput {
data: Vec<u8>,
}
impl DummyInput {
fn new(data: Vec<u8>) -> Self {
Self { data }
}
}
impl Stream for DummyInput {
type Item = Result<Vec<u8>, Error>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.data.len() {
0 => Poll::Ready(None),
size if size > 10 => Poll::Ready(Some(Ok(this.data.split_off(10)))),
_ => Poll::Ready(Some(Ok(std::mem::take(&mut this.data)))),
}
}
}
#[test]
fn test_chunk_stream_forced_boundaries() {
let mut data = Vec::new();
for i in 0..(256 * 1024) {
for j in 0..4 {
let byte = ((i >> (j << 3)) & 0xff) as u8;
data.push(byte);
}
}
let mut input = DummyInput::new(data);
let input = Pin::new(&mut input);
let (injections_tx, injections_rx) = mpsc::channel();
let (boundaries_tx, boundaries_rx) = mpsc::channel();
let (suggested_tx, suggested_rx) = mpsc::channel();
let injection_data = InjectionData::new(boundaries_rx, injections_tx);
let mut chunk_stream = ChunkStream::new(
input,
Some(64 * 1024),
Some(injection_data),
Some(suggested_rx),
);
let chunks = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let chunks_clone = chunks.clone();
// Suggested boundary matching forced boundary
suggested_tx.send(32 * 1024).unwrap();
// Suggested boundary not matching forced boundary
suggested_tx.send(64 * 1024).unwrap();
// Force chunk boundary at suggested boundary
boundaries_tx
.send(InjectChunks {
boundary: 32 * 1024,
chunks: Vec::new(),
size: 1024,
})
.unwrap();
// Force chunk boundary within regular chunk
boundaries_tx
.send(InjectChunks {
boundary: 128 * 1024,
chunks: Vec::new(),
size: 2048,
})
.unwrap();
// Force chunk boundary aligned with regular boundary
boundaries_tx
.send(InjectChunks {
boundary: 657408,
chunks: Vec::new(),
size: 512,
})
.unwrap();
// Force chunk boundary within regular chunk, without injecting data
boundaries_tx
.send(InjectChunks {
boundary: 657408 + 1024,
chunks: Vec::new(),
size: 0,
})
.unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
while let Some(chunk) = chunk_stream.next().await {
let chunk = chunk.unwrap();
let mut chunks = chunks.lock().unwrap();
chunks.push(chunk);
}
});
let mut total = 0;
let chunks = chunks_clone.lock().unwrap();
let expected = [32768, 31744, 65536, 262144, 262144, 512, 262144, 131584];
for (chunk, expected) in chunks.as_slice().iter().zip(expected.iter()) {
assert_eq!(chunk.len(), *expected);
total += chunk.len();
}
while let Ok(injection) = injections_rx.recv() {
total += injection.size;
}
assert_eq!(total, 4 * 256 * 1024 + 1024 + 2048 + 512);
}
}