mirror of
git://git.proxmox.com/git/proxmox-backup.git
synced 2025-01-20 14:03:53 +03:00
api: sync: move sync job invocation to server sync module
Moves and refactores the sync_job_do function into the common server sync module so that it can be reused for both sync directions, pull and push. Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
This commit is contained in:
parent
e898887f54
commit
46951c103b
@ -10,16 +10,16 @@ use proxmox_router::{
|
||||
use proxmox_schema::api;
|
||||
use proxmox_sortable_macro::sortable;
|
||||
|
||||
use pbs_api_types::{Authid, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA};
|
||||
use pbs_api_types::{
|
||||
Authid, SyncDirection, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA,
|
||||
};
|
||||
use pbs_config::sync;
|
||||
use pbs_config::CachedUserInfo;
|
||||
|
||||
use crate::{
|
||||
api2::{
|
||||
config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
|
||||
pull::do_sync_job,
|
||||
},
|
||||
api2::config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
|
||||
server::jobstate::{compute_schedule_status, Job, JobState},
|
||||
server::sync::do_sync_job,
|
||||
};
|
||||
|
||||
#[api(
|
||||
@ -116,7 +116,14 @@ pub fn run_sync_job(
|
||||
|
||||
let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
|
||||
|
||||
let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
|
||||
let upid_str = do_sync_job(
|
||||
job,
|
||||
sync_job,
|
||||
&auth_id,
|
||||
None,
|
||||
SyncDirection::Pull,
|
||||
to_stdout,
|
||||
)?;
|
||||
|
||||
Ok(upid_str)
|
||||
}
|
||||
|
108
src/api2/pull.rs
108
src/api2/pull.rs
@ -13,10 +13,8 @@ use pbs_api_types::{
|
||||
TRANSFER_LAST_SCHEMA,
|
||||
};
|
||||
use pbs_config::CachedUserInfo;
|
||||
use proxmox_human_byte::HumanByte;
|
||||
use proxmox_rest_server::WorkerTask;
|
||||
|
||||
use crate::server::jobstate::Job;
|
||||
use crate::server::pull::{pull_store, PullParameters};
|
||||
|
||||
pub fn check_pull_privs(
|
||||
@ -93,112 +91,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn do_sync_job(
|
||||
mut job: Job,
|
||||
sync_job: SyncJobConfig,
|
||||
auth_id: &Authid,
|
||||
schedule: Option<String>,
|
||||
to_stdout: bool,
|
||||
) -> Result<String, Error> {
|
||||
let job_id = format!(
|
||||
"{}:{}:{}:{}:{}",
|
||||
sync_job.remote.as_deref().unwrap_or("-"),
|
||||
sync_job.remote_store,
|
||||
sync_job.store,
|
||||
sync_job.ns.clone().unwrap_or_default(),
|
||||
job.jobname()
|
||||
);
|
||||
let worker_type = job.jobtype().to_string();
|
||||
|
||||
if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
|
||||
bail!("can't sync to same datastore");
|
||||
}
|
||||
|
||||
let upid_str = WorkerTask::spawn(
|
||||
&worker_type,
|
||||
Some(job_id.clone()),
|
||||
auth_id.to_string(),
|
||||
to_stdout,
|
||||
move |worker| async move {
|
||||
job.start(&worker.upid().to_string())?;
|
||||
|
||||
let worker2 = worker.clone();
|
||||
let sync_job2 = sync_job.clone();
|
||||
|
||||
let worker_future = async move {
|
||||
let pull_params = PullParameters::try_from(&sync_job)?;
|
||||
|
||||
info!("Starting datastore sync job '{job_id}'");
|
||||
if let Some(event_str) = schedule {
|
||||
info!("task triggered by schedule '{event_str}'");
|
||||
}
|
||||
|
||||
info!(
|
||||
"sync datastore '{}' from '{}{}'",
|
||||
sync_job.store,
|
||||
sync_job
|
||||
.remote
|
||||
.as_deref()
|
||||
.map_or(String::new(), |remote| format!("{remote}/")),
|
||||
sync_job.remote_store,
|
||||
);
|
||||
|
||||
let pull_stats = pull_store(pull_params).await?;
|
||||
|
||||
if pull_stats.bytes != 0 {
|
||||
let amount = HumanByte::from(pull_stats.bytes);
|
||||
let rate = HumanByte::new_binary(
|
||||
pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
|
||||
);
|
||||
info!(
|
||||
"Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
|
||||
pull_stats.chunk_count,
|
||||
);
|
||||
} else {
|
||||
info!("Summary: sync job found no new data to pull");
|
||||
}
|
||||
|
||||
if let Some(removed) = pull_stats.removed {
|
||||
info!(
|
||||
"Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
|
||||
removed.snapshots, removed.groups, removed.namespaces,
|
||||
);
|
||||
}
|
||||
|
||||
info!("sync job '{}' end", &job_id);
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
let mut abort_future = worker2
|
||||
.abort_future()
|
||||
.map(|_| Err(format_err!("sync aborted")));
|
||||
|
||||
let result = select! {
|
||||
worker = worker_future.fuse() => worker,
|
||||
abort = abort_future => abort,
|
||||
};
|
||||
|
||||
let status = worker2.create_state(&result);
|
||||
|
||||
match job.finish(status) {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
eprintln!("could not finish job state: {}", err);
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
|
||||
eprintln!("send sync notification failed: {err}");
|
||||
}
|
||||
|
||||
result
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(upid_str)
|
||||
}
|
||||
|
||||
#[api(
|
||||
input: {
|
||||
properties: {
|
||||
|
@ -40,17 +40,17 @@ use pbs_buildcfg::configdir;
|
||||
use proxmox_time::CalendarEvent;
|
||||
|
||||
use pbs_api_types::{
|
||||
Authid, DataStoreConfig, Operation, PruneJobConfig, SyncJobConfig, TapeBackupJobConfig,
|
||||
VerificationJobConfig,
|
||||
Authid, DataStoreConfig, Operation, PruneJobConfig, SyncDirection, SyncJobConfig,
|
||||
TapeBackupJobConfig, VerificationJobConfig,
|
||||
};
|
||||
|
||||
use proxmox_backup::auth_helpers::*;
|
||||
use proxmox_backup::server::{self, metric_collection};
|
||||
use proxmox_backup::tools::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
|
||||
|
||||
use proxmox_backup::api2::pull::do_sync_job;
|
||||
use proxmox_backup::api2::tape::backup::do_tape_backup_job;
|
||||
use proxmox_backup::server::do_prune_job;
|
||||
use proxmox_backup::server::do_sync_job;
|
||||
use proxmox_backup::server::do_verification_job;
|
||||
|
||||
fn main() -> Result<(), Error> {
|
||||
@ -611,7 +611,14 @@ async fn schedule_datastore_sync_jobs() {
|
||||
};
|
||||
|
||||
let auth_id = Authid::root_auth_id().clone();
|
||||
if let Err(err) = do_sync_job(job, job_config, &auth_id, Some(event_str), false) {
|
||||
if let Err(err) = do_sync_job(
|
||||
job,
|
||||
job_config,
|
||||
&auth_id,
|
||||
Some(event_str),
|
||||
SyncDirection::Pull,
|
||||
false,
|
||||
) {
|
||||
eprintln!("unable to start datastore sync job {job_id} - {err}");
|
||||
}
|
||||
};
|
||||
|
@ -38,6 +38,7 @@ pub mod metric_collection;
|
||||
pub(crate) mod pull;
|
||||
pub(crate) mod push;
|
||||
pub(crate) mod sync;
|
||||
pub use sync::do_sync_job;
|
||||
|
||||
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
|
||||
let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
|
||||
|
@ -6,16 +6,19 @@ use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
use anyhow::{bail, format_err, Context, Error};
|
||||
use futures::{future::FutureExt, select};
|
||||
use http::StatusCode;
|
||||
use serde_json::json;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use proxmox_human_byte::HumanByte;
|
||||
use proxmox_rest_server::WorkerTask;
|
||||
use proxmox_router::HttpError;
|
||||
|
||||
use pbs_api_types::{
|
||||
Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupListItem, SnapshotListItem,
|
||||
MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
|
||||
SyncDirection, SyncJobConfig, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
|
||||
};
|
||||
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
|
||||
use pbs_datastore::data_blob::DataBlob;
|
||||
@ -24,6 +27,9 @@ use pbs_datastore::read_chunk::AsyncReadChunk;
|
||||
use pbs_datastore::{DataStore, ListNamespacesRecursive, LocalChunkReader};
|
||||
|
||||
use crate::backup::ListAccessibleBackupGroups;
|
||||
use crate::server::jobstate::Job;
|
||||
use crate::server::pull::{pull_store, PullParameters};
|
||||
use crate::server::push::{push_store, PushParameters};
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct RemovedVanishedStats {
|
||||
@ -579,3 +585,143 @@ pub(crate) fn check_namespace_depth_limit(
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run a sync job in given direction
|
||||
pub fn do_sync_job(
|
||||
mut job: Job,
|
||||
sync_job: SyncJobConfig,
|
||||
auth_id: &Authid,
|
||||
schedule: Option<String>,
|
||||
sync_direction: SyncDirection,
|
||||
to_stdout: bool,
|
||||
) -> Result<String, Error> {
|
||||
let job_id = format!(
|
||||
"{}:{}:{}:{}:{}",
|
||||
sync_job.remote.as_deref().unwrap_or("-"),
|
||||
sync_job.remote_store,
|
||||
sync_job.store,
|
||||
sync_job.ns.clone().unwrap_or_default(),
|
||||
job.jobname(),
|
||||
);
|
||||
let worker_type = job.jobtype().to_string();
|
||||
|
||||
if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
|
||||
bail!("can't sync to same datastore");
|
||||
}
|
||||
|
||||
let upid_str = WorkerTask::spawn(
|
||||
&worker_type,
|
||||
Some(job_id.clone()),
|
||||
auth_id.to_string(),
|
||||
to_stdout,
|
||||
move |worker| async move {
|
||||
job.start(&worker.upid().to_string())?;
|
||||
|
||||
let worker2 = worker.clone();
|
||||
let sync_job2 = sync_job.clone();
|
||||
|
||||
let worker_future = async move {
|
||||
info!("Starting datastore sync job '{job_id}'");
|
||||
if let Some(event_str) = schedule {
|
||||
info!("task triggered by schedule '{event_str}'");
|
||||
}
|
||||
let sync_stats = match sync_direction {
|
||||
SyncDirection::Pull => {
|
||||
info!(
|
||||
"sync datastore '{}' from '{}{}'",
|
||||
sync_job.store,
|
||||
sync_job
|
||||
.remote
|
||||
.as_deref()
|
||||
.map_or(String::new(), |remote| format!("{remote}/")),
|
||||
sync_job.remote_store,
|
||||
);
|
||||
let pull_params = PullParameters::try_from(&sync_job)?;
|
||||
pull_store(pull_params).await?
|
||||
}
|
||||
SyncDirection::Push => {
|
||||
info!(
|
||||
"sync datastore '{}' to '{}{}'",
|
||||
sync_job.store,
|
||||
sync_job
|
||||
.remote
|
||||
.as_deref()
|
||||
.map_or(String::new(), |remote| format!("{remote}/")),
|
||||
sync_job.remote_store,
|
||||
);
|
||||
let push_params = PushParameters::new(
|
||||
&sync_job.store,
|
||||
sync_job.ns.clone().unwrap_or_default(),
|
||||
sync_job
|
||||
.remote
|
||||
.as_deref()
|
||||
.context("missing required remote")?,
|
||||
&sync_job.remote_store,
|
||||
sync_job.remote_ns.clone().unwrap_or_default(),
|
||||
sync_job
|
||||
.owner
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| Authid::root_auth_id())
|
||||
.clone(),
|
||||
sync_job.remove_vanished,
|
||||
sync_job.max_depth,
|
||||
sync_job.group_filter.clone(),
|
||||
sync_job.limit.clone(),
|
||||
sync_job.transfer_last,
|
||||
)
|
||||
.await?;
|
||||
push_store(push_params).await?
|
||||
}
|
||||
};
|
||||
|
||||
if sync_stats.bytes != 0 {
|
||||
let amount = HumanByte::from(sync_stats.bytes);
|
||||
let rate = HumanByte::new_binary(
|
||||
sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64(),
|
||||
);
|
||||
info!(
|
||||
"Summary: sync job {sync_direction}ed {amount} in {} chunks (average rate: {rate}/s)",
|
||||
sync_stats.chunk_count,
|
||||
);
|
||||
} else {
|
||||
info!("Summary: sync job found no new data to {sync_direction}");
|
||||
}
|
||||
|
||||
if let Some(removed) = sync_stats.removed {
|
||||
info!(
|
||||
"Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
|
||||
removed.snapshots, removed.groups, removed.namespaces,
|
||||
);
|
||||
}
|
||||
|
||||
info!("sync job '{job_id}' end");
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
let mut abort_future = worker2
|
||||
.abort_future()
|
||||
.map(|_| Err(format_err!("sync aborted")));
|
||||
|
||||
let result = select! {
|
||||
worker = worker_future.fuse() => worker,
|
||||
abort = abort_future => abort,
|
||||
};
|
||||
|
||||
let status = worker2.create_state(&result);
|
||||
|
||||
match job.finish(status) {
|
||||
Ok(_) => {}
|
||||
Err(err) => eprintln!("could not finish job state: {err}"),
|
||||
}
|
||||
|
||||
if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
|
||||
eprintln!("send sync notification failed: {err}");
|
||||
}
|
||||
|
||||
result
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(upid_str)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user