pull: add support for pulling from local datastore

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
Reviewed-by: Lukas Wagner <l.wagner@proxmox.com>
Tested-by: Lukas Wagner <l.wagner@proxmox.com>
Tested-by: Gabriel Goller <g.goller@proxmox.com>
This commit is contained in:
Hannes Laimer 2023-11-21 15:31:52 +01:00 committed by Thomas Lamprecht
parent 05a52d0106
commit 076f36ec4e

View File

@ -1,8 +1,8 @@
//! Sync datastore from remote server
use std::collections::{HashMap, HashSet};
use std::io::Seek;
use std::path::Path;
use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
@ -29,10 +29,12 @@ use pbs_datastore::manifest::{
archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_datastore::{
check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
};
use pbs_tools::sha::sha256;
use crate::backup::{check_ns_modification_privs, check_ns_privs};
use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
struct RemoteReader {
@ -40,6 +42,12 @@ struct RemoteReader {
dir: BackupDir,
}
/// Reader for a single snapshot that lives on a datastore of the local host.
struct LocalReader {
    // Shared lock on the snapshot directory; held (unused) for the reader's
    // lifetime so the snapshot cannot be removed while it is being read.
    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
    // Filesystem path of the snapshot directory (`BackupDir::full_path()`).
    path: PathBuf,
    // Datastore the snapshot (and its chunks) is stored on.
    datastore: Arc<DataStore>,
}
pub(crate) struct PullTarget {
store: Arc<DataStore>,
ns: BackupNamespace,
@ -51,6 +59,11 @@ pub(crate) struct RemoteSource {
client: HttpClient,
}
/// Pull source backed by a datastore on the local host (local sync).
pub(crate) struct LocalSource {
    // Datastore to pull from.
    store: Arc<DataStore>,
    // Namespace within `store` to pull from.
    ns: BackupNamespace,
}
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
@ -234,6 +247,81 @@ impl PullSource for RemoteSource {
}
}
#[async_trait::async_trait]
impl PullSource for LocalSource {
    /// List namespaces below the configured source namespace, recursing at
    /// most `max_depth` levels (the global maximum depth when unset).
    async fn list_namespaces(
        &self,
        max_depth: &mut Option<usize>,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupNamespace>, Error> {
        let depth = match *max_depth {
            Some(depth) => depth,
            None => MAX_NAMESPACE_DEPTH,
        };
        let iter =
            ListNamespacesRecursive::new_max_depth(self.store.clone(), self.ns.clone(), depth)?;
        iter.collect()
    }

    /// List the backup groups in `namespace` that are accessible as `owner`.
    async fn list_groups(
        &self,
        namespace: &BackupNamespace,
        owner: &Authid,
    ) -> Result<Vec<BackupGroup>, Error> {
        let accessible = ListAccessibleBackupGroups::new_with_privs(
            &self.store,
            namespace.clone(),
            0,
            None,
            None,
            Some(owner),
        )?;
        // Entries that fail to resolve are silently dropped.
        let groups: Vec<pbs_api_types::BackupGroup> = accessible
            .flatten()
            .map(|accessible_group| accessible_group.group().clone())
            .collect();
        Ok(groups)
    }

    /// List all snapshots of `group` within `namespace`; unreadable
    /// snapshot entries are skipped.
    async fn list_backup_dirs(
        &self,
        namespace: &BackupNamespace,
        group: &BackupGroup,
        _worker: &WorkerTask,
    ) -> Result<Vec<BackupDir>, Error> {
        let backup_group = self.store.backup_group(namespace.clone(), group.clone());
        let snapshots: Vec<BackupDir> = backup_group
            .iter_snapshots()?
            .flatten()
            .map(|snapshot| snapshot.dir().to_owned())
            .collect();
        Ok(snapshots)
    }

    /// The namespace this source pulls from.
    fn get_ns(&self) -> BackupNamespace {
        self.ns.clone()
    }

    /// Human-readable "store/namespace" label for log messages.
    fn print_store_and_ns(&self) -> String {
        print_store_and_ns(self.store.name(), &self.ns)
    }

    /// Open a snapshot for reading. Takes a shared lock on the snapshot
    /// directory so it cannot be removed while it is being pulled; the lock
    /// lives as long as the returned reader.
    async fn reader(
        &self,
        ns: &BackupNamespace,
        dir: &BackupDir,
    ) -> Result<Arc<dyn PullReader>, Error> {
        let backup_dir = self.store.backup_dir(ns.clone(), dir.clone())?;
        let snapshot_path = backup_dir.full_path();
        let guard = proxmox_sys::fs::lock_dir_noblock_shared(
            &snapshot_path,
            "snapshot",
            "locked by another operation",
        )?;
        Ok(Arc::new(LocalReader {
            _dir_lock: Arc::new(Mutex::new(guard)),
            path: snapshot_path,
            datastore: backup_dir.datastore().clone(),
        }))
    }
}
#[async_trait::async_trait]
/// `PullReader` is a trait that provides an interface for reading data from a source.
/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
@ -343,6 +431,48 @@ impl PullReader for RemoteReader {
}
}
#[async_trait::async_trait]
impl PullReader for LocalReader {
    /// Chunk reader over the source datastore's chunk store.
    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
        Arc::new(LocalChunkReader::new(
            self.datastore.clone(),
            None,
            crypt_mode,
        ))
    }

    /// Copy `filename` from the snapshot directory into the file at `into`,
    /// then try to parse the copied data as a [`DataBlob`].
    ///
    /// Returns `Ok(None)` when the data cannot be loaded as a blob.
    async fn load_file_into(
        &self,
        filename: &str,
        into: &Path,
        _worker: &WorkerTask,
    ) -> Result<Option<DataBlob>, Error> {
        let mut tmp_file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .read(true)
            .open(into)?;
        let mut from_path = self.path.clone();
        from_path.push(filename);
        // Stream the source file into the target instead of buffering it
        // completely in memory (index files can be large).
        let mut source = std::fs::File::open(&from_path)?;
        std::io::copy(&mut source, &mut tmp_file)?;
        tmp_file.rewind()?;
        // NOTE(review): blob parse failures are mapped to `None` rather than
        // propagated — confirm callers treat an unreadable blob as "absent".
        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
    }

    /// Client logs are already on this host; nothing to download.
    async fn try_download_client_log(
        &self,
        _to_path: &Path,
        _worker: &WorkerTask,
    ) -> Result<(), Error> {
        Ok(())
    }

    /// Chunk syncing can be skipped when source and target are the same
    /// datastore — the chunks are already present in the chunk store.
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        self.datastore.name() == target_store_name
    }
}
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
/// Where data is pulled from
@ -399,7 +529,10 @@ impl PullParameters {
client,
})
} else {
bail!("local sync not implemented yet")
Arc::new(LocalSource {
store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
ns: remote_ns,
})
};
let target = PullTarget {
store: DataStore::lookup_datastore(store, Some(Operation::Write))?,