From 42329dacc6ebd4539490b6dcb27cf3731c087374 Mon Sep 17 00:00:00 2001 From: David Craven Date: Fri, 13 Mar 2020 08:01:46 -0300 Subject: [PATCH] Use async mutex. --- src/repo/mem.rs | 18 +++++++++--------- src/repo/mod.rs | 26 ++++++++++++++++---------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/src/repo/mem.rs b/src/repo/mem.rs index 8095758b..3758527e 100644 --- a/src/repo/mem.rs +++ b/src/repo/mem.rs @@ -2,11 +2,11 @@ use crate::error::Error; use crate::repo::{BlockStore, Column, DataStore}; use async_std::path::PathBuf; +use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use bitswap::Block; use libipld::cid::Cid; use std::collections::HashMap; -use std::sync::{Arc, Mutex}; #[derive(Clone, Debug)] pub struct MemBlockStore { @@ -30,7 +30,7 @@ impl BlockStore for MemBlockStore { } async fn contains(&self, cid: &Cid) -> Result { - let contains = self.blocks.lock().unwrap().contains_key(cid); + let contains = self.blocks.lock().await.contains_key(cid); Ok(contains) } @@ -38,7 +38,7 @@ impl BlockStore for MemBlockStore { let block = self .blocks .lock() - .unwrap() + .await .get(cid) .map(|block| block.to_owned()); Ok(block) @@ -46,12 +46,12 @@ impl BlockStore for MemBlockStore { async fn put(&self, block: Block) -> Result { let cid = block.cid().to_owned(); - self.blocks.lock().unwrap().insert(cid.clone(), block); + self.blocks.lock().await.insert(cid.clone(), block); Ok(cid) } async fn remove(&self, cid: &Cid) -> Result<(), Error> { - self.blocks.lock().unwrap().remove(cid); + self.blocks.lock().await.remove(cid); Ok(()) } } @@ -81,7 +81,7 @@ impl DataStore for MemDataStore { let map = match col { Column::Ipns => &self.ipns, }; - let contains = map.lock().unwrap().contains_key(key); + let contains = map.lock().await.contains_key(key); Ok(contains) } @@ -89,7 +89,7 @@ impl DataStore for MemDataStore { let map = match col { Column::Ipns => &self.ipns, }; - let value = map.lock().unwrap().get(key).map(|value| value.to_owned()); + let value = map.lock().await.get(key).map(|value| value.to_owned()); Ok(value) } @@ -97,7 +97,7 @@ impl DataStore for MemDataStore { let map = match col { Column::Ipns => &self.ipns, }; - map.lock().unwrap().insert(key.to_owned(), value.to_owned()); + map.lock().await.insert(key.to_owned(), value.to_owned()); Ok(()) } @@ -105,7 +105,7 @@ impl DataStore for MemDataStore { let map = match col { Column::Ipns => &self.ipns, }; - map.lock().unwrap().remove(key); + map.lock().await.remove(key); Ok(()) } } diff --git a/src/repo/mod.rs b/src/repo/mod.rs index 380378e4..b3357985 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -3,6 +3,7 @@ use crate::error::Error; use crate::path::IpfsPath; use crate::IpfsOptions; use async_std::path::PathBuf; +use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use bitswap::Block; use core::fmt::Debug; @@ -14,7 +15,6 @@ use crossbeam::{Receiver, Sender}; use libipld::cid::Cid; use libp2p::PeerId; use std::collections::HashMap; -use std::sync::{Arc, Mutex}; pub mod fs; pub mod mem; @@ -133,8 +133,8 @@ impl Repo { /// Puts a block into the block store. pub async fn put_block(&self, block: Block) -> Result { let cid = self.block_store.put(block.clone()).await?; - if let Some(subscription) = self.subscriptions.lock().unwrap().remove(&cid) { - subscription.lock().unwrap().wake(block); + if let Some(subscription) = self.subscriptions.lock().await.remove(&cid) { + subscription.lock().await.wake(block); } // sending only fails if no one is listening anymore // and that is okay with us. @@ -152,7 +152,7 @@ impl Repo { self.events.send(RepoEvent::WantBlock(cid.clone())).ok(); self.subscriptions .lock() - .unwrap() + .await .entry(cid.clone()) .or_default() .clone() @@ -253,12 +253,18 @@ impl Future for BlockFuture { type Output = Block; fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll { - let mut subscription = self.subscription.lock().unwrap(); - if let Some(result) = subscription.result() { - Poll::Ready(result) - } else { - subscription.add_waker(context.waker().clone()); - Poll::Pending + let future = self.subscription.lock(); + futures::pin_mut!(future); + match future.poll(context) { + Poll::Ready(mut subscription) => { + if let Some(result) = subscription.result() { + Poll::Ready(result) + } else { + subscription.add_waker(context.waker().clone()); + Poll::Pending + } + } + Poll::Pending => Poll::Pending, } } }