Merge #210
210: A round of cleanups r=koivunej a=ljedrz Assorted drive-by cleanups and small refactorings; individual commits describe the specific changes. Co-authored-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
commit
008693fcd1
@ -16,6 +16,8 @@ use std::io;
|
||||
// https://github.com/ipfs/js-ipfs-bitswap/blob/d8f80408aadab94c962f6b88f343eb9f39fa0fcc/src/decision-engine/index.js#L16
|
||||
const MAX_BUF_SIZE: usize = 524_288;
|
||||
|
||||
type FutureResult<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct BitswapConfig {}
|
||||
|
||||
@ -35,8 +37,7 @@ where
|
||||
{
|
||||
type Output = Message;
|
||||
type Error = BitswapError;
|
||||
#[allow(clippy::type_complexity)]
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
||||
type Future = FutureResult<Self::Output, Self::Error>;
|
||||
|
||||
#[inline]
|
||||
fn upgrade_inbound(self, mut socket: TSocket, info: Self::Info) -> Self::Future {
|
||||
@ -66,8 +67,7 @@ where
|
||||
{
|
||||
type Output = ();
|
||||
type Error = io::Error;
|
||||
#[allow(clippy::type_complexity)]
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
||||
type Future = FutureResult<Self::Output, Self::Error>;
|
||||
|
||||
#[inline]
|
||||
fn upgrade_outbound(self, mut socket: TSocket, info: Self::Info) -> Self::Future {
|
||||
|
@ -51,13 +51,10 @@ fn main() {
|
||||
// FIXME: need to process cmdline args here, but trying to understand js-ipfsd-ctl right now a
|
||||
// bit more.
|
||||
|
||||
let home = match home {
|
||||
Some(path) => path,
|
||||
None => {
|
||||
eprintln!("IPFS_PATH and HOME unset");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
let home = home.unwrap_or_else(|| {
|
||||
eprintln!("IPFS_PATH and HOME unset");
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
let config_path = home.join("config");
|
||||
|
||||
|
@ -68,7 +68,7 @@ async fn inner_peers<T: IpfsTypes>(
|
||||
topic: Option<String>,
|
||||
) -> Result<impl warp::Reply, warp::Rejection> {
|
||||
let peers = ipfs
|
||||
.pubsub_peers(topic.as_deref())
|
||||
.pubsub_peers(topic)
|
||||
.await
|
||||
.map_err(|e| warp::reject::custom(StringError::from(e)))?;
|
||||
|
||||
@ -112,8 +112,7 @@ async fn inner_publish<T: IpfsTypes>(
|
||||
ipfs: Ipfs<T>,
|
||||
PublishArgs { topic, message }: PublishArgs,
|
||||
) -> Result<impl warp::Reply, warp::Rejection> {
|
||||
// FIXME: perhaps these should be taken by value as they are always moved?
|
||||
ipfs.pubsub_publish(&topic, &message.into_inner())
|
||||
ipfs.pubsub_publish(topic, message.into_inner())
|
||||
.await
|
||||
.map_err(|e| warp::reject::custom(StringError::from(e)))?;
|
||||
Ok(warp::reply::reply())
|
||||
@ -161,7 +160,7 @@ async fn inner_subscribe<T: IpfsTypes>(
|
||||
|
||||
// the returned stream needs to be set up to be shoveled in a background task
|
||||
let shoveled = ipfs
|
||||
.pubsub_subscribe(&topic)
|
||||
.pubsub_subscribe(topic.clone())
|
||||
.await
|
||||
.expect("new subscriptions shouldn't fail while holding the lock");
|
||||
|
||||
@ -257,7 +256,7 @@ async fn shovel<T: IpfsTypes>(
|
||||
topic
|
||||
);
|
||||
shoveled = ipfs
|
||||
.pubsub_subscribe(&topic)
|
||||
.pubsub_subscribe(topic.clone())
|
||||
.await
|
||||
.expect("new subscriptions shouldn't fail while holding the lock");
|
||||
} else {
|
||||
@ -319,11 +318,11 @@ struct PubsubHttpApiMessage {
|
||||
topics: Vec<String>,
|
||||
}
|
||||
|
||||
impl<'a, T> From<&'a T> for PubsubHttpApiMessage
|
||||
impl<T> From<T> for PubsubHttpApiMessage
|
||||
where
|
||||
T: AsRef<ipfs::PubsubMessage>,
|
||||
{
|
||||
fn from(msg: &'a T) -> Self {
|
||||
fn from(msg: T) -> Self {
|
||||
use multibase::Base::Base64Pad;
|
||||
let msg = msg.as_ref();
|
||||
|
||||
|
12
src/lib.rs
12
src/lib.rs
@ -471,12 +471,12 @@ impl<Types: IpfsTypes> Ipfs<Types> {
|
||||
/// Subscribes to a given topic. Can be done at most once without unsubscribing in the between.
|
||||
/// The subscription can be unsubscribed by dropping the stream or calling
|
||||
/// [`pubsub_unsubscribe`].
|
||||
pub async fn pubsub_subscribe(&self, topic: &str) -> Result<SubscriptionStream, Error> {
|
||||
pub async fn pubsub_subscribe(&self, topic: String) -> Result<SubscriptionStream, Error> {
|
||||
let (tx, rx) = oneshot_channel();
|
||||
|
||||
self.to_task
|
||||
.clone()
|
||||
.send(IpfsEvent::PubsubSubscribe(topic.into(), tx))
|
||||
.send(IpfsEvent::PubsubSubscribe(topic.clone(), tx))
|
||||
.await?;
|
||||
|
||||
rx.await?
|
||||
@ -484,12 +484,12 @@ impl<Types: IpfsTypes> Ipfs<Types> {
|
||||
}
|
||||
|
||||
/// Publishes to the topic which may have been subscribed to earlier
|
||||
pub async fn pubsub_publish(&self, topic: &str, data: &[u8]) -> Result<(), Error> {
|
||||
pub async fn pubsub_publish(&self, topic: String, data: Vec<u8>) -> Result<(), Error> {
|
||||
let (tx, rx) = oneshot_channel();
|
||||
|
||||
self.to_task
|
||||
.clone()
|
||||
.send(IpfsEvent::PubsubPublish(topic.into(), data.to_vec(), tx))
|
||||
.send(IpfsEvent::PubsubPublish(topic, data, tx))
|
||||
.await?;
|
||||
|
||||
Ok(rx.await?)
|
||||
@ -508,12 +508,12 @@ impl<Types: IpfsTypes> Ipfs<Types> {
|
||||
}
|
||||
|
||||
/// Returns all known pubsub peers with the optional topic filter
|
||||
pub async fn pubsub_peers(&self, topic: Option<&str>) -> Result<Vec<PeerId>, Error> {
|
||||
pub async fn pubsub_peers(&self, topic: Option<String>) -> Result<Vec<PeerId>, Error> {
|
||||
let (tx, rx) = oneshot_channel();
|
||||
|
||||
self.to_task
|
||||
.clone()
|
||||
.send(IpfsEvent::PubsubPeers(topic.map(String::from), tx))
|
||||
.send(IpfsEvent::PubsubPeers(topic, tx))
|
||||
.await?;
|
||||
|
||||
Ok(rx.await?)
|
||||
|
@ -29,7 +29,7 @@ impl BlockStore for FsBlockStore {
|
||||
fn new(path: PathBuf) -> Self {
|
||||
FsBlockStore {
|
||||
path,
|
||||
cids: Arc::new(Mutex::new(HashSet::new())),
|
||||
cids: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,7 +45,7 @@ impl BlockStore for FsBlockStore {
|
||||
|
||||
let mut stream = fs::read_dir(path).await?;
|
||||
|
||||
fn append_cid(cids: &Arc<Mutex<HashSet<Cid>>>, path: PathBuf) {
|
||||
fn append_cid(cids: &Mutex<HashSet<Cid>>, path: PathBuf) {
|
||||
if path.extension() != Some(OsStr::new("data")) {
|
||||
return;
|
||||
}
|
||||
|
@ -12,7 +12,7 @@ use super::{BlockRm, BlockRmError};
|
||||
// FIXME: Transition to Persistent Map to make iterating more consistent
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct MemBlockStore {
|
||||
blocks: Arc<Mutex<HashMap<Cid, Block>>>,
|
||||
}
|
||||
@ -20,9 +20,7 @@ pub struct MemBlockStore {
|
||||
#[async_trait]
|
||||
impl BlockStore for MemBlockStore {
|
||||
fn new(_path: PathBuf) -> Self {
|
||||
MemBlockStore {
|
||||
blocks: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
Default::default()
|
||||
}
|
||||
|
||||
async fn init(&self) -> Result<(), Error> {
|
||||
@ -74,7 +72,7 @@ impl BlockStore for MemBlockStore {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct MemDataStore {
|
||||
ipns: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
|
||||
pin: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
|
||||
@ -83,10 +81,7 @@ pub struct MemDataStore {
|
||||
#[async_trait]
|
||||
impl DataStore for MemDataStore {
|
||||
fn new(_path: PathBuf) -> Self {
|
||||
MemDataStore {
|
||||
ipns: Arc::new(Mutex::new(HashMap::new())),
|
||||
pin: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
Default::default()
|
||||
}
|
||||
|
||||
async fn init(&self) -> Result<(), Error> {
|
||||
|
@ -9,27 +9,27 @@ const MDNS: bool = false;
|
||||
#[async_std::test]
|
||||
async fn subscribe_only_once() {
|
||||
let a = Node::new(MDNS).await;
|
||||
let _stream = a.pubsub_subscribe("some_topic").await.unwrap();
|
||||
a.pubsub_subscribe("some_topic").await.unwrap_err();
|
||||
let _stream = a.pubsub_subscribe("some_topic".into()).await.unwrap();
|
||||
a.pubsub_subscribe("some_topic".into()).await.unwrap_err();
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
async fn resubscribe_after_unsubscribe() {
|
||||
let a = Node::new(MDNS).await;
|
||||
|
||||
let mut stream = a.pubsub_subscribe("topic").await.unwrap();
|
||||
let mut stream = a.pubsub_subscribe("topic".into()).await.unwrap();
|
||||
a.pubsub_unsubscribe("topic").await.unwrap();
|
||||
// sender has been dropped
|
||||
assert_eq!(stream.next().await, None);
|
||||
|
||||
drop(a.pubsub_subscribe("topic").await.unwrap());
|
||||
drop(a.pubsub_subscribe("topic".into()).await.unwrap());
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
async fn unsubscribe_via_drop() {
|
||||
let a = Node::new(MDNS).await;
|
||||
|
||||
let msgs = a.pubsub_subscribe("topic").await.unwrap();
|
||||
let msgs = a.pubsub_subscribe("topic".into()).await.unwrap();
|
||||
assert_eq!(a.pubsub_subscribed().await.unwrap(), &["topic"]);
|
||||
|
||||
drop(msgs);
|
||||
@ -41,7 +41,9 @@ async fn unsubscribe_via_drop() {
|
||||
#[async_std::test]
|
||||
async fn can_publish_without_subscribing() {
|
||||
let a = Node::new(MDNS).await;
|
||||
a.pubsub_publish("topic", b"foobar").await.unwrap()
|
||||
a.pubsub_publish("topic".into(), b"foobar".to_vec())
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
@ -52,16 +54,22 @@ async fn publish_between_two_nodes() {
|
||||
|
||||
let ((a, a_id), (b, b_id)) = two_connected_nodes().await;
|
||||
|
||||
let topic = "shared";
|
||||
let topic = "shared".to_owned();
|
||||
|
||||
let mut a_msgs = a.pubsub_subscribe(topic).await.unwrap();
|
||||
let mut b_msgs = b.pubsub_subscribe(topic).await.unwrap();
|
||||
let mut a_msgs = a.pubsub_subscribe(topic.clone()).await.unwrap();
|
||||
let mut b_msgs = b.pubsub_subscribe(topic.clone()).await.unwrap();
|
||||
|
||||
// need to wait to see both sides so that the messages will get through
|
||||
let mut appeared = false;
|
||||
for _ in 0..100usize {
|
||||
if a.pubsub_peers(Some(topic)).await.unwrap().contains(&b_id)
|
||||
&& b.pubsub_peers(Some(topic)).await.unwrap().contains(&a_id)
|
||||
if a.pubsub_peers(Some(topic.clone()))
|
||||
.await
|
||||
.unwrap()
|
||||
.contains(&b_id)
|
||||
&& b.pubsub_peers(Some(topic.clone()))
|
||||
.await
|
||||
.unwrap()
|
||||
.contains(&a_id)
|
||||
{
|
||||
appeared = true;
|
||||
break;
|
||||
@ -76,21 +84,22 @@ async fn publish_between_two_nodes() {
|
||||
"timed out before both nodes appeared as pubsub peers"
|
||||
);
|
||||
|
||||
a.pubsub_publish(topic, b"foobar").await.unwrap();
|
||||
b.pubsub_publish(topic, b"barfoo").await.unwrap();
|
||||
a.pubsub_publish(topic.clone(), b"foobar".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
b.pubsub_publish(topic.clone(), b"barfoo".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// the order is not defined, but both should see the other's message and the message they sent
|
||||
let expected = [(&[topic], &a_id, b"foobar"), (&[topic], &b_id, b"barfoo")]
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|(topics, id, data)| {
|
||||
(
|
||||
topics.iter().map(|&s| s.to_string()).collect::<Vec<_>>(),
|
||||
id.clone(),
|
||||
data.to_vec(),
|
||||
)
|
||||
})
|
||||
.collect::<HashSet<_>>();
|
||||
let expected = [
|
||||
(&[topic.clone()], &a_id, b"foobar"),
|
||||
(&[topic.clone()], &b_id, b"barfoo"),
|
||||
]
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|(topics, id, data)| (topics.to_vec(), id.clone(), data.to_vec()))
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
for st in &mut [b_msgs.by_ref(), a_msgs.by_ref()] {
|
||||
let actual = st
|
||||
@ -108,7 +117,12 @@ async fn publish_between_two_nodes() {
|
||||
|
||||
let mut disappeared = false;
|
||||
for _ in 0..100usize {
|
||||
if !a.pubsub_peers(Some(topic)).await.unwrap().contains(&b_id) {
|
||||
if !a
|
||||
.pubsub_peers(Some(topic.clone()))
|
||||
.await
|
||||
.unwrap()
|
||||
.contains(&b_id)
|
||||
{
|
||||
disappeared = true;
|
||||
break;
|
||||
}
|
||||
|
@ -17,6 +17,8 @@ pub struct IdleFileVisit {
|
||||
range: Option<Range<u64>>,
|
||||
}
|
||||
|
||||
type FileVisitResult<'a> = (&'a [u8], u64, Metadata, Option<FileVisit>);
|
||||
|
||||
impl IdleFileVisit {
|
||||
/// Target range represents the target byte range of the file we are interested in visiting.
|
||||
pub fn with_target_range(self, range: Range<u64>) -> Self {
|
||||
@ -27,31 +29,25 @@ impl IdleFileVisit {
|
||||
///
|
||||
/// Returns (on success) a tuple of file bytes, total file size, any metadata associated, and
|
||||
/// optionally a `FileVisit` to continue the walk.
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn start(
|
||||
self,
|
||||
block: &[u8],
|
||||
) -> Result<(&[u8], u64, Metadata, Option<FileVisit>), FileReadFailed> {
|
||||
pub fn start(self, block: &'_ [u8]) -> Result<FileVisitResult<'_>, FileReadFailed> {
|
||||
let fr = FileReader::from_block(block)?;
|
||||
self.start_from_reader(fr, &mut None)
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub(crate) fn start_from_parsed<'a>(
|
||||
self,
|
||||
block: FlatUnixFs<'a>,
|
||||
cache: &'_ mut Option<Cache>,
|
||||
) -> Result<(&'a [u8], u64, Metadata, Option<FileVisit>), FileReadFailed> {
|
||||
) -> Result<FileVisitResult<'a>, FileReadFailed> {
|
||||
let fr = FileReader::from_parsed(block)?;
|
||||
self.start_from_reader(fr, cache)
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn start_from_reader<'a>(
|
||||
self,
|
||||
fr: FileReader<'a>,
|
||||
cache: &'_ mut Option<Cache>,
|
||||
) -> Result<(&'a [u8], u64, Metadata, Option<FileVisit>), FileReadFailed> {
|
||||
) -> Result<FileVisitResult<'a>, FileReadFailed> {
|
||||
let metadata = fr.as_ref().to_owned();
|
||||
|
||||
let (content, traversal) = fr.content();
|
||||
|
@ -1,5 +1,4 @@
|
||||
// Modified automatically generated rust module for 'merkledag.proto' file
|
||||
#![allow(rust_2018_idioms)]
|
||||
#![allow(non_snake_case)]
|
||||
#![allow(non_upper_case_globals)]
|
||||
#![allow(non_camel_case_types)]
|
||||
|
@ -1,5 +1,4 @@
|
||||
// Modified automatically generated rust module for 'unixfs.proto' file
|
||||
#![allow(rust_2018_idioms)]
|
||||
#![allow(non_snake_case)]
|
||||
#![allow(non_upper_case_globals)]
|
||||
#![allow(non_camel_case_types)]
|
||||
|
Loading…
Reference in New Issue
Block a user