7de35dc243
To reuse dynamic entries of a previous backup run and index them for the new snapshot. Adds a non-blocking channel between the pxar archiver and the chunk stream, as well as the chunk stream and the backup writer. The archiver sends forced boundary positions and the dynamic entries to inject into the chunk stream following this boundary. The chunk stream consumes this channel inputs as receiver whenever a new chunk is requested by the upload stream, forcing a non-regular chunk boundary in the pxar stream at the requested positions. The dynamic entries to inject and the boundary are then send via the second asynchronous channel to the backup writer's upload stream, indexing them by inserting the dynamic entries as known chunks into the upload stream. Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
62 lines
1.6 KiB
Rust
62 lines
1.6 KiB
Rust
use anyhow::Error;
|
|
use futures::*;
|
|
|
|
extern crate proxmox_backup;
|
|
|
|
use pbs_client::ChunkStream;
|
|
|
|
// Test Chunker with real data read from a file.
|
|
//
|
|
// To generate some test input use:
|
|
// # dd if=/dev/urandom of=random-test.dat bs=1M count=1024 iflag=fullblock
|
|
//
|
|
// Note: I can currently get about 830MB/s
|
|
|
|
fn main() {
|
|
if let Err(err) = proxmox_async::runtime::main(run()) {
|
|
panic!("ERROR: {}", err);
|
|
}
|
|
}
|
|
|
|
async fn run() -> Result<(), Error> {
|
|
let file = tokio::fs::File::open("random-test.dat").await?;
|
|
|
|
let stream = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
|
|
.map_ok(|bytes| bytes.to_vec())
|
|
.map_err(Error::from);
|
|
|
|
//let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
|
|
let mut chunk_stream = ChunkStream::new(stream, None, None);
|
|
|
|
let start_time = std::time::Instant::now();
|
|
|
|
let mut repeat = 0;
|
|
let mut stream_len = 0;
|
|
while let Some(chunk) = chunk_stream.try_next().await? {
|
|
if chunk.len() > 16 * 1024 * 1024 {
|
|
panic!("Chunk too large {}", chunk.len());
|
|
}
|
|
|
|
repeat += 1;
|
|
stream_len += chunk.len();
|
|
|
|
println!("Got chunk {}", chunk.len());
|
|
}
|
|
|
|
let speed =
|
|
((stream_len * 1_000_000) / (1024 * 1024)) / (start_time.elapsed().as_micros() as usize);
|
|
println!(
|
|
"Uploaded {} chunks in {} seconds ({} MB/s).",
|
|
repeat,
|
|
start_time.elapsed().as_secs(),
|
|
speed
|
|
);
|
|
println!("Average chunk size was {} bytes.", stream_len / repeat);
|
|
println!(
|
|
"time per request: {} microseconds.",
|
|
(start_time.elapsed().as_micros()) / (repeat as u128)
|
|
);
|
|
|
|
Ok(())
|
|
}
|