server: sync: move sync related stats to common module

Move and rename the `PullStats` to `SyncStats` as well as moving the
`RemovedVanishedStats` to make them reusable for sync operations in
push direction as well as pull direction.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
This commit is contained in:
Christian Ebner 2024-09-12 16:32:51 +02:00 committed by Fabian Grünbichler
parent 0a916665ae
commit ffe1dd4369
3 changed files with 89 additions and 84 deletions

View File

@ -34,6 +34,7 @@ pub use report::*;
pub mod auth;
pub(crate) mod pull;
pub(crate) mod sync;
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;

View File

@ -5,7 +5,7 @@ use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
use http::StatusCode;
@ -34,6 +34,7 @@ use pbs_datastore::{
};
use pbs_tools::sha::sha256;
use super::sync::{RemovedVanishedStats, SyncStats};
use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
@ -64,54 +65,6 @@ pub(crate) struct LocalSource {
ns: BackupNamespace,
}
#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
pub(crate) groups: usize,
pub(crate) snapshots: usize,
pub(crate) namespaces: usize,
}
impl RemovedVanishedStats {
fn add(&mut self, rhs: RemovedVanishedStats) {
self.groups += rhs.groups;
self.snapshots += rhs.snapshots;
self.namespaces += rhs.namespaces;
}
}
#[derive(Default)]
pub(crate) struct PullStats {
pub(crate) chunk_count: usize,
pub(crate) bytes: usize,
pub(crate) elapsed: Duration,
pub(crate) removed: Option<RemovedVanishedStats>,
}
impl From<RemovedVanishedStats> for PullStats {
fn from(removed: RemovedVanishedStats) -> Self {
Self {
removed: Some(removed),
..Default::default()
}
}
}
impl PullStats {
fn add(&mut self, rhs: PullStats) {
self.chunk_count += rhs.chunk_count;
self.bytes += rhs.bytes;
self.elapsed += rhs.elapsed;
if let Some(rhs_removed) = rhs.removed {
if let Some(ref mut removed) = self.removed {
removed.add(rhs_removed);
} else {
self.removed = Some(rhs_removed);
}
}
}
}
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
@ -576,7 +529,7 @@ async fn pull_index_chunks<I: IndexFile>(
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
) -> Result<SyncStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
let start_time = SystemTime::now();
@ -663,7 +616,7 @@ async fn pull_index_chunks<I: IndexFile>(
HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
);
Ok(PullStats {
Ok(SyncStats {
chunk_count,
bytes,
elapsed,
@ -701,7 +654,7 @@ async fn pull_single_archive<'a>(
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
) -> Result<SyncStats, Error> {
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
path.push(archive_name);
@ -709,7 +662,7 @@ async fn pull_single_archive<'a>(
let mut tmp_path = path.clone();
tmp_path.set_extension("tmp");
let mut pull_stats = PullStats::default();
let mut sync_stats = SyncStats::default();
info!("sync archive {archive_name}");
@ -735,7 +688,7 @@ async fn pull_single_archive<'a>(
downloaded_chunks,
)
.await?;
pull_stats.add(stats);
sync_stats.add(stats);
}
}
ArchiveType::FixedIndex => {
@ -755,7 +708,7 @@ async fn pull_single_archive<'a>(
downloaded_chunks,
)
.await?;
pull_stats.add(stats);
sync_stats.add(stats);
}
}
ArchiveType::Blob => {
@ -767,7 +720,7 @@ async fn pull_single_archive<'a>(
if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err);
}
Ok(pull_stats)
Ok(sync_stats)
}
/// Actual implementation of pulling a snapshot.
@ -783,8 +736,8 @@ async fn pull_snapshot<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
let mut pull_stats = PullStats::default();
) -> Result<SyncStats, Error> {
let mut sync_stats = SyncStats::default();
let mut manifest_name = snapshot.full_path();
manifest_name.push(MANIFEST_BLOB_NAME);
@ -800,7 +753,7 @@ async fn pull_snapshot<'a>(
{
tmp_manifest_blob = data;
} else {
return Ok(pull_stats);
return Ok(sync_stats);
}
if manifest_name.exists() {
@ -822,7 +775,7 @@ async fn pull_snapshot<'a>(
};
info!("no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(pull_stats); // nothing changed
return Ok(sync_stats); // nothing changed
}
}
@ -869,7 +822,7 @@ async fn pull_snapshot<'a>(
let stats =
pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
pull_stats.add(stats);
sync_stats.add(stats);
}
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@ -883,7 +836,7 @@ async fn pull_snapshot<'a>(
.cleanup_unreferenced_files(&manifest)
.map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
Ok(pull_stats)
Ok(sync_stats)
}
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
@ -894,12 +847,12 @@ async fn pull_snapshot_from<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
) -> Result<SyncStats, Error> {
let (_path, is_new, _snap_lock) = snapshot
.datastore()
.create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
let pull_stats = if is_new {
let sync_stats = if is_new {
info!("sync snapshot {}", snapshot.dir());
match pull_snapshot(reader, snapshot, downloaded_chunks).await {
@ -913,9 +866,9 @@ async fn pull_snapshot_from<'a>(
}
return Err(err);
}
Ok(pull_stats) => {
Ok(sync_stats) => {
info!("sync snapshot {} done", snapshot.dir());
pull_stats
sync_stats
}
}
} else {
@ -923,7 +876,7 @@ async fn pull_snapshot_from<'a>(
pull_snapshot(reader, snapshot, downloaded_chunks).await?
};
Ok(pull_stats)
Ok(sync_stats)
}
#[derive(PartialEq, Eq)]
@ -1027,7 +980,7 @@ async fn pull_group(
source_namespace: &BackupNamespace,
group: &BackupGroup,
progress: &mut StoreProgress,
) -> Result<PullStats, Error> {
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@ -1084,7 +1037,7 @@ async fn pull_group(
progress.group_snapshots = list.len() as u64;
let mut pull_stats = PullStats::default();
let mut sync_stats = SyncStats::default();
for (pos, from_snapshot) in list.into_iter().enumerate() {
let to_snapshot = params
@ -1102,7 +1055,7 @@ async fn pull_group(
info!("percentage done: {progress}");
let stats = result?; // stop on error
pull_stats.add(stats);
sync_stats.add(stats);
}
if params.remove_vanished {
@ -1128,7 +1081,7 @@ async fn pull_group(
.target
.store
.remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
pull_stats.add(PullStats::from(RemovedVanishedStats {
sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: 1,
groups: 0,
namespaces: 0,
@ -1136,7 +1089,7 @@ async fn pull_group(
}
}
Ok(pull_stats)
Ok(sync_stats)
}
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
@ -1253,7 +1206,7 @@ fn check_and_remove_vanished_ns(
/// - remote namespaces are filtered by remote
/// - creation and removal of sub-NS checked here
/// - access to sub-NS checked here
pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats, Error> {
pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats, Error> {
// explicit create shared lock to prevent GC on newly created chunks
let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
let mut errors = false;
@ -1286,7 +1239,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
let (mut groups, mut snapshots) = (0, 0);
let mut synced_ns = HashSet::with_capacity(namespaces.len());
let mut pull_stats = PullStats::default();
let mut sync_stats = SyncStats::default();
for namespace in namespaces {
let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
@ -1310,10 +1263,10 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
}
match pull_ns(&namespace, &mut params).await {
Ok((ns_progress, ns_pull_stats, ns_errors)) => {
Ok((ns_progress, ns_sync_stats, ns_errors)) => {
errors |= ns_errors;
pull_stats.add(ns_pull_stats);
sync_stats.add(ns_sync_stats);
if params.max_depth != Some(0) {
groups += ns_progress.done_groups;
@ -1342,14 +1295,14 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
if params.remove_vanished {
let (has_errors, stats) = check_and_remove_vanished_ns(&params, synced_ns)?;
errors |= has_errors;
pull_stats.add(PullStats::from(stats));
sync_stats.add(SyncStats::from(stats));
}
if errors {
bail!("sync failed with some errors.");
}
Ok(pull_stats)
Ok(sync_stats)
}
/// Pulls a namespace according to `params`.
@ -1367,7 +1320,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
pub(crate) async fn pull_ns(
namespace: &BackupNamespace,
params: &mut PullParameters,
) -> Result<(StoreProgress, PullStats, bool), Error> {
) -> Result<(StoreProgress, SyncStats, bool), Error> {
let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
list.sort_unstable_by(|a, b| {
@ -1397,7 +1350,7 @@ pub(crate) async fn pull_ns(
}
let mut progress = StoreProgress::new(list.len() as u64);
let mut pull_stats = PullStats::default();
let mut sync_stats = SyncStats::default();
let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
@ -1432,7 +1385,7 @@ pub(crate) async fn pull_ns(
errors = true; // do not stop here, instead continue
} else {
match pull_group(params, namespace, &group, &mut progress).await {
Ok(stats) => pull_stats.add(stats),
Ok(stats) => sync_stats.add(stats),
Err(err) => {
info!("sync group {} failed - {err}", &group);
errors = true; // do not stop here, instead continue
@ -1466,13 +1419,13 @@ pub(crate) async fn pull_ns(
Ok(stats) => {
if !stats.all_removed() {
info!("kept some protected snapshots of group '{local_group}'");
pull_stats.add(PullStats::from(RemovedVanishedStats {
sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: stats.removed_snapshots(),
groups: 0,
namespaces: 0,
}));
} else {
pull_stats.add(PullStats::from(RemovedVanishedStats {
sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: stats.removed_snapshots(),
groups: 1,
namespaces: 0,
@ -1493,5 +1446,5 @@ pub(crate) async fn pull_ns(
};
}
Ok((progress, pull_stats, errors))
Ok((progress, sync_stats, errors))
}

51
src/server/sync.rs Normal file
View File

@ -0,0 +1,51 @@
//! Sync datastore contents from source to target, either in push or pull direction
use std::time::Duration;
#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
pub(crate) groups: usize,
pub(crate) snapshots: usize,
pub(crate) namespaces: usize,
}
impl RemovedVanishedStats {
pub(crate) fn add(&mut self, rhs: RemovedVanishedStats) {
self.groups += rhs.groups;
self.snapshots += rhs.snapshots;
self.namespaces += rhs.namespaces;
}
}
#[derive(Default)]
pub(crate) struct SyncStats {
pub(crate) chunk_count: usize,
pub(crate) bytes: usize,
pub(crate) elapsed: Duration,
pub(crate) removed: Option<RemovedVanishedStats>,
}
impl From<RemovedVanishedStats> for SyncStats {
fn from(removed: RemovedVanishedStats) -> Self {
Self {
removed: Some(removed),
..Default::default()
}
}
}
impl SyncStats {
pub(crate) fn add(&mut self, rhs: SyncStats) {
self.chunk_count += rhs.chunk_count;
self.bytes += rhs.bytes;
self.elapsed += rhs.elapsed;
if let Some(rhs_removed) = rhs.removed {
if let Some(ref mut removed) = self.removed {
removed.add(rhs_removed);
} else {
self.removed = Some(rhs_removed);
}
}
}
}