226: Assorted improvements to Repo and SubscriptionRegistry r=koivunej a=ljedrz

Individual commits describe the specific changes; notable ones:
- remove the `Arc` and its clones where not needed
- reduce allocations around subscription wakers
- use `futures::lock::Mutex` instead of `std::sync::Mutex` where applicable (see the sketch below)
- add a `wipe` method to the stores (useful for tests/benches)
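
The `std::sync::Mutex` to `futures::lock::Mutex` change is worth a note: the std lock blocks the whole executor thread while waiting and returns a `Result` that poisons on panic, whereas the futures lock suspends only the current task and hands back the guard directly. A minimal sketch of the pattern the stores move to (standalone illustration, not code from this PR):

```rust
use futures::lock::Mutex; // async-aware: no thread blocking, no poisoning
use std::collections::HashSet;

struct Store {
    cids: Mutex<HashSet<String>>,
}

impl Store {
    async fn contains(&self, cid: &str) -> bool {
        // `.lock().await` yields to the executor while waiting and
        // returns the guard directly -- no `unwrap()` or
        // `unwrap_or_else(|p| p.into_inner())` poison handling needed.
        self.cids.lock().await.contains(cid)
    }
}
```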

cc https://github.com/rs-ipfs/rust-ipfs/issues/174

Co-authored-by: ljedrz <ljedrz@gmail.com>
commit c78aca5881 (bors[bot], 2020-07-08 08:29:04 +00:00, committed by GitHub)
5 changed files with 90 additions and 72 deletions

diff --git a/src/lib.rs b/src/lib.rs

@@ -61,7 +61,7 @@ impl<T: RepoTypes> SwarmTypes for T {}
 impl<T: SwarmTypes + RepoTypes> IpfsTypes for T {}
 /// Default IPFS types.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct Types;
 impl RepoTypes for Types {
     type TBlockStore = repo::fs::FsBlockStore;
@@ -72,7 +72,7 @@ impl RepoTypes for Types {
 }
 /// Testing IPFS types
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct TestTypes;
 impl RepoTypes for TestTypes {
     type TBlockStore = repo::mem::MemBlockStore;
@@ -207,9 +207,15 @@ impl<T: IpfsTypes> Default for IpfsOptions<T> {
     }
 }
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct Ipfs<Types: IpfsTypes>(Arc<IpfsInner<Types>>);
+impl<Types: IpfsTypes> Clone for Ipfs<Types> {
+    fn clone(&self) -> Self {
+        Ipfs(Arc::clone(&self.0))
+    }
+}
 /// Ipfs struct creates a new IPFS node and is the main entry point
 /// for interacting with IPFS.
 #[derive(Debug)]
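
Replacing `derive(Clone)` with a hand-written impl is what allows dropping the `Clone` bounds from `RepoTypes`, `BlockStore`, and `DataStore` further down: a derived `Clone` on a generic struct demands that every type parameter be `Clone`, even though cloning an `Arc` never needs it. The general shape of the workaround (an illustrative sketch of the same pattern as the `Ipfs` impl above):

```rust
use std::sync::Arc;

struct Handle<T>(Arc<T>);

// `#[derive(Clone)]` would expand to `impl<T: Clone> Clone for Handle<T>`,
// forcing a bound the `Arc` does not need; the manual impl avoids it.
impl<T> Clone for Handle<T> {
    fn clone(&self) -> Self {
        Handle(Arc::clone(&self.0))
    }
}
```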

diff --git a/src/repo/fs.rs b/src/repo/fs.rs

@@ -9,19 +9,18 @@ use async_std::prelude::*;
 use async_trait::async_trait;
 use bitswap::Block;
 use core::convert::TryFrom;
+use futures::lock::Mutex;
 use futures::stream::StreamExt;
 use libipld::cid::Cid;
 use std::collections::HashSet;
 use std::ffi::OsStr;
-use std::sync::{Arc, Mutex};
 use super::{BlockRm, BlockRmError};
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct FsBlockStore {
     path: PathBuf,
-    // FIXME: this lock is not from futures
-    cids: Arc<Mutex<HashSet<Cid>>>,
+    cids: Mutex<HashSet<Cid>>,
 }
 #[async_trait]
@@ -40,25 +39,25 @@ impl BlockStore for FsBlockStore {
     }
     async fn open(&self) -> Result<(), Error> {
-        let path = self.path.clone();
-        let cids = self.cids.clone();
+        let path = &self.path;
+        let cids = &self.cids;
         let mut stream = fs::read_dir(path).await?;
-        fn append_cid(cids: &Mutex<HashSet<Cid>>, path: PathBuf) {
+        async fn append_cid(cids: &Mutex<HashSet<Cid>>, path: PathBuf) {
             if path.extension() != Some(OsStr::new("data")) {
                 return;
             }
             let cid_str = path.file_stem().unwrap();
             let cid = Cid::try_from(cid_str.to_str().unwrap()).unwrap();
-            cids.lock().unwrap_or_else(|p| p.into_inner()).insert(cid);
+            cids.lock().await.insert(cid);
         }
         loop {
             let dir = stream.next().await;
             match dir {
-                Some(Ok(dir)) => append_cid(&cids, dir.path()),
+                Some(Ok(dir)) => append_cid(&cids, dir.path()).await,
                 Some(Err(e)) => return Err(e.into()),
                 None => return Ok(()),
             }
@@ -66,7 +65,7 @@ impl BlockStore for FsBlockStore {
     }
     async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
-        let contains = self.cids.lock().unwrap().contains(cid);
+        let contains = self.cids.lock().await.contains(cid);
         Ok(contains)
     }
@@ -91,12 +90,12 @@ impl BlockStore for FsBlockStore {
     async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
         let path = block_path(self.path.clone(), &block.cid());
-        let cids = self.cids.clone();
+        let cids = &self.cids;
         let data = block.data();
         let mut file = fs::File::create(path).await?;
         file.write_all(&*data).await?;
         file.flush().await?;
-        let retval = if cids.lock().unwrap().insert(block.cid().to_owned()) {
+        let retval = if cids.lock().await.insert(block.cid().to_owned()) {
             BlockPut::NewBlock
         } else {
             BlockPut::Existed
@@ -110,11 +109,11 @@ impl BlockStore for FsBlockStore {
     async fn remove(&self, cid: &Cid) -> Result<Result<BlockRm, BlockRmError>, Error> {
         let path = block_path(self.path.clone(), cid);
-        let cids = self.cids.clone();
+        let cids = &self.cids;
-        // We want to panic if there's a mutex unlock error
         // TODO: Check for pinned blocks here? Instead of repo?
-        if cids.lock().unwrap().remove(&cid) {
+        if cids.lock().await.remove(&cid) {
             fs::remove_file(path).await?;
             Ok(Ok(BlockRm::Removed(cid.clone())))
         } else {
@@ -124,17 +123,20 @@ impl BlockStore for FsBlockStore {
     async fn list(&self) -> Result<Vec<Cid>, Error> {
-        // unwrapping as we want to panic on poisoned lock
-        let guard = self.cids.lock().unwrap();
+        let guard = self.cids.lock().await;
         Ok(guard.iter().cloned().collect())
     }
+    async fn wipe(&self) {
+        self.cids.lock().await.clear();
+    }
 }
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 #[cfg(feature = "rocksdb")]
 pub struct RocksDataStore {
     path: PathBuf,
-    // FIXME: this lock is not from futures
-    db: Arc<Mutex<Option<rocksdb::DB>>>,
+    db: Mutex<Option<rocksdb::DB>>,
 }
 #[cfg(feature = "rocksdb")]
@@ -147,6 +149,7 @@ impl ResolveColumnFamily for Column {
     fn resolve<'a>(&self, db: &'a rocksdb::DB) -> &'a rocksdb::ColumnFamily {
         let name = match *self {
             Column::Ipns => "ipns",
+            Column::Pin => "pin",
         };
         // not sure why this isn't always present?
@@ -160,7 +163,7 @@ impl DataStore for RocksDataStore {
     fn new(path: PathBuf) -> Self {
         RocksDataStore {
             path,
-            db: Arc::new(Mutex::new(None)),
+            db: Default::default(),
         }
     }
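
`db: Default::default()` type-checks because `futures::lock::Mutex<T>` implements `Default` whenever `T` does, and `Option<T>::default()` is `None`, so the field still starts out as an unlocked mutex around `None`, just without the `Arc`. A quick equivalence check (standalone sketch, not code from this PR):

```rust
use futures::lock::Mutex;

fn main() {
    // Both yield an unlocked mutex around `None`:
    let a: Mutex<Option<u32>> = Default::default();
    let b: Mutex<Option<u32>> = Mutex::new(None);
    assert!(a.try_lock().unwrap().is_none());
    assert!(b.try_lock().unwrap().is_none());
}
```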
@@ -169,7 +172,7 @@ impl DataStore for RocksDataStore {
     }
     async fn open(&self) -> Result<(), Error> {
-        let db = self.db.clone();
+        let db = &self.db;
         let path = self.path.clone();
         let mut db_opts = rocksdb::Options::default();
         db_opts.create_missing_column_families(true);
@@ -178,14 +181,14 @@ impl DataStore for RocksDataStore {
         let ipns_opts = rocksdb::Options::default();
         let ipns_cf = rocksdb::ColumnFamilyDescriptor::new("ipns", ipns_opts);
         let rdb = rocksdb::DB::open_cf_descriptors(&db_opts, &path, vec![ipns_cf])?;
-        *db.lock().unwrap() = Some(rdb);
+        *db.lock().await = Some(rdb);
         Ok(())
     }
     async fn contains(&self, col: Column, key: &[u8]) -> Result<bool, Error> {
-        let db = self.db.clone();
+        let db = &self.db;
         let key = key.to_owned();
-        let db = db.lock().unwrap();
+        let db = db.lock().await;
         let db = db.as_ref().unwrap();
         let cf = col.resolve(db);
         let contains = db.get_cf(cf, &key)?.is_some();
@@ -193,9 +196,9 @@ impl DataStore for RocksDataStore {
     }
     async fn get(&self, col: Column, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
-        let db = self.db.clone();
+        let db = &self.db;
         let key = key.to_owned();
-        let db = db.lock().unwrap();
+        let db = db.lock().await;
         let db = db.as_ref().unwrap();
         let cf = col.resolve(db);
         let get = db.get_cf(cf, &key)?.map(|value| value.to_vec());
@@ -203,10 +206,10 @@ impl DataStore for RocksDataStore {
     }
     async fn put(&self, col: Column, key: &[u8], value: &[u8]) -> Result<(), Error> {
-        let db = self.db.clone();
+        let db = &self.db;
         let key = key.to_owned();
         let value = value.to_owned();
-        let db = db.lock().unwrap();
+        let db = db.lock().await;
         let db = db.as_ref().unwrap();
         let cf = col.resolve(db);
         db.put_cf(cf, &key, &value)?;
@@ -214,14 +217,18 @@ impl DataStore for RocksDataStore {
     }
     async fn remove(&self, col: Column, key: &[u8]) -> Result<(), Error> {
-        let db = self.db.clone();
+        let db = &self.db;
         let key = key.to_owned();
-        let db = db.lock().unwrap();
+        let db = db.lock().await;
         let db = db.as_ref().unwrap();
         let cf = col.resolve(db);
         db.delete_cf(cf, &key)?;
         Ok(())
     }
+    async fn wipe(&self) {
+        // this function is currently only intended to be used with in-memory test setups
+    }
 }
 fn block_path(mut base: PathBuf, cid: &Cid) -> PathBuf {

diff --git a/src/repo/mem.rs b/src/repo/mem.rs

@@ -2,7 +2,7 @@
 use crate::error::Error;
 use crate::repo::{BlockPut, BlockStore, Column, DataStore};
 use async_std::path::PathBuf;
-use async_std::sync::{Arc, Mutex};
+use async_std::sync::Mutex;
 use async_trait::async_trait;
 use bitswap::Block;
 use libipld::cid::Cid;
@@ -12,9 +12,9 @@ use super::{BlockRm, BlockRmError};
 // FIXME: Transition to Persistent Map to make iterating more consistent
 use std::collections::HashMap;
-#[derive(Clone, Debug, Default)]
+#[derive(Debug, Default)]
 pub struct MemBlockStore {
-    blocks: Arc<Mutex<HashMap<Cid, Block>>>,
+    blocks: Mutex<HashMap<Cid, Block>>,
 }
 #[async_trait]
@@ -70,12 +70,16 @@ impl BlockStore for MemBlockStore {
         let guard = self.blocks.lock().await;
         Ok(guard.iter().map(|(cid, _block)| cid).cloned().collect())
     }
+    async fn wipe(&self) {
+        self.blocks.lock().await.clear();
+    }
 }
-#[derive(Clone, Debug, Default)]
+#[derive(Debug, Default)]
 pub struct MemDataStore {
-    ipns: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
-    pin: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
+    ipns: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
+    pin: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
 }
 #[async_trait]
@@ -127,6 +131,11 @@ impl DataStore for MemDataStore {
         map.lock().await.remove(key);
         Ok(())
     }
+    async fn wipe(&self) {
+        self.ipns.lock().await.clear();
+        self.pin.lock().await.clear();
+    }
 }
 #[cfg(test)]

diff --git a/src/repo/mod.rs b/src/repo/mod.rs

@@ -4,12 +4,12 @@ use crate::path::IpfsPath;
 use crate::subscription::SubscriptionRegistry;
 use crate::IpfsOptions;
 use async_std::path::PathBuf;
-use async_std::sync::Mutex;
 use async_trait::async_trait;
 use bitswap::Block;
 use core::fmt::Debug;
 use core::marker::PhantomData;
 use futures::channel::mpsc::{channel, Receiver, Sender};
+use futures::lock::Mutex;
 use futures::sink::SinkExt;
 use libipld::cid::{self, Cid};
 use libp2p::core::PeerId;
@@ -17,7 +17,7 @@ use libp2p::core::PeerId;
 pub mod fs;
 pub mod mem;
-pub trait RepoTypes: Clone + Send + Sync + 'static {
+pub trait RepoTypes: Send + Sync + 'static {
     type TBlockStore: BlockStore;
     type TDataStore: DataStore;
 }
@@ -69,7 +69,7 @@ pub enum BlockRmError {
 /// This API is being discussed and evolved, which will likely lead to breakage.
 #[async_trait]
-pub trait BlockStore: Debug + Clone + Send + Sync + Unpin + 'static {
+pub trait BlockStore: Debug + Send + Sync + Unpin + 'static {
     fn new(path: PathBuf) -> Self;
     async fn init(&self) -> Result<(), Error>;
     async fn open(&self) -> Result<(), Error>;
@@ -78,10 +78,11 @@ pub trait BlockStore: Debug + Clone + Send + Sync + Unpin + 'static {
     async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error>;
     async fn remove(&self, cid: &Cid) -> Result<Result<BlockRm, BlockRmError>, Error>;
     async fn list(&self) -> Result<Vec<Cid>, Error>;
+    async fn wipe(&self);
 }
 #[async_trait]
-pub trait DataStore: Debug + Clone + Send + Sync + Unpin + 'static {
+pub trait DataStore: Debug + Send + Sync + Unpin + 'static {
     fn new(path: PathBuf) -> Self;
     async fn init(&self) -> Result<(), Error>;
     async fn open(&self) -> Result<(), Error>;
@@ -89,6 +90,7 @@ pub trait DataStore: Debug + Clone + Send + Sync + Unpin + 'static {
     async fn get(&self, col: Column, key: &[u8]) -> Result<Option<Vec<u8>>, Error>;
     async fn put(&self, col: Column, key: &[u8], value: &[u8]) -> Result<(), Error>;
     async fn remove(&self, col: Column, key: &[u8]) -> Result<(), Error>;
+    async fn wipe(&self);
 }
 #[derive(Clone, Copy, Debug)]
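
With `wipe` on both traits, a test or bench can reset a store in place instead of constructing a fresh one. A hedged usage sketch against the in-memory store (`make_test_block` is a hypothetical helper, not part of this PR):

```rust
#[async_std::test]
async fn wipe_clears_the_store() {
    let store = MemBlockStore::default();
    store.put(make_test_block()).await.unwrap(); // hypothetical helper producing a Block
    assert_eq!(store.list().await.unwrap().len(), 1);

    // reset in place; the store is immediately reusable afterwards
    store.wipe().await;
    assert!(store.list().await.unwrap().is_empty());
}
```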
@@ -216,7 +218,7 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
         self.events
             .clone()
             // provide only cidv1
-            .send(RepoEvent::ProvideBlock(cid.clone()))
+            .send(RepoEvent::ProvideBlock(cid))
             .await
             .ok();
         Ok((original_cid, res))
@@ -287,7 +289,7 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
     pub async fn get_ipns(&self, ipns: &PeerId) -> Result<Option<IpfsPath>, Error> {
         use std::str::FromStr;
-        let data_store = self.data_store.clone();
+        let data_store = &self.data_store;
         let key = ipns.to_owned();
         let bytes = data_store.get(Column::Ipns, key.as_bytes()).await?;
         match bytes {

diff --git a/src/subscription.rs b/src/subscription.rs

@@ -9,7 +9,7 @@ use std::sync::{Arc, Mutex};
 pub struct SubscriptionRegistry<TReq: Debug + Eq + Hash, TRes: Debug> {
     subscriptions: HashMap<TReq, Arc<Mutex<Subscription<TRes>>>>,
-    cancelled: bool,
+    shutting_down: bool,
 }
 impl<TReq: Debug + Eq + Hash, TRes: Debug> fmt::Debug for SubscriptionRegistry<TReq, TRes> {
@@ -26,16 +26,9 @@ impl<TReq: Debug + Eq + Hash, TRes: Debug> fmt::Debug for SubscriptionRegistry<TReq, TRes> {
 }
 impl<TReq: Debug + Eq + Hash, TRes: Debug> SubscriptionRegistry<TReq, TRes> {
-    pub fn new() -> Self {
-        Self {
-            subscriptions: Default::default(),
-            cancelled: false,
-        }
-    }
     pub fn create_subscription(&mut self, req: TReq) -> SubscriptionFuture<TRes> {
         let subscription = self.subscriptions.entry(req).or_default().clone();
-        if self.cancelled {
+        if self.shutting_down {
             subscription.lock().unwrap().cancel();
         }
         SubscriptionFuture { subscription }
@@ -49,10 +42,10 @@ impl<TReq: Debug + Eq + Hash, TRes: Debug> SubscriptionRegistry<TReq, TRes> {
     /// After shutdown all SubscriptionFutures will return Err(Cancelled)
     pub fn shutdown(&mut self) {
-        if self.cancelled {
+        if self.shutting_down {
             return;
         }
-        self.cancelled = true;
+        self.shutting_down = true;
         log::debug!("Shutting down {:?}", self);
@@ -94,7 +87,10 @@ impl<TReq: Debug + Eq + Hash, TRes: Debug> SubscriptionRegistry<TReq, TRes> {
 impl<TReq: Debug + Eq + Hash, TRes: Debug> Default for SubscriptionRegistry<TReq, TRes> {
     fn default() -> Self {
-        Self::new()
+        Self {
+            subscriptions: Default::default(),
+            shutting_down: false,
+        }
     }
 }
@@ -140,18 +136,12 @@ impl<TResult> fmt::Debug for Subscription<TResult> {
 }
 impl<TResult> Subscription<TResult> {
-    pub fn new() -> Self {
-        Self {
-            result: Default::default(),
-            wakers: Default::default(),
-            cancelled: false,
+    pub fn add_waker(&mut self, waker: &Waker) {
+        if !self.wakers.iter().any(|w| w.will_wake(waker)) {
+            self.wakers.push(waker.clone());
         }
     }
-    pub fn add_waker(&mut self, waker: Waker) {
-        self.wakers.push(waker);
-    }
     pub fn wake(&mut self, result: TResult) {
         self.result = Some(result);
         for waker in self.wakers.drain(..) {
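
The new `add_waker` is the "reduce allocations around subscription wakers" item from the PR description: it borrows the waker and clones only when no equivalent waker is already registered, using `Waker::will_wake` to detect duplicates, so a future polled many times stores one waker instead of one per poll. Condensed to its core (standalone sketch, not code from this diff):

```rust
use std::task::Waker;

struct WakerSet {
    wakers: Vec<Waker>,
}

impl WakerSet {
    fn add(&mut self, waker: &Waker) {
        // `will_wake` is true when two wakers would wake the same task,
        // so the clone (a refcount bump or allocation) only happens for
        // wakers not seen before.
        if !self.wakers.iter().any(|w| w.will_wake(waker)) {
            self.wakers.push(waker.clone());
        }
    }
}
```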
@@ -182,7 +172,11 @@ impl<TResult> Subscription<TResult> {
 impl<TResult> Default for Subscription<TResult> {
     fn default() -> Self {
-        Self::new()
+        Self {
+            result: Default::default(),
+            wakers: Default::default(),
+            cancelled: false,
+        }
     }
 }
@@ -202,7 +196,7 @@ impl<TResult: Clone> Future for SubscriptionFuture<TResult> {
         if let Some(result) = subscription.result() {
             Poll::Ready(Ok(result))
         } else {
-            subscription.add_waker(context.waker().clone());
+            subscription.add_waker(&context.waker());
             Poll::Pending
         }
     }
@@ -224,7 +218,7 @@ mod tests {
     #[async_std::test]
     async fn subscription() {
-        let mut registry = SubscriptionRegistry::<u32, u32>::new();
+        let mut registry = SubscriptionRegistry::<u32, u32>::default();
         let s1 = registry.create_subscription(0);
         let s2 = registry.create_subscription(0);
         registry.finish_subscription(&0, 10);
@@ -234,7 +228,7 @@
     #[async_std::test]
     async fn subscription_cancelled_on_dropping_registry() {
-        let mut registry = SubscriptionRegistry::<u32, u32>::new();
+        let mut registry = SubscriptionRegistry::<u32, u32>::default();
         let s1 = registry.create_subscription(0);
         drop(registry);
         s1.await.unwrap_err();
@@ -242,7 +236,7 @@
     #[async_std::test]
     async fn subscription_cancelled_on_shutdown() {
-        let mut registry = SubscriptionRegistry::<u32, u32>::new();
+        let mut registry = SubscriptionRegistry::<u32, u32>::default();
         let s1 = registry.create_subscription(0);
         registry.shutdown();
         s1.await.unwrap_err();
@@ -250,7 +244,7 @@
     #[async_std::test]
     async fn new_subscriptions_cancelled_after_shutdown() {
-        let mut registry = SubscriptionRegistry::<u32, u32>::new();
+        let mut registry = SubscriptionRegistry::<u32, u32>::default();
         registry.shutdown();
         let s1 = registry.create_subscription(0);
         s1.await.unwrap_err();
@@ -261,7 +255,7 @@
         use async_std::future::timeout;
         use std::time::Duration;
-        let mut registry = SubscriptionRegistry::<u32, u32>::new();
+        let mut registry = SubscriptionRegistry::<u32, u32>::default();
         let s1 = timeout(Duration::from_millis(1), registry.create_subscription(0));
         let s2 = registry.create_subscription(0);