Use async mutex.

This commit is contained in:
David Craven 2020-03-13 08:01:46 -03:00
parent b1122afe85
commit 42329dacc6
2 changed files with 25 additions and 19 deletions

View File

@ -2,11 +2,11 @@
use crate::error::Error;
use crate::repo::{BlockStore, Column, DataStore};
use async_std::path::PathBuf;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bitswap::Block;
use libipld::cid::Cid;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[derive(Clone, Debug)]
pub struct MemBlockStore {
@ -30,7 +30,7 @@ impl BlockStore for MemBlockStore {
}
async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
let contains = self.blocks.lock().unwrap().contains_key(cid);
let contains = self.blocks.lock().await.contains_key(cid);
Ok(contains)
}
@ -38,7 +38,7 @@ impl BlockStore for MemBlockStore {
let block = self
.blocks
.lock()
.unwrap()
.await
.get(cid)
.map(|block| block.to_owned());
Ok(block)
@ -46,12 +46,12 @@ impl BlockStore for MemBlockStore {
async fn put(&self, block: Block) -> Result<Cid, Error> {
let cid = block.cid().to_owned();
self.blocks.lock().unwrap().insert(cid.clone(), block);
self.blocks.lock().await.insert(cid.clone(), block);
Ok(cid)
}
async fn remove(&self, cid: &Cid) -> Result<(), Error> {
self.blocks.lock().unwrap().remove(cid);
self.blocks.lock().await.remove(cid);
Ok(())
}
}
@ -81,7 +81,7 @@ impl DataStore for MemDataStore {
let map = match col {
Column::Ipns => &self.ipns,
};
let contains = map.lock().unwrap().contains_key(key);
let contains = map.lock().await.contains_key(key);
Ok(contains)
}
@ -89,7 +89,7 @@ impl DataStore for MemDataStore {
let map = match col {
Column::Ipns => &self.ipns,
};
let value = map.lock().unwrap().get(key).map(|value| value.to_owned());
let value = map.lock().await.get(key).map(|value| value.to_owned());
Ok(value)
}
@ -97,7 +97,7 @@ impl DataStore for MemDataStore {
let map = match col {
Column::Ipns => &self.ipns,
};
map.lock().unwrap().insert(key.to_owned(), value.to_owned());
map.lock().await.insert(key.to_owned(), value.to_owned());
Ok(())
}
@ -105,7 +105,7 @@ impl DataStore for MemDataStore {
let map = match col {
Column::Ipns => &self.ipns,
};
map.lock().unwrap().remove(key);
map.lock().await.remove(key);
Ok(())
}
}

View File

@ -3,6 +3,7 @@ use crate::error::Error;
use crate::path::IpfsPath;
use crate::IpfsOptions;
use async_std::path::PathBuf;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bitswap::Block;
use core::fmt::Debug;
@ -14,7 +15,6 @@ use crossbeam::{Receiver, Sender};
use libipld::cid::Cid;
use libp2p::PeerId;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
pub mod fs;
pub mod mem;
@ -133,8 +133,8 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
/// Puts a block into the block store.
pub async fn put_block(&self, block: Block) -> Result<Cid, Error> {
let cid = self.block_store.put(block.clone()).await?;
if let Some(subscription) = self.subscriptions.lock().unwrap().remove(&cid) {
subscription.lock().unwrap().wake(block);
if let Some(subscription) = self.subscriptions.lock().await.remove(&cid) {
subscription.lock().await.wake(block);
}
// sending only fails if no one is listening anymore
// and that is okay with us.
@ -152,7 +152,7 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
self.events.send(RepoEvent::WantBlock(cid.clone())).ok();
self.subscriptions
.lock()
.unwrap()
.await
.entry(cid.clone())
.or_default()
.clone()
@ -253,12 +253,18 @@ impl Future for BlockFuture {
type Output = Block;
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
let mut subscription = self.subscription.lock().unwrap();
if let Some(result) = subscription.result() {
Poll::Ready(result)
} else {
subscription.add_waker(context.waker().clone());
Poll::Pending
let future = self.subscription.lock();
futures::pin_mut!(future);
match future.poll(context) {
Poll::Ready(mut subscription) => {
if let Some(result) = subscription.result() {
Poll::Ready(result)
} else {
subscription.add_waker(context.waker().clone());
Poll::Pending
}
}
Poll::Pending => Poll::Pending,
}
}
}