From b26a8db3f1c0de4991526ac66911a95dfe02d9a0 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Mon, 6 Jul 2020 13:54:21 +0200 Subject: [PATCH] src/worker_task.rs: use OnceCell to avoid locking --- Cargo.toml | 1 + src/worker_task.rs | 54 +++++++++++++++++++++++++--------------------- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f0c68ef..6288e68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,4 +28,5 @@ tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", openssl = "0.10" h2 = { version = "0.2", features = ["stream"] } lazy_static = "1.4" +once_cell = "1.3.1" diff --git a/src/worker_task.rs b/src/worker_task.rs index 699f734..22707a7 100644 --- a/src/worker_task.rs +++ b/src/worker_task.rs @@ -1,6 +1,7 @@ -use anyhow::{bail, Error}; +use anyhow::{format_err, bail, Error}; use std::collections::HashSet; use std::sync::{Mutex, Arc}; +use once_cell::sync::OnceCell; use std::os::raw::c_int; use futures::future::{Future, Either, FutureExt}; @@ -16,12 +17,12 @@ pub(crate) struct BackupTask { setup: BackupSetup, runtime: tokio::runtime::Runtime, crypt_config: Option>, - writer: Mutex>>, - last_manifest: Mutex>>, + writer: OnceCell>, + last_manifest: OnceCell>, registry: Arc>>, known_chunks: Arc>>, abort: tokio::sync::broadcast::Sender<()>, - aborted: Mutex>, // set on abort, conatins abort reason + aborted: OnceCell, // set on abort, conatins abort reason } impl BackupTask { @@ -56,8 +57,8 @@ impl BackupTask { let known_chunks = Arc::new(Mutex::new(HashSet::new())); Ok(Self { runtime, setup, crypt_config, abort, registry, known_chunks, - writer: Mutex::new(None), last_manifest: Mutex::new(None), - aborted: Mutex::new(None) }) + writer: OnceCell::new(), last_manifest: OnceCell::new(), + aborted: OnceCell::new() }) } pub fn runtime(&self) -> tokio::runtime::Handle { @@ -65,7 +66,7 @@ impl BackupTask { } fn check_aborted(&self) -> Result<(), Error> { - if (*self.aborted.lock().unwrap()).is_some() { + if self.aborted.get().is_some() { bail!("task already aborted"); } Ok(()) @@ -76,10 +77,7 @@ impl BackupTask { // should not happen, but log to stderr eprintln!("BackupTask send abort failed."); } - let mut aborted = self.aborted.lock().unwrap(); - if (*aborted).is_none() { - *aborted = Some(reason); - } + let _ = self.aborted.set(reason); } pub async fn connect(&self) -> Result { @@ -100,10 +98,12 @@ impl BackupTask { let mut result = 0; if let Ok(last_manifest) = last_manifest { result = 1; - *self.last_manifest.lock().unwrap() = Some(Arc::new(last_manifest)); + self.last_manifest.set(Arc::new(last_manifest)) + .map_err(|_| format_err!("already connected!"))?; } - *self.writer.lock().unwrap() = Some(writer); + self.writer.set(writer) + .map_err(|_| format_err!("already connected!"))?; Ok(result) }; @@ -120,8 +120,8 @@ impl BackupTask { self.check_aborted()?; - let writer = match *self.writer.lock().unwrap() { - Some(ref writer) => writer.clone(), + let writer = match self.writer.get() { + Some(writer) => writer.clone(), None => bail!("not connected"), }; @@ -145,8 +145,8 @@ impl BackupTask { self.check_aborted()?; - let writer = match *self.writer.lock().unwrap() { - Some(ref writer) => writer.clone(), + let writer = match self.writer.get() { + Some(writer) => writer.clone(), None => bail!("not connected"), }; @@ -166,12 +166,16 @@ impl BackupTask { abortable_command(command_future, abort_rx.recv()).await } + fn last_manifest(&self) -> Option> { + self.last_manifest.get().map(|m| m.clone()) + } + pub fn check_incremental( &self, device_name: String, size: u64, ) -> bool { - check_last_incremental_csum(self.last_manifest.lock().unwrap().clone(), device_name, size) + check_last_incremental_csum(self.last_manifest(), device_name, size) } pub async fn register_image( @@ -183,15 +187,15 @@ impl BackupTask { self.check_aborted()?; - let writer = match *self.writer.lock().unwrap() { - Some(ref writer) => writer.clone(), + let writer = match self.writer.get() { + Some(writer) => writer.clone(), None => bail!("not connected"), }; let command_future = register_image( writer, self.crypt_config.clone(), - self.last_manifest.lock().unwrap().clone(), + self.last_manifest.get().map(|m| m.clone()), self.registry.clone(), self.known_chunks.clone(), device_name, @@ -208,8 +212,8 @@ impl BackupTask { self.check_aborted()?; - let writer = match *self.writer.lock().unwrap() { - Some(ref writer) => writer.clone(), + let writer = match self.writer.get() { + Some(writer) => writer.clone(), None => bail!("not connected"), }; @@ -222,8 +226,8 @@ impl BackupTask { self.check_aborted()?; - let writer = match *self.writer.lock().unwrap() { - Some(ref writer) => writer.clone(), + let writer = match self.writer.get() { + Some(writer) => writer.clone(), None => bail!("not connected"), };