diff --git a/pbs-datastore/src/chunker.rs b/pbs-datastore/src/chunker.rs
index d75e63fa..d0543bca 100644
--- a/pbs-datastore/src/chunker.rs
+++ b/pbs-datastore/src/chunker.rs
@@ -1,3 +1,5 @@
+use std::sync::mpsc::Receiver;
+
 /// Note: window size 32 or 64, is faster because we can
 /// speedup modulo operations, but always computes hash 0
 /// for constant data streams .. 0,0,0,0,0,0
@@ -46,6 +48,19 @@ pub struct ChunkerImpl {
     window: [u8; CA_CHUNKER_WINDOW_SIZE],
 }
 
+/// Sliding window chunker (Buzhash) with boundary suggestions
+///
+/// Suggest to chunk at a given boundary instead of the regular chunk boundary for better alignment
+/// with file payload boundaries.
+pub struct PayloadChunker {
+    /// Hash based chunker used whenever no suggested boundary applies
+    chunker: ChunkerImpl,
+    /// Suggested boundary received from the channel, not yet used or discarded
+    current_suggested: Option<u64>,
+    /// Source of suggested boundaries, polled without blocking during scan
+    suggested_boundaries: Receiver<u64>,
+}
+
 const BUZHASH_TABLE: [u32; 256] = [
     0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68, 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801,
     0x7ebf5191, 0x841135c7, 0x65cc53b3, 0x280a597c, 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494,
@@ -221,6 +236,92 @@ impl Chunker for ChunkerImpl {
     }
 }
 
+impl PayloadChunker {
+    /// Create a new PayloadChunker instance, which produces an average
+    /// chunk size of `chunk_size_avg` (needs to be a power of two), if no
+    /// suggested boundaries are provided.
+    /// Use suggested boundaries instead, whenever the chunk size is within
+    /// the min - max range.
+    pub fn new(chunk_size_avg: usize, suggested_boundaries: Receiver<u64>) -> Self {
+        Self {
+            chunker: ChunkerImpl::new(chunk_size_avg),
+            current_suggested: None,
+            suggested_boundaries,
+        }
+    }
+}
+
+impl Chunker for PayloadChunker {
+    /// Scan `data` for the next chunk boundary.
+    ///
+    /// A pending suggested boundary is used if the resulting chunk size lies within the
+    /// min - max range of the inner chunker. Boundaries already passed or leading to too
+    /// small chunks are dropped, in all other cases the inner hash based scan decides.
+    ///
+    /// A chunk at a suggested boundary is returned relative to the start of `data`, after
+    /// resetting the inner chunker.
+    fn scan(&mut self, data: &[u8], ctx: &Context) -> usize {
+        assert!(ctx.total >= data.len() as u64);
+        let pos = ctx.total - data.len() as u64;
+
+        loop {
+            if let Some(boundary) = self.current_suggested {
+                if boundary < ctx.base + pos {
+                    log::debug!("Boundary {boundary} in past");
+                    // ignore boundaries located before the start of the given data buffer
+                    self.current_suggested = None;
+                    continue;
+                }
+
+                if boundary > ctx.base + ctx.total {
+                    log::debug!("Boundary {boundary} in future");
+                    // boundary in future, cannot decide yet, keep it for a later scan
+                    return self.chunker.scan(data, ctx);
+                }
+
+                let chunk_size = (boundary - ctx.base) as usize;
+                if chunk_size < self.chunker.chunk_size_min {
+                    log::debug!("Chunk size {chunk_size} below minimum chunk size");
+                    // chunk too small, ignore boundary
+                    self.current_suggested = None;
+                    continue;
+                }
+
+                if chunk_size <= self.chunker.chunk_size_max {
+                    self.current_suggested = None;
+                    // calculate boundary relative to start of given data buffer
+                    let len = chunk_size - pos as usize;
+                    if len == 0 {
+                        // passed this one, previous scan did not know about boundary just yet
+                        return self.chunker.scan(data, ctx);
+                    }
+                    self.chunker.reset();
+                    log::debug!(
+                        "Chunk at suggested boundary: {boundary}, chunk size: {chunk_size}"
+                    );
+                    return len;
+                }
+
+                log::debug!("Chunk {chunk_size} to big, regular scan");
+                // chunk too big, cannot decide yet
+                // scan for hash based chunk boundary instead
+                return self.chunker.scan(data, ctx);
+            }
+
+            if let Ok(boundary) = self.suggested_boundaries.try_recv() {
+                self.current_suggested = Some(boundary);
+            } else {
+                log::debug!("No suggested boundary, regular scan");
+                return self.chunker.scan(data, ctx);
+            }
+        }
+    }
+
+    fn reset(&mut self) {
+        self.chunker.reset();
+    }
+}
+
 #[test]
 fn test_chunker1() {
     let mut buffer = Vec::new();
diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
index 24429626..3e4aa34c 100644
--- a/pbs-datastore/src/lib.rs
+++ b/pbs-datastore/src/lib.rs
@@ -196,7 +196,7 @@ pub use backup_info::{BackupDir, BackupGroup, BackupInfo};
 pub use checksum_reader::ChecksumReader;
 pub use checksum_writer::ChecksumWriter;
 pub use chunk_store::ChunkStore;
-pub use chunker::{Chunker, ChunkerImpl};
+pub use chunker::{Chunker, ChunkerImpl, PayloadChunker};
 pub use crypt_reader::CryptReader;
 pub use crypt_writer::CryptWriter;
 pub use data_blob::DataBlob;