client: pxar: switch to stack based encoder state

... and adapt to the new reader/writer variant for encoder or
decoder/accessor to attach a dedicated payload input/output for split
pxar archives.

In preparation for look-ahead caching, where a passing around of
per-directory level encoder instances with internal references is
not feasible.

Previously, for each directory level a new encoder instance has been
generated, restricting possible implementation errors. These encoder
instances have been internally linked by references to keep track of
the state changes in a parent child relationship.

This is however not feasible when the encoder has to be passed by
mutable reference, as required by the look-ahead cache
implementation. The encoder has therefore been adapted to use a
single instance implementation with an internal stack keeping track
of the state.

Depends on the bumped pxar library version, including the patches to
attach the corresponding variant for the pxar reader/writer
instantiation.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
This commit is contained in:
Christian Ebner 2024-02-20 17:07:08 +01:00 committed by Fabian Grünbichler
parent 4940514b0f
commit e2784a594e
11 changed files with 25 additions and 17 deletions

View File

@ -170,7 +170,7 @@ where
set.insert(stat.st_dev);
}
let mut encoder = Encoder::new(&mut writer, &metadata).await?;
let mut encoder = Encoder::new(pxar::PxarVariant::Unified(&mut writer), &metadata).await?;
let mut patterns = options.patterns;
@ -203,6 +203,8 @@ where
.archive_dir_contents(&mut encoder, source_dir, true)
.await?;
encoder.finish().await?;
encoder.close().await?;
Ok(())
}
@ -663,7 +665,7 @@ impl Archiver {
) -> Result<(), Error> {
let dir_name = OsStr::from_bytes(dir_name.to_bytes());
let mut encoder = encoder.create_directory(dir_name, metadata).await?;
encoder.create_directory(dir_name, metadata).await?;
let old_fs_magic = self.fs_magic;
let old_fs_feature_flags = self.fs_feature_flags;
@ -686,7 +688,7 @@ impl Archiver {
log::info!("skipping mount point: {:?}", self.path);
Ok(())
} else {
self.archive_dir_contents(&mut encoder, dir, false).await
self.archive_dir_contents(encoder, dir, false).await
};
self.fs_magic = old_fs_magic;

View File

@ -66,7 +66,7 @@ impl Session {
let file = std::fs::File::open(archive_path)?;
let file_size = file.metadata()?.len();
let reader: Reader = Arc::new(accessor::sync::FileReader::new(file));
let accessor = Accessor::new(reader, file_size).await?;
let accessor = Accessor::new(pxar::PxarVariant::Unified(reader), file_size).await?;
Self::mount(accessor, options, verbose, mountpoint)
}

View File

@ -220,7 +220,8 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
let reader = BufferedDynamicReader::new(index, chunk_reader);
let archive_size = reader.archive_size();
let reader: pbs_pxar_fuse::Reader = Arc::new(BufferedDynamicReadAt::new(reader));
let decoder = pbs_pxar_fuse::Accessor::new(reader, archive_size).await?;
let decoder =
pbs_pxar_fuse::Accessor::new(pxar::PxarVariant::Unified(reader), archive_size).await?;
client.download(CATALOG_NAME, &mut tmpfile).await?;
let index = DynamicIndexReader::new(tmpfile)

View File

@ -1458,7 +1458,7 @@ async fn restore(
if let Some(target) = target {
pbs_client::pxar::extract_archive(
pxar::decoder::Decoder::from_std(reader)?,
pxar::decoder::Decoder::from_std(pxar::PxarVariant::Unified(reader))?,
Path::new(target),
feature_flags,
|path| {

View File

@ -296,7 +296,8 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
let reader = BufferedDynamicReader::new(index, chunk_reader);
let archive_size = reader.archive_size();
let reader: pbs_pxar_fuse::Reader = Arc::new(BufferedDynamicReadAt::new(reader));
let decoder = pbs_pxar_fuse::Accessor::new(reader, archive_size).await?;
let decoder =
pbs_pxar_fuse::Accessor::new(pxar::PxarVariant::Unified(reader), archive_size).await?;
let session =
pbs_pxar_fuse::Session::mount(decoder, options, false, Path::new(target.unwrap()))

View File

@ -457,7 +457,7 @@ async fn extract(
let archive_size = reader.archive_size();
let reader = LocalDynamicReadAt::new(reader);
let decoder = Accessor::new(reader, archive_size).await?;
let decoder = Accessor::new(pxar::PxarVariant::Unified(reader), archive_size).await?;
extract_to_target(decoder, &path, target, format, zstd).await?;
}
ExtractPath::VM(file, path) => {
@ -483,7 +483,7 @@ async fn extract(
false,
)
.await?;
let decoder = Decoder::from_tokio(reader).await?;
let decoder = Decoder::from_tokio(pxar::PxarVariant::Unified(reader)).await?;
extract_sub_dir_seq(&target, decoder).await?;
// we extracted a .pxarexclude-cli file auto-generated by the VM when encoding the

View File

@ -26,7 +26,7 @@ fn extract_archive_from_reader<R: std::io::Read>(
options: PxarExtractOptions,
) -> Result<(), Error> {
pbs_client::pxar::extract_archive(
pxar::decoder::Decoder::from_std(reader)?,
pxar::decoder::Decoder::from_std(pxar::PxarVariant::Unified(reader))?,
Path::new(target),
feature_flags,
|path| {
@ -436,7 +436,7 @@ async fn mount_archive(archive: String, mountpoint: String, verbose: bool) -> Re
)]
/// List the contents of an archive.
fn dump_archive(archive: String) -> Result<(), Error> {
for entry in pxar::decoder::Decoder::open(archive)? {
for entry in pxar::decoder::Decoder::open(pxar::PxarVariant::Unified(archive))? {
let entry = entry?;
if log::log_enabled!(log::Level::Debug) {

View File

@ -1813,7 +1813,7 @@ pub fn pxar_file_download(
let (reader, archive_size) =
get_local_pxar_reader(datastore.clone(), &manifest, &backup_dir, pxar_name)?;
let decoder = Accessor::new(reader, archive_size).await?;
let decoder = Accessor::new(pxar::PxarVariant::Unified(reader), archive_size).await?;
let root = decoder.open_root().await?;
let path = OsStr::from_bytes(file_path).to_os_string();
let file = root

View File

@ -1069,7 +1069,8 @@ fn restore_snapshots_to_tmpdir(
"File {file_num}: snapshot archive {source_datastore}:{snapshot}",
);
let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
let mut decoder =
pxar::decoder::sync::Decoder::from_std(pxar::PxarVariant::Unified(reader))?;
let target_datastore = match store_map.target_store(&source_datastore) {
Some(datastore) => datastore,
@ -1685,7 +1686,7 @@ fn restore_snapshot_archive<'a>(
reader: Box<dyn 'a + TapeRead>,
snapshot_path: &Path,
) -> Result<bool, Error> {
let mut decoder = pxar::decoder::sync::Decoder::from_std(reader)?;
let mut decoder = pxar::decoder::sync::Decoder::from_std(pxar::PxarVariant::Unified(reader))?;
match try_restore_snapshot_archive(worker, &mut decoder, snapshot_path) {
Ok(_) => Ok(true),
Err(err) => {

View File

@ -277,7 +277,7 @@ async fn open_dynamic_index(
let reader = BufferedDynamicReader::new(index, chunk_reader);
let archive_size = reader.archive_size();
let reader: Arc<dyn ReadAt + Send + Sync> = Arc::new(LocalDynamicReadAt::new(reader));
let accessor = Accessor::new(reader, archive_size).await?;
let accessor = Accessor::new(pxar::PxarVariant::Unified(reader), archive_size).await?;
Ok((lookup_index, accessor))
}

View File

@ -58,8 +58,10 @@ pub fn tape_write_snapshot_archive<'a>(
));
}
let mut encoder =
pxar::encoder::sync::Encoder::new(PxarTapeWriter::new(writer), &root_metadata)?;
let mut encoder = pxar::encoder::sync::Encoder::new(
pxar::PxarVariant::Unified(PxarTapeWriter::new(writer)),
&root_metadata,
)?;
for filename in file_list.iter() {
let mut file = snapshot_reader.open_file(filename).map_err(|err| {
@ -89,6 +91,7 @@ pub fn tape_write_snapshot_archive<'a>(
}
}
encoder.finish()?;
encoder.close()?;
Ok(())
});