mirror of
git://git.proxmox.com/git/proxmox-backup-qemu.git
synced 2025-03-12 04:58:26 +03:00
src/worker_task.rs: use OnceCell to avoid locking
This commit is contained in:
parent
ca590e8fb9
commit
b26a8db3f1
@ -28,4 +28,5 @@ tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros",
|
||||
openssl = "0.10"
|
||||
h2 = { version = "0.2", features = ["stream"] }
|
||||
lazy_static = "1.4"
|
||||
once_cell = "1.3.1"
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
use anyhow::{bail, Error};
|
||||
use anyhow::{format_err, bail, Error};
|
||||
use std::collections::HashSet;
|
||||
use std::sync::{Mutex, Arc};
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::os::raw::c_int;
|
||||
|
||||
use futures::future::{Future, Either, FutureExt};
|
||||
@ -16,12 +17,12 @@ pub(crate) struct BackupTask {
|
||||
setup: BackupSetup,
|
||||
runtime: tokio::runtime::Runtime,
|
||||
crypt_config: Option<Arc<CryptConfig>>,
|
||||
writer: Mutex<Option<Arc<BackupWriter>>>,
|
||||
last_manifest: Mutex<Option<Arc<BackupManifest>>>,
|
||||
writer: OnceCell<Arc<BackupWriter>>,
|
||||
last_manifest: OnceCell<Arc<BackupManifest>>,
|
||||
registry: Arc<Mutex<Registry<ImageUploadInfo>>>,
|
||||
known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
|
||||
abort: tokio::sync::broadcast::Sender<()>,
|
||||
aborted: Mutex<Option<String>>, // set on abort, conatins abort reason
|
||||
aborted: OnceCell<String>, // set on abort, conatins abort reason
|
||||
}
|
||||
|
||||
impl BackupTask {
|
||||
@ -56,8 +57,8 @@ impl BackupTask {
|
||||
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
|
||||
|
||||
Ok(Self { runtime, setup, crypt_config, abort, registry, known_chunks,
|
||||
writer: Mutex::new(None), last_manifest: Mutex::new(None),
|
||||
aborted: Mutex::new(None) })
|
||||
writer: OnceCell::new(), last_manifest: OnceCell::new(),
|
||||
aborted: OnceCell::new() })
|
||||
}
|
||||
|
||||
pub fn runtime(&self) -> tokio::runtime::Handle {
|
||||
@ -65,7 +66,7 @@ impl BackupTask {
|
||||
}
|
||||
|
||||
fn check_aborted(&self) -> Result<(), Error> {
|
||||
if (*self.aborted.lock().unwrap()).is_some() {
|
||||
if self.aborted.get().is_some() {
|
||||
bail!("task already aborted");
|
||||
}
|
||||
Ok(())
|
||||
@ -76,10 +77,7 @@ impl BackupTask {
|
||||
// should not happen, but log to stderr
|
||||
eprintln!("BackupTask send abort failed.");
|
||||
}
|
||||
let mut aborted = self.aborted.lock().unwrap();
|
||||
if (*aborted).is_none() {
|
||||
*aborted = Some(reason);
|
||||
}
|
||||
let _ = self.aborted.set(reason);
|
||||
}
|
||||
|
||||
pub async fn connect(&self) -> Result<c_int, Error> {
|
||||
@ -100,10 +98,12 @@ impl BackupTask {
|
||||
let mut result = 0;
|
||||
if let Ok(last_manifest) = last_manifest {
|
||||
result = 1;
|
||||
*self.last_manifest.lock().unwrap() = Some(Arc::new(last_manifest));
|
||||
self.last_manifest.set(Arc::new(last_manifest))
|
||||
.map_err(|_| format_err!("already connected!"))?;
|
||||
}
|
||||
|
||||
*self.writer.lock().unwrap() = Some(writer);
|
||||
self.writer.set(writer)
|
||||
.map_err(|_| format_err!("already connected!"))?;
|
||||
|
||||
Ok(result)
|
||||
};
|
||||
@ -120,8 +120,8 @@ impl BackupTask {
|
||||
|
||||
self.check_aborted()?;
|
||||
|
||||
let writer = match *self.writer.lock().unwrap() {
|
||||
Some(ref writer) => writer.clone(),
|
||||
let writer = match self.writer.get() {
|
||||
Some(writer) => writer.clone(),
|
||||
None => bail!("not connected"),
|
||||
};
|
||||
|
||||
@ -145,8 +145,8 @@ impl BackupTask {
|
||||
|
||||
self.check_aborted()?;
|
||||
|
||||
let writer = match *self.writer.lock().unwrap() {
|
||||
Some(ref writer) => writer.clone(),
|
||||
let writer = match self.writer.get() {
|
||||
Some(writer) => writer.clone(),
|
||||
None => bail!("not connected"),
|
||||
};
|
||||
|
||||
@ -166,12 +166,16 @@ impl BackupTask {
|
||||
abortable_command(command_future, abort_rx.recv()).await
|
||||
}
|
||||
|
||||
fn last_manifest(&self) -> Option<Arc<BackupManifest>> {
|
||||
self.last_manifest.get().map(|m| m.clone())
|
||||
}
|
||||
|
||||
pub fn check_incremental(
|
||||
&self,
|
||||
device_name: String,
|
||||
size: u64,
|
||||
) -> bool {
|
||||
check_last_incremental_csum(self.last_manifest.lock().unwrap().clone(), device_name, size)
|
||||
check_last_incremental_csum(self.last_manifest(), device_name, size)
|
||||
}
|
||||
|
||||
pub async fn register_image(
|
||||
@ -183,15 +187,15 @@ impl BackupTask {
|
||||
|
||||
self.check_aborted()?;
|
||||
|
||||
let writer = match *self.writer.lock().unwrap() {
|
||||
Some(ref writer) => writer.clone(),
|
||||
let writer = match self.writer.get() {
|
||||
Some(writer) => writer.clone(),
|
||||
None => bail!("not connected"),
|
||||
};
|
||||
|
||||
let command_future = register_image(
|
||||
writer,
|
||||
self.crypt_config.clone(),
|
||||
self.last_manifest.lock().unwrap().clone(),
|
||||
self.last_manifest.get().map(|m| m.clone()),
|
||||
self.registry.clone(),
|
||||
self.known_chunks.clone(),
|
||||
device_name,
|
||||
@ -208,8 +212,8 @@ impl BackupTask {
|
||||
|
||||
self.check_aborted()?;
|
||||
|
||||
let writer = match *self.writer.lock().unwrap() {
|
||||
Some(ref writer) => writer.clone(),
|
||||
let writer = match self.writer.get() {
|
||||
Some(writer) => writer.clone(),
|
||||
None => bail!("not connected"),
|
||||
};
|
||||
|
||||
@ -222,8 +226,8 @@ impl BackupTask {
|
||||
|
||||
self.check_aborted()?;
|
||||
|
||||
let writer = match *self.writer.lock().unwrap() {
|
||||
Some(ref writer) => writer.clone(),
|
||||
let writer = match self.writer.get() {
|
||||
Some(writer) => writer.clone(),
|
||||
None => bail!("not connected"),
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user