client: implement prepare reference method
Implement a method that prepares the decoder instance to access a previous snapshot's metadata index and payload index in order to pass them to the pxar archiver. The archiver can then utilize these to compare the metadata for files to the previous state and gather reusable chunks. Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
This commit is contained in:
parent
7c00ec904d
commit
fdea4e5327
@ -18,6 +18,8 @@ use nix::sys::stat::{FileStat, Mode};
|
||||
|
||||
use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
|
||||
use proxmox_sys::error::SysError;
|
||||
use pxar::accessor::aio::{Accessor, Directory};
|
||||
use pxar::accessor::ReadAt;
|
||||
use pxar::encoder::{LinkOffset, SeqWrite};
|
||||
use pxar::{Metadata, PxarVariant};
|
||||
|
||||
@ -35,7 +37,7 @@ use crate::pxar::tools::assert_single_path_component;
|
||||
use crate::pxar::Flags;
|
||||
|
||||
/// Pxar options for creating a pxar archive/stream
|
||||
#[derive(Default, Clone)]
|
||||
#[derive(Default)]
|
||||
pub struct PxarCreateOptions {
|
||||
/// Device/mountpoint st_dev numbers that should be included. None for no limitation.
|
||||
pub device_set: Option<HashSet<u64>>,
|
||||
@ -47,6 +49,20 @@ pub struct PxarCreateOptions {
|
||||
pub skip_lost_and_found: bool,
|
||||
/// Skip xattrs of files that return E2BIG error
|
||||
pub skip_e2big_xattr: bool,
|
||||
/// Reference state for partial backups
|
||||
pub previous_ref: Option<PxarPrevRef>,
|
||||
}
|
||||
|
||||
pub type MetadataArchiveReader = Arc<dyn ReadAt + Send + Sync + 'static>;
|
||||
|
||||
/// Statefull information of previous backups snapshots for partial backups
|
||||
pub struct PxarPrevRef {
|
||||
/// Reference accessor for metadata comparison
|
||||
pub accessor: Accessor<MetadataArchiveReader>,
|
||||
/// Reference index for reusing payload chunks
|
||||
pub payload_index: DynamicIndexReader,
|
||||
/// Reference archive name for partial backups
|
||||
pub archive_name: String,
|
||||
}
|
||||
|
||||
fn detect_fs_type(fd: RawFd) -> Result<i64, Error> {
|
||||
@ -136,6 +152,7 @@ struct Archiver {
|
||||
file_copy_buffer: Vec<u8>,
|
||||
skip_e2big_xattr: bool,
|
||||
forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
|
||||
previous_payload_index: Option<DynamicIndexReader>,
|
||||
}
|
||||
|
||||
type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
|
||||
@ -200,6 +217,15 @@ where
|
||||
MatchType::Exclude,
|
||||
)?);
|
||||
}
|
||||
let (previous_payload_index, previous_metadata_accessor) =
|
||||
if let Some(refs) = options.previous_ref {
|
||||
(
|
||||
Some(refs.payload_index),
|
||||
refs.accessor.open_root().await.ok(),
|
||||
)
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let mut archiver = Archiver {
|
||||
feature_flags,
|
||||
@ -217,10 +243,11 @@ where
|
||||
file_copy_buffer: vec::undefined(4 * 1024 * 1024),
|
||||
skip_e2big_xattr: options.skip_e2big_xattr,
|
||||
forced_boundaries,
|
||||
previous_payload_index,
|
||||
};
|
||||
|
||||
archiver
|
||||
.archive_dir_contents(&mut encoder, source_dir, true)
|
||||
.archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true)
|
||||
.await?;
|
||||
encoder.finish().await?;
|
||||
encoder.close().await?;
|
||||
@ -252,6 +279,7 @@ impl Archiver {
|
||||
fn archive_dir_contents<'a, T: SeqWrite + Send>(
|
||||
&'a mut self,
|
||||
encoder: &'a mut Encoder<'_, T>,
|
||||
mut previous_metadata_accessor: Option<Directory<MetadataArchiveReader>>,
|
||||
mut dir: Dir,
|
||||
is_root: bool,
|
||||
) -> BoxFuture<'a, Result<(), Error>> {
|
||||
@ -286,9 +314,15 @@ impl Archiver {
|
||||
|
||||
(self.callback)(&file_entry.path)?;
|
||||
self.path = file_entry.path;
|
||||
self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
|
||||
.await
|
||||
.map_err(|err| self.wrap_err(err))?;
|
||||
self.add_entry(
|
||||
encoder,
|
||||
&mut previous_metadata_accessor,
|
||||
dir_fd,
|
||||
&file_entry.name,
|
||||
&file_entry.stat,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| self.wrap_err(err))?;
|
||||
}
|
||||
self.path = old_path;
|
||||
self.entry_counter = entry_counter;
|
||||
@ -536,6 +570,7 @@ impl Archiver {
|
||||
async fn add_entry<T: SeqWrite + Send>(
|
||||
&mut self,
|
||||
encoder: &mut Encoder<'_, T>,
|
||||
previous_metadata: &mut Option<Directory<MetadataArchiveReader>>,
|
||||
parent: RawFd,
|
||||
c_file_name: &CStr,
|
||||
stat: &FileStat,
|
||||
@ -625,7 +660,14 @@ impl Archiver {
|
||||
catalog.lock().unwrap().start_directory(c_file_name)?;
|
||||
}
|
||||
let result = self
|
||||
.add_directory(encoder, dir, c_file_name, &metadata, stat)
|
||||
.add_directory(
|
||||
encoder,
|
||||
previous_metadata,
|
||||
dir,
|
||||
c_file_name,
|
||||
&metadata,
|
||||
stat,
|
||||
)
|
||||
.await;
|
||||
if let Some(ref catalog) = self.catalog {
|
||||
catalog.lock().unwrap().end_directory()?;
|
||||
@ -678,6 +720,7 @@ impl Archiver {
|
||||
async fn add_directory<T: SeqWrite + Send>(
|
||||
&mut self,
|
||||
encoder: &mut Encoder<'_, T>,
|
||||
previous_metadata_accessor: &mut Option<Directory<MetadataArchiveReader>>,
|
||||
dir: Dir,
|
||||
dir_name: &CStr,
|
||||
metadata: &Metadata,
|
||||
@ -708,7 +751,17 @@ impl Archiver {
|
||||
log::info!("skipping mount point: {:?}", self.path);
|
||||
Ok(())
|
||||
} else {
|
||||
self.archive_dir_contents(encoder, dir, false).await
|
||||
let mut dir_accessor = None;
|
||||
if let Some(accessor) = previous_metadata_accessor.as_mut() {
|
||||
if let Some(file_entry) = accessor.lookup(dir_name).await? {
|
||||
if file_entry.entry().is_dir() {
|
||||
let dir = file_entry.enter_directory().await?;
|
||||
dir_accessor = Some(dir);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.archive_dir_contents(encoder, dir_accessor, dir, false)
|
||||
.await
|
||||
};
|
||||
|
||||
self.fs_magic = old_fs_magic;
|
||||
|
@ -56,7 +56,9 @@ pub(crate) mod tools;
|
||||
mod flags;
|
||||
pub use flags::Flags;
|
||||
|
||||
pub use create::{create_archive, PxarCreateOptions, PxarWriters};
|
||||
pub use create::{
|
||||
create_archive, MetadataArchiveReader, PxarCreateOptions, PxarPrevRef, PxarWriters,
|
||||
};
|
||||
pub use extract::{
|
||||
create_tar, create_zip, extract_archive, extract_sub_dir, extract_sub_dir_seq, ErrorHandler,
|
||||
OverwriteFlags, PxarExtractContext, PxarExtractOptions,
|
||||
|
@ -21,6 +21,7 @@ use proxmox_router::{cli::*, ApiMethod, RpcEnvironment};
|
||||
use proxmox_schema::api;
|
||||
use proxmox_sys::fs::{file_get_json, image_size, replace_file, CreateOptions};
|
||||
use proxmox_time::{epoch_i64, strftime_local};
|
||||
use pxar::accessor::aio::Accessor;
|
||||
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
|
||||
|
||||
use pbs_api_types::{
|
||||
@ -30,7 +31,7 @@ use pbs_api_types::{
|
||||
BACKUP_TYPE_SCHEMA, TRAFFIC_CONTROL_BURST_SCHEMA, TRAFFIC_CONTROL_RATE_SCHEMA,
|
||||
};
|
||||
use pbs_client::catalog_shell::Shell;
|
||||
use pbs_client::pxar::ErrorHandler as PxarErrorHandler;
|
||||
use pbs_client::pxar::{ErrorHandler as PxarErrorHandler, MetadataArchiveReader, PxarPrevRef};
|
||||
use pbs_client::tools::{
|
||||
complete_archive_name, complete_auth_id, complete_backup_group, complete_backup_snapshot,
|
||||
complete_backup_source, complete_chunk_size, complete_group_or_snapshot,
|
||||
@ -43,14 +44,14 @@ use pbs_client::tools::{
|
||||
CHUNK_SIZE_SCHEMA, REPO_URL_SCHEMA,
|
||||
};
|
||||
use pbs_client::{
|
||||
delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
|
||||
BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
|
||||
FixedChunkStream, HttpClient, InjectionData, PxarBackupStream, RemoteChunkReader,
|
||||
delete_ticket_info, parse_backup_specification, view_task_result, BackupDetectionMode,
|
||||
BackupReader, BackupRepository, BackupSpecificationType, BackupStats, BackupWriter,
|
||||
ChunkStream, FixedChunkStream, HttpClient, InjectionData, PxarBackupStream, RemoteChunkReader,
|
||||
UploadOptions, BACKUP_SOURCE_SCHEMA,
|
||||
};
|
||||
use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
|
||||
use pbs_datastore::chunk_store::verify_chunk_size;
|
||||
use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader};
|
||||
use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader, LocalDynamicReadAt};
|
||||
use pbs_datastore::fixed_index::FixedIndexReader;
|
||||
use pbs_datastore::index::IndexFile;
|
||||
use pbs_datastore::manifest::{
|
||||
@ -687,6 +688,10 @@ fn spawn_catalog_upload(
|
||||
schema: TRAFFIC_CONTROL_BURST_SCHEMA,
|
||||
optional: true,
|
||||
},
|
||||
"change-detection-mode": {
|
||||
type: BackupDetectionMode,
|
||||
optional: true,
|
||||
},
|
||||
"exclude": {
|
||||
type: Array,
|
||||
description: "List of paths or patterns for matching files to exclude.",
|
||||
@ -722,6 +727,7 @@ async fn create_backup(
|
||||
param: Value,
|
||||
all_file_systems: bool,
|
||||
skip_lost_and_found: bool,
|
||||
change_detection_mode: Option<BackupDetectionMode>,
|
||||
dry_run: bool,
|
||||
skip_e2big_xattr: bool,
|
||||
_info: &ApiMethod,
|
||||
@ -881,6 +887,8 @@ async fn create_backup(
|
||||
|
||||
let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
|
||||
|
||||
let detection_mode = change_detection_mode.unwrap_or_default();
|
||||
|
||||
let http_client = connect_rate_limited(&repo, rate_limit)?;
|
||||
record_repository(&repo);
|
||||
|
||||
@ -981,7 +989,7 @@ async fn create_backup(
|
||||
None
|
||||
};
|
||||
|
||||
let mut manifest = BackupManifest::new(snapshot);
|
||||
let mut manifest = BackupManifest::new(snapshot.clone());
|
||||
|
||||
let mut catalog = None;
|
||||
let mut catalog_result_rx = None;
|
||||
@ -1028,22 +1036,21 @@ async fn create_backup(
|
||||
manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
|
||||
}
|
||||
(BackupSpecificationType::PXAR, false) => {
|
||||
let metadata_mode = false; // Until enabled via param
|
||||
|
||||
let target_base = if let Some(base) = target_base.strip_suffix(".pxar") {
|
||||
base.to_string()
|
||||
} else {
|
||||
bail!("unexpected suffix in target: {target_base}");
|
||||
};
|
||||
|
||||
let (target, payload_target) = if metadata_mode {
|
||||
(
|
||||
format!("{target_base}.mpxar.{extension}"),
|
||||
Some(format!("{target_base}.ppxar.{extension}")),
|
||||
)
|
||||
} else {
|
||||
(target, None)
|
||||
};
|
||||
let (target, payload_target) =
|
||||
if detection_mode.is_metadata() || detection_mode.is_data() {
|
||||
(
|
||||
format!("{target_base}.mpxar.{extension}"),
|
||||
Some(format!("{target_base}.ppxar.{extension}")),
|
||||
)
|
||||
} else {
|
||||
(target, None)
|
||||
};
|
||||
|
||||
// start catalog upload on first use
|
||||
if catalog.is_none() {
|
||||
@ -1060,12 +1067,42 @@ async fn create_backup(
|
||||
.unwrap()
|
||||
.start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
|
||||
|
||||
let mut previous_ref = None;
|
||||
if detection_mode.is_metadata() {
|
||||
if let Some(ref manifest) = previous_manifest {
|
||||
// BackupWriter::start created a new snapshot, get the one before
|
||||
if let Some(backup_time) = client.previous_backup_time().await? {
|
||||
let backup_dir: BackupDir =
|
||||
(snapshot.group.clone(), backup_time).into();
|
||||
let backup_reader = BackupReader::start(
|
||||
&http_client,
|
||||
crypt_config.clone(),
|
||||
repo.store(),
|
||||
&backup_ns,
|
||||
&backup_dir,
|
||||
true,
|
||||
)
|
||||
.await?;
|
||||
previous_ref = prepare_reference(
|
||||
&target,
|
||||
manifest.clone(),
|
||||
&client,
|
||||
backup_reader.clone(),
|
||||
crypt_config.clone(),
|
||||
crypto.mode,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let pxar_options = pbs_client::pxar::PxarCreateOptions {
|
||||
device_set: devices.clone(),
|
||||
patterns: pattern_list.clone(),
|
||||
entries_max: entries_max as usize,
|
||||
skip_lost_and_found,
|
||||
skip_e2big_xattr,
|
||||
previous_ref,
|
||||
};
|
||||
|
||||
let upload_options = UploadOptions {
|
||||
@ -1177,6 +1214,72 @@ async fn create_backup(
|
||||
Ok(Value::Null)
|
||||
}
|
||||
|
||||
async fn prepare_reference(
|
||||
target: &str,
|
||||
manifest: Arc<BackupManifest>,
|
||||
backup_writer: &BackupWriter,
|
||||
backup_reader: Arc<BackupReader>,
|
||||
crypt_config: Option<Arc<CryptConfig>>,
|
||||
crypt_mode: CryptMode,
|
||||
) -> Result<Option<PxarPrevRef>, Error> {
|
||||
let (target, payload_target) =
|
||||
match pbs_client::tools::get_pxar_archive_names(target, &manifest) {
|
||||
Ok((target, payload_target)) => (target, payload_target),
|
||||
Err(_) => return Ok(None),
|
||||
};
|
||||
let payload_target = payload_target.unwrap_or_default();
|
||||
|
||||
let metadata_ref_index = if let Ok(index) = backup_reader
|
||||
.download_dynamic_index(&manifest, &target)
|
||||
.await
|
||||
{
|
||||
index
|
||||
} else {
|
||||
log::info!("No previous metadata index, continue without reference");
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let file_info = match manifest.lookup_file_info(&payload_target) {
|
||||
Ok(file_info) => file_info,
|
||||
Err(_) => {
|
||||
log::info!("No previous payload index found in manifest, continue without reference");
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
if file_info.crypt_mode != crypt_mode {
|
||||
log::info!("Crypt mode mismatch, continue without reference");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let known_payload_chunks = Arc::new(Mutex::new(HashSet::new()));
|
||||
let payload_ref_index = backup_writer
|
||||
.download_previous_dynamic_index(&payload_target, &manifest, known_payload_chunks)
|
||||
.await?;
|
||||
|
||||
log::info!("Using previous index as metadata reference for '{target}'");
|
||||
|
||||
let most_used = metadata_ref_index.find_most_used_chunks(8);
|
||||
let file_info = manifest.lookup_file_info(&target)?;
|
||||
let chunk_reader = RemoteChunkReader::new(
|
||||
backup_reader.clone(),
|
||||
crypt_config.clone(),
|
||||
file_info.chunk_crypt_mode(),
|
||||
most_used,
|
||||
);
|
||||
let reader = BufferedDynamicReader::new(metadata_ref_index, chunk_reader);
|
||||
let archive_size = reader.archive_size();
|
||||
let reader: MetadataArchiveReader = Arc::new(LocalDynamicReadAt::new(reader));
|
||||
// only care about the metadata, therefore do not attach payload reader
|
||||
let accessor = Accessor::new(pxar::PxarVariant::Unified(reader), archive_size).await?;
|
||||
|
||||
Ok(Some(pbs_client::pxar::PxarPrevRef {
|
||||
accessor,
|
||||
payload_index: payload_ref_index,
|
||||
archive_name: target,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn dump_image<W: Write>(
|
||||
client: Arc<BackupReader>,
|
||||
crypt_config: Option<Arc<CryptConfig>>,
|
||||
|
@ -360,6 +360,7 @@ fn extract(
|
||||
patterns,
|
||||
skip_lost_and_found: false,
|
||||
skip_e2big_xattr: false,
|
||||
previous_ref: None,
|
||||
};
|
||||
|
||||
let pxar_writer = pxar::PxarVariant::Unified(TokioWriter::new(writer));
|
||||
|
@ -362,6 +362,7 @@ async fn create_archive(
|
||||
patterns,
|
||||
skip_lost_and_found: false,
|
||||
skip_e2big_xattr: false,
|
||||
previous_ref: None,
|
||||
};
|
||||
|
||||
let source = PathBuf::from(source);
|
||||
|
Loading…
Reference in New Issue
Block a user