datastore: chunker: implement chunker for payload stream

Implement the Chunker trait for a dedicated payload stream chunker,
which extends the regular chunker by the option to suggest boundaries
to be used over the hast based boundaries whenever possible.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
This commit is contained in:
Christian Ebner 2024-04-19 15:47:11 +02:00 committed by Fabian Grünbichler
parent e321815635
commit 88ef759cc4
2 changed files with 91 additions and 1 deletions

View File

@ -1,3 +1,5 @@
use std::sync::mpsc::Receiver;
/// Note: window size 32 or 64, is faster because we can
/// speedup modulo operations, but always computes hash 0
/// for constant data streams .. 0,0,0,0,0,0
@ -46,6 +48,16 @@ pub struct ChunkerImpl {
window: [u8; CA_CHUNKER_WINDOW_SIZE],
}
/// Sliding window chunker (Buzhash) with boundary suggestions
///
/// Suggest to chunk at a given boundary instead of the regular chunk boundary for better alignment
/// with file payload boundaries.
pub struct PayloadChunker {
chunker: ChunkerImpl,
current_suggested: Option<u64>,
suggested_boundaries: Receiver<u64>,
}
const BUZHASH_TABLE: [u32; 256] = [
0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68, 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801,
0x7ebf5191, 0x841135c7, 0x65cc53b3, 0x280a597c, 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494,
@ -221,6 +233,84 @@ impl Chunker for ChunkerImpl {
}
}
impl PayloadChunker {
/// Create a new PayloadChunker instance, which produces and average
/// chunk size of `chunk_size_avg` (need to be a power of two), if no
/// suggested boundaries are provided.
/// Use suggested boundaries instead, whenever the chunk size is within
/// the min - max range.
pub fn new(chunk_size_avg: usize, suggested_boundaries: Receiver<u64>) -> Self {
Self {
chunker: ChunkerImpl::new(chunk_size_avg),
current_suggested: None,
suggested_boundaries,
}
}
}
impl Chunker for PayloadChunker {
fn scan(&mut self, data: &[u8], ctx: &Context) -> usize {
assert!(ctx.total >= data.len() as u64);
let pos = ctx.total - data.len() as u64;
loop {
if let Some(boundary) = self.current_suggested {
if boundary < ctx.base + pos {
log::debug!("Boundary {boundary} in past");
// ignore passed boundaries
self.current_suggested = None;
continue;
}
if boundary > ctx.base + ctx.total {
log::debug!("Boundary {boundary} in future");
// boundary in future, cannot decide yet
return self.chunker.scan(data, ctx);
}
let chunk_size = (boundary - ctx.base) as usize;
if chunk_size < self.chunker.chunk_size_min {
log::debug!("Chunk size {chunk_size} below minimum chunk size");
// chunk to small, ignore boundary
self.current_suggested = None;
continue;
}
if chunk_size <= self.chunker.chunk_size_max {
self.current_suggested = None;
// calculate boundary relative to start of given data buffer
let len = chunk_size - pos as usize;
if len == 0 {
// passed this one, previous scan did not know about boundary just yet
return self.chunker.scan(data, ctx);
}
self.chunker.reset();
log::debug!(
"Chunk at suggested boundary: {boundary}, chunk size: {chunk_size}"
);
return len;
}
log::debug!("Chunk {chunk_size} to big, regular scan");
// chunk to big, cannot decide yet
// scan for hash based chunk boundary instead
return self.chunker.scan(data, ctx);
}
if let Ok(boundary) = self.suggested_boundaries.try_recv() {
self.current_suggested = Some(boundary);
} else {
log::debug!("No suggested boundary, regular scan");
return self.chunker.scan(data, ctx);
}
}
}
fn reset(&mut self) {
self.chunker.reset();
}
}
#[test]
fn test_chunker1() {
let mut buffer = Vec::new();

View File

@ -196,7 +196,7 @@ pub use backup_info::{BackupDir, BackupGroup, BackupInfo};
pub use checksum_reader::ChecksumReader;
pub use checksum_writer::ChecksumWriter;
pub use chunk_store::ChunkStore;
pub use chunker::{Chunker, ChunkerImpl};
pub use chunker::{Chunker, ChunkerImpl, PayloadChunker};
pub use crypt_reader::CryptReader;
pub use crypt_writer::CryptWriter;
pub use data_blob::DataBlob;