Mirror of git://git.proxmox.com/git/proxmox-backup.git
client: chunk stream: switch payload stream chunker
Use the dedicated chunker with boundary suggestions for the payload stream, by attaching the channel sender to the archiver and the channel receiver to the payload stream chunker. The archiver sends the file boundaries for the chunker to consume.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
commit 5a5d454083
parent 589f510e7d
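For orientation before the diff: the wiring added here amounts to a std::sync::mpsc channel carrying u64 payload offsets, with the archiver on the sending end and the payload chunker on the receiving end. The minimal sketch below illustrates that producer/consumer pattern in isolation; SuggestedBoundaryChunker, its scan() method, the tiny chunk size and the example offsets are illustrative stand-ins, not the actual PayloadChunker API from pbs_datastore.

use std::sync::mpsc;

// Illustrative stand-in for a chunker that consumes boundary suggestions.
// It cuts at the furthest suggested offset already reached by the stream,
// provided the resulting chunk is at least `chunk_size` bytes long.
struct SuggestedBoundaryChunker {
    chunk_size: u64,
    boundaries: mpsc::Receiver<u64>,
    pending: Vec<u64>,
    last_cut: u64,
}

impl SuggestedBoundaryChunker {
    fn new(chunk_size: u64, boundaries: mpsc::Receiver<u64>) -> Self {
        Self { chunk_size, boundaries, pending: Vec::new(), last_cut: 0 }
    }

    // Called whenever more payload data has been buffered; `stream_pos` is
    // the absolute number of bytes seen so far.
    fn scan(&mut self, stream_pos: u64) -> Option<u64> {
        // Drain whatever suggestions the producer has sent in the meantime.
        while let Ok(offset) = self.boundaries.try_recv() {
            self.pending.push(offset);
        }
        // Prefer the furthest suggested boundary that is already buffered
        // and far enough away from the previous cut.
        let cut = self
            .pending
            .iter()
            .copied()
            .filter(|&b| b <= stream_pos && b.saturating_sub(self.last_cut) >= self.chunk_size)
            .max()?;
        self.pending.retain(|&b| b > cut);
        self.last_cut = cut;
        Some(cut)
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<u64>();
    let mut chunker = SuggestedBoundaryChunker::new(4, rx);

    // Producer side (the archiver in the real code): report the payload
    // offset at which each file's data starts.
    for file_start in [3u64, 7, 12] {
        tx.send(file_start).unwrap();
    }

    // Consumer side (the payload chunk stream): feed buffered positions
    // and emit a chunk whenever a suggested boundary can be honored.
    for pos in [5u64, 10, 15] {
        if let Some(cut) = chunker.scan(pos) {
            println!("cut chunk at offset {cut} (stream position {pos})");
        }
    }
}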
@@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
         .map_err(Error::from);
 
     //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
-    let mut chunk_stream = ChunkStream::new(stream, None, None);
+    let mut chunk_stream = ChunkStream::new(stream, None, None, None);
 
     let start_time = std::time::Instant::now();
 
@@ -7,7 +7,7 @@ use bytes::BytesMut;
 use futures::ready;
 use futures::stream::{Stream, TryStream};
 
-use pbs_datastore::{Chunker, ChunkerImpl};
+use pbs_datastore::{Chunker, ChunkerImpl, PayloadChunker};
 
 use crate::inject_reused_chunks::InjectChunks;
 
@@ -42,11 +42,20 @@ pub struct ChunkStream<S: Unpin> {
 }
 
 impl<S: Unpin> ChunkStream<S> {
-    pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
+    pub fn new(
+        input: S,
+        chunk_size: Option<usize>,
+        injection_data: Option<InjectionData>,
+        suggested_boundaries: Option<mpsc::Receiver<u64>>,
+    ) -> Self {
         let chunk_size = chunk_size.unwrap_or(4 * 1024 * 1024);
         Self {
             input,
-            chunker: Box::new(ChunkerImpl::new(chunk_size)),
+            chunker: if let Some(suggested) = suggested_boundaries {
+                Box::new(PayloadChunker::new(chunk_size, suggested))
+            } else {
+                Box::new(ChunkerImpl::new(chunk_size))
+            },
             buffer: BytesMut::new(),
             scan_pos: 0,
             consumed: 0,
@@ -169,6 +169,7 @@ struct Archiver {
     file_copy_buffer: Vec<u8>,
     skip_e2big_xattr: bool,
     forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
+    suggested_boundaries: Option<mpsc::Sender<u64>>,
     previous_payload_index: Option<DynamicIndexReader>,
     cache: PxarLookaheadCache,
     reuse_stats: ReuseStats,
@@ -197,6 +198,7 @@ pub async fn create_archive<T, F>(
     callback: F,
     options: PxarCreateOptions,
     forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
+    suggested_boundaries: Option<mpsc::Sender<u64>>,
 ) -> Result<(), Error>
 where
     T: SeqWrite + Send,
@@ -271,6 +273,7 @@ where
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
         skip_e2big_xattr: options.skip_e2big_xattr,
         forced_boundaries,
+        suggested_boundaries,
         previous_payload_index,
         cache: PxarLookaheadCache::new(None),
         reuse_stats: ReuseStats::default(),
@@ -862,6 +865,11 @@ impl Archiver {
                 .add_file(c_file_name, file_size, stat.st_mtime)?;
         }
 
+        if let Some(sender) = self.suggested_boundaries.as_mut() {
+            let offset = encoder.payload_position()?.raw();
+            sender.send(offset)?;
+        }
+
         let offset: LinkOffset = if let Some(payload_offset) = payload_offset {
             self.reuse_stats.total_reused_payload_size +=
                 file_size + size_of::<pxar::format::Header>() as u64;
@@ -27,6 +27,7 @@ use crate::pxar::create::PxarWriters;
 /// consumer.
 pub struct PxarBackupStream {
     rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
+    pub suggested_boundaries: Option<std::sync::mpsc::Receiver<u64>>,
     handle: Option<AbortHandle>,
     error: Arc<Mutex<Option<String>>>,
 }
@@ -55,22 +56,26 @@ impl PxarBackupStream {
         ));
         let writer = pxar::encoder::sync::StandardWriter::new(writer);
 
-        let (writer, payload_rx) = if separate_payload_stream {
-            let (tx, rx) = std::sync::mpsc::sync_channel(10);
-            let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
-                buffer_size,
-                StdChannelWriter::new(tx),
-            ));
-            (
-                pxar::PxarVariant::Split(
-                    writer,
-                    pxar::encoder::sync::StandardWriter::new(payload_writer),
-                ),
-                Some(rx),
-            )
-        } else {
-            (pxar::PxarVariant::Unified(writer), None)
-        };
+        let (writer, payload_rx, suggested_boundaries_tx, suggested_boundaries_rx) =
+            if separate_payload_stream {
+                let (tx, rx) = std::sync::mpsc::sync_channel(10);
+                let (suggested_boundaries_tx, suggested_boundaries_rx) = std::sync::mpsc::channel();
+                let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
+                    buffer_size,
+                    StdChannelWriter::new(tx),
+                ));
+                (
+                    pxar::PxarVariant::Split(
+                        writer,
+                        pxar::encoder::sync::StandardWriter::new(payload_writer),
+                    ),
+                    Some(rx),
+                    Some(suggested_boundaries_tx),
+                    Some(suggested_boundaries_rx),
+                )
+            } else {
+                (pxar::PxarVariant::Unified(writer), None, None, None)
+            };
 
         let error = Arc::new(Mutex::new(None));
         let error2 = Arc::clone(&error);
@@ -85,6 +90,7 @@ impl PxarBackupStream {
                 },
                 options,
                 boundaries,
+                suggested_boundaries_tx,
             )
             .await
             {
@@ -99,12 +105,14 @@ impl PxarBackupStream {
 
         let backup_stream = Self {
             rx: Some(rx),
+            suggested_boundaries: None,
             handle: Some(handle.clone()),
             error: Arc::clone(&error),
         };
 
         let backup_payload_stream = payload_rx.map(|rx| Self {
             rx: Some(rx),
+            suggested_boundaries: suggested_boundaries_rx,
             handle: Some(handle),
             error,
         });
@@ -209,7 +209,7 @@ async fn backup_directory<P: AsRef<Path>>(
         payload_target.is_some(),
     )?;
 
-    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None);
+    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None, None);
     let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
 
     let stream = ReceiverStream::new(rx).map_err(Error::from);
@@ -223,14 +223,19 @@ async fn backup_directory<P: AsRef<Path>>(
 
     let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
 
-    if let Some(payload_stream) = payload_stream {
+    if let Some(mut payload_stream) = payload_stream {
         let payload_target = payload_target
             .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
 
         let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel();
         let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx);
-        let mut payload_chunk_stream =
-            ChunkStream::new(payload_stream, chunk_size, Some(injection_data));
+        let suggested_boundaries = payload_stream.suggested_boundaries.take();
+        let mut payload_chunk_stream = ChunkStream::new(
+            payload_stream,
+            chunk_size,
+            Some(injection_data),
+            suggested_boundaries,
+        );
         let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
         let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
 
@@ -573,7 +578,8 @@ fn spawn_catalog_upload(
     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
     let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
     let catalog_chunk_size = 512 * 1024;
-    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None);
+    let catalog_chunk_stream =
+        ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None, None);
 
     let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
         StdChannelWriter::new(catalog_tx),
@@ -364,8 +364,16 @@ fn extract(
         };
 
         let pxar_writer = pxar::PxarVariant::Unified(TokioWriter::new(writer));
-        create_archive(dir, PxarWriters::new(pxar_writer, None), Flags::DEFAULT, |_| Ok(()), options, None)
-            .await
+        create_archive(
+            dir,
+            PxarWriters::new(pxar_writer, None),
+            Flags::DEFAULT,
+            |_| Ok(()),
+            options,
+            None,
+            None,
+        )
+        .await
     }
     .await;
     if let Err(err) = result {
@@ -442,6 +442,7 @@ async fn create_archive(
         },
         options,
         None,
+        None,
     )
     .await?;
 
@@ -40,6 +40,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
         |_| Ok(()),
         options,
         None,
+        None,
     ))?;
 
     Command::new("cmp")