mirror of
git://git.proxmox.com/git/proxmox-backup.git
synced 2025-01-21 18:03:59 +03:00
fix #3786: api: add resync-corrupt option to sync jobs
This option allows us to "fix" corrupt snapshots (and/or their chunks) by pulling them from another remote. When traversing the remote snapshots, we check if it exists locally, and if it is, we check if the last verification of it failed. If the local snapshot is broken and the `resync-corrupt` option is turned on, we pull in the remote snapshot, overwriting the local one. This is very useful and has been requested a lot, as there is currently no way to "fix" corrupt chunks/snapshots even if the user has a healthy version of it on their offsite instance. Originally-by: Shannon Sterz <s.sterz@proxmox.com> Signed-off-by: Gabriel Goller <g.goller@proxmox.com> Reviewed-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
This commit is contained in:
parent
b5be65cf8a
commit
0974ddfa17
@ -536,6 +536,10 @@ impl SyncDirection {
|
||||
}
|
||||
}
|
||||
|
||||
pub const RESYNC_CORRUPT_SCHEMA: Schema =
|
||||
BooleanSchema::new("If the verification failed for a local snapshot, try to pull it again.")
|
||||
.schema();
|
||||
|
||||
#[api(
|
||||
properties: {
|
||||
id: {
|
||||
@ -590,6 +594,10 @@ impl SyncDirection {
|
||||
schema: TRANSFER_LAST_SCHEMA,
|
||||
optional: true,
|
||||
},
|
||||
"resync-corrupt": {
|
||||
schema: RESYNC_CORRUPT_SCHEMA,
|
||||
optional: true,
|
||||
}
|
||||
}
|
||||
)]
|
||||
#[derive(Serialize, Deserialize, Clone, Updater, PartialEq)]
|
||||
@ -623,6 +631,8 @@ pub struct SyncJobConfig {
|
||||
pub limit: RateLimitConfig,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub transfer_last: Option<usize>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub resync_corrupt: Option<bool>,
|
||||
}
|
||||
|
||||
impl SyncJobConfig {
|
||||
|
@ -471,6 +471,9 @@ pub fn update_sync_job(
|
||||
if let Some(transfer_last) = update.transfer_last {
|
||||
data.transfer_last = Some(transfer_last);
|
||||
}
|
||||
if let Some(resync_corrupt) = update.resync_corrupt {
|
||||
data.resync_corrupt = Some(resync_corrupt);
|
||||
}
|
||||
|
||||
if update.limit.rate_in.is_some() {
|
||||
data.limit.rate_in = update.limit.rate_in;
|
||||
@ -629,6 +632,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
|
||||
ns: None,
|
||||
owner: Some(write_auth_id.clone()),
|
||||
comment: None,
|
||||
resync_corrupt: None,
|
||||
remove_vanished: None,
|
||||
max_depth: None,
|
||||
group_filter: None,
|
||||
|
@ -10,7 +10,7 @@ use pbs_api_types::{
|
||||
Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
|
||||
GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
|
||||
PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
|
||||
TRANSFER_LAST_SCHEMA,
|
||||
RESYNC_CORRUPT_SCHEMA, TRANSFER_LAST_SCHEMA,
|
||||
};
|
||||
use pbs_config::CachedUserInfo;
|
||||
use proxmox_rest_server::WorkerTask;
|
||||
@ -87,6 +87,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
|
||||
sync_job.group_filter.clone(),
|
||||
sync_job.limit.clone(),
|
||||
sync_job.transfer_last,
|
||||
sync_job.resync_corrupt,
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -132,6 +133,10 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
|
||||
schema: TRANSFER_LAST_SCHEMA,
|
||||
optional: true,
|
||||
},
|
||||
"resync-corrupt": {
|
||||
schema: RESYNC_CORRUPT_SCHEMA,
|
||||
optional: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
access: {
|
||||
@ -156,6 +161,7 @@ async fn pull(
|
||||
group_filter: Option<Vec<GroupFilter>>,
|
||||
limit: RateLimitConfig,
|
||||
transfer_last: Option<usize>,
|
||||
resync_corrupt: Option<bool>,
|
||||
rpcenv: &mut dyn RpcEnvironment,
|
||||
) -> Result<String, Error> {
|
||||
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
|
||||
@ -193,6 +199,7 @@ async fn pull(
|
||||
group_filter,
|
||||
limit,
|
||||
transfer_last,
|
||||
resync_corrupt,
|
||||
)?;
|
||||
|
||||
// fixme: set to_stdout to false?
|
||||
|
@ -12,8 +12,9 @@ use tracing::info;
|
||||
|
||||
use pbs_api_types::{
|
||||
print_store_and_ns, ArchiveType, Authid, BackupArchiveName, BackupDir, BackupGroup,
|
||||
BackupNamespace, GroupFilter, Operation, RateLimitConfig, Remote, CLIENT_LOG_BLOB_NAME,
|
||||
MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
|
||||
BackupNamespace, GroupFilter, Operation, RateLimitConfig, Remote, VerifyState,
|
||||
CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT,
|
||||
PRIV_DATASTORE_BACKUP,
|
||||
};
|
||||
use pbs_client::BackupRepository;
|
||||
use pbs_config::CachedUserInfo;
|
||||
@ -54,6 +55,8 @@ pub(crate) struct PullParameters {
|
||||
group_filter: Vec<GroupFilter>,
|
||||
/// How many snapshots should be transferred at most (taking the newest N snapshots)
|
||||
transfer_last: Option<usize>,
|
||||
/// Whether to re-sync corrupted snapshots
|
||||
resync_corrupt: bool,
|
||||
}
|
||||
|
||||
impl PullParameters {
|
||||
@ -71,12 +74,14 @@ impl PullParameters {
|
||||
group_filter: Option<Vec<GroupFilter>>,
|
||||
limit: RateLimitConfig,
|
||||
transfer_last: Option<usize>,
|
||||
resync_corrupt: Option<bool>,
|
||||
) -> Result<Self, Error> {
|
||||
if let Some(max_depth) = max_depth {
|
||||
ns.check_max_depth(max_depth)?;
|
||||
remote_ns.check_max_depth(max_depth)?;
|
||||
};
|
||||
let remove_vanished = remove_vanished.unwrap_or(false);
|
||||
let resync_corrupt = resync_corrupt.unwrap_or(false);
|
||||
|
||||
let source: Arc<dyn SyncSource> = if let Some(remote) = remote {
|
||||
let (remote_config, _digest) = pbs_config::remote::config()?;
|
||||
@ -115,6 +120,7 @@ impl PullParameters {
|
||||
max_depth,
|
||||
group_filter,
|
||||
transfer_last,
|
||||
resync_corrupt,
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -322,7 +328,7 @@ async fn pull_single_archive<'a>(
|
||||
///
|
||||
/// Pulling a snapshot consists of the following steps:
|
||||
/// - (Re)download the manifest
|
||||
/// -- if it matches, only download log and treat snapshot as already synced
|
||||
/// -- if it matches and is not corrupt, only download log and treat snapshot as already synced
|
||||
/// - Iterate over referenced files
|
||||
/// -- if file already exists, verify contents
|
||||
/// -- if not, pull it from the remote
|
||||
@ -331,6 +337,7 @@ async fn pull_snapshot<'a>(
|
||||
reader: Arc<dyn SyncSourceReader + 'a>,
|
||||
snapshot: &'a pbs_datastore::BackupDir,
|
||||
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
|
||||
corrupt: bool,
|
||||
) -> Result<SyncStats, Error> {
|
||||
let mut sync_stats = SyncStats::default();
|
||||
let mut manifest_name = snapshot.full_path();
|
||||
@ -351,7 +358,7 @@ async fn pull_snapshot<'a>(
|
||||
return Ok(sync_stats);
|
||||
}
|
||||
|
||||
if manifest_name.exists() {
|
||||
if manifest_name.exists() && !corrupt {
|
||||
let manifest_blob = proxmox_lang::try_block!({
|
||||
let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
|
||||
format_err!("unable to open local manifest {manifest_name:?} - {err}")
|
||||
@ -380,7 +387,7 @@ async fn pull_snapshot<'a>(
|
||||
let mut path = snapshot.full_path();
|
||||
path.push(&item.filename);
|
||||
|
||||
if path.exists() {
|
||||
if !corrupt && path.exists() {
|
||||
let filename: BackupArchiveName = item.filename.as_str().try_into()?;
|
||||
match filename.archive_type() {
|
||||
ArchiveType::DynamicIndex => {
|
||||
@ -443,6 +450,7 @@ async fn pull_snapshot_from<'a>(
|
||||
reader: Arc<dyn SyncSourceReader + 'a>,
|
||||
snapshot: &'a pbs_datastore::BackupDir,
|
||||
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
|
||||
corrupt: bool,
|
||||
) -> Result<SyncStats, Error> {
|
||||
let (_path, is_new, _snap_lock) = snapshot
|
||||
.datastore()
|
||||
@ -451,7 +459,8 @@ async fn pull_snapshot_from<'a>(
|
||||
let sync_stats = if is_new {
|
||||
info!("sync snapshot {}", snapshot.dir());
|
||||
|
||||
match pull_snapshot(reader, snapshot, downloaded_chunks).await {
|
||||
// this snapshot is new, so it can never be corrupt
|
||||
match pull_snapshot(reader, snapshot, downloaded_chunks, false).await {
|
||||
Err(err) => {
|
||||
if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
|
||||
snapshot.backup_ns(),
|
||||
@ -467,9 +476,13 @@ async fn pull_snapshot_from<'a>(
|
||||
sync_stats
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if corrupt {
|
||||
info!("re-sync snapshot {} due to corruption", snapshot.dir());
|
||||
} else {
|
||||
info!("re-sync snapshot {}", snapshot.dir());
|
||||
pull_snapshot(reader, snapshot, downloaded_chunks).await?
|
||||
}
|
||||
pull_snapshot(reader, snapshot, downloaded_chunks, corrupt).await?
|
||||
};
|
||||
|
||||
Ok(sync_stats)
|
||||
@ -523,26 +536,52 @@ async fn pull_group(
|
||||
.last_successful_backup(&target_ns, group)?
|
||||
.unwrap_or(i64::MIN);
|
||||
|
||||
let list: Vec<BackupDir> = raw_list
|
||||
// Filter remote BackupDirs to include in pull
|
||||
// Also stores if the snapshot is corrupt (verification job failed)
|
||||
let list: Vec<(BackupDir, bool)> = raw_list
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.filter(|&(pos, ref dir)| {
|
||||
.filter_map(|(pos, dir)| {
|
||||
source_snapshots.insert(dir.time);
|
||||
// If resync_corrupt is set, check if the corresponding local snapshot failed to
|
||||
// verification
|
||||
if params.resync_corrupt {
|
||||
let local_dir = params
|
||||
.target
|
||||
.store
|
||||
.backup_dir(target_ns.clone(), dir.clone());
|
||||
if let Ok(local_dir) = local_dir {
|
||||
match local_dir.verify_state() {
|
||||
Ok(Some(state)) => {
|
||||
if state == VerifyState::Failed {
|
||||
return Some((dir, true));
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// The verify_state item was not found in the manifest, this means the
|
||||
// snapshot is new.
|
||||
}
|
||||
Err(_) => {
|
||||
// There was an error loading the manifest, probably better if we
|
||||
// resync.
|
||||
return Some((dir, true));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Note: the snapshot represented by `last_sync_time` might be missing its backup log
|
||||
// or post-backup verification state if those were not yet available during the last
|
||||
// sync run, always resync it
|
||||
if last_sync_time > dir.time {
|
||||
already_synced_skip_info.update(dir.time);
|
||||
return false;
|
||||
return None;
|
||||
}
|
||||
|
||||
if pos < cutoff && last_sync_time != dir.time {
|
||||
transfer_last_skip_info.update(dir.time);
|
||||
return false;
|
||||
return None;
|
||||
}
|
||||
true
|
||||
Some((dir, false))
|
||||
})
|
||||
.map(|(_, dir)| dir)
|
||||
.collect();
|
||||
|
||||
if already_synced_skip_info.count > 0 {
|
||||
@ -561,7 +600,7 @@ async fn pull_group(
|
||||
|
||||
let mut sync_stats = SyncStats::default();
|
||||
|
||||
for (pos, from_snapshot) in list.into_iter().enumerate() {
|
||||
for (pos, (from_snapshot, corrupt)) in list.into_iter().enumerate() {
|
||||
let to_snapshot = params
|
||||
.target
|
||||
.store
|
||||
@ -571,7 +610,8 @@ async fn pull_group(
|
||||
.source
|
||||
.reader(source_namespace, &from_snapshot)
|
||||
.await?;
|
||||
let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
|
||||
let result =
|
||||
pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), corrupt).await;
|
||||
|
||||
progress.done_snapshots = pos as u64 + 1;
|
||||
info!("percentage done: {progress}");
|
||||
|
Loading…
x
Reference in New Issue
Block a user