From 3e2e09ee8c34212f05630b71aacbfe6ac348da15 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Thu, 20 Aug 2020 14:14:24 +0200 Subject: [PATCH] feat: implement /dht/findprovs Signed-off-by: ljedrz --- http/src/v0.rs | 6 +++-- http/src/v0/dht.rs | 52 ++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 29 ++++++++++++++++++++++++ src/p2p/behaviour.rs | 38 ++++++++++++++++++++++---------- 4 files changed, 111 insertions(+), 14 deletions(-) diff --git a/http/src/v0.rs b/http/src/v0.rs index 6d922fb3..94e617cb 100644 --- a/http/src/v0.rs +++ b/http/src/v0.rs @@ -106,7 +106,10 @@ pub fn routes( and_boxed!(warp::path!("put"), dag::put(ipfs)), and_boxed!(warp::path!("resolve"), dag::resolve(ipfs)), )), - warp::path("dht").and(and_boxed!(warp::path!("findpeer"), dht::find_peer(ipfs))), + warp::path("dht").and(combine!( + and_boxed!(warp::path!("findpeer"), dht::find_peer(ipfs)), + and_boxed!(warp::path!("findprovs"), dht::find_providers(ipfs)), + )), warp::path("pubsub").and(combine!( and_boxed!(warp::path!("peers"), pubsub::peers(ipfs)), and_boxed!(warp::path!("ls"), pubsub::list_subscriptions(ipfs)), @@ -127,7 +130,6 @@ pub fn routes( warp::path!("bootstrap" / ..), warp::path!("config" / ..), warp::path!("dht" / "get"), - warp::path!("dht" / "findprovs"), warp::path!("dht" / "provide"), warp::path!("dht" / "put"), warp::path!("dht" / "query"), diff --git a/http/src/v0/dht.rs b/http/src/v0/dht.rs index a31abd38..3c0723ac 100644 --- a/http/src/v0/dht.rs +++ b/http/src/v0/dht.rs @@ -73,3 +73,55 @@ pub fn find_peer( .and(query::()) .and_then(find_peer_query) } + +#[derive(Debug, Deserialize)] +pub struct FindProvidersQuery { + arg: String, + // FIXME: doesn't seem to be used at the moment + verbose: Option, + #[serde(rename = "num-providers")] + num_providers: Option, + timeout: Option>, +} + +async fn find_providers_query( + ipfs: Ipfs, + query: FindProvidersQuery, +) -> Result { + let FindProvidersQuery { + arg, + verbose: _, + num_providers, + timeout, + } = query; + let providers = ipfs + .get_providers(arg.into_bytes()) + .maybe_timeout(timeout.map(StringSerialized::into_inner)) + .await + .map_err(StringError::from)? + .map_err(StringError::from)? + .into_iter() + .take(if let Some(n) = num_providers { n } else { 20 }) + .map(|peer_id| ResponsesMember { + addrs: vec![], + id: peer_id.to_string(), + }) + .collect(); + + let response = Response { + extra: Default::default(), + id: Default::default(), + responses: providers, + r#type: 2, + }; + + Ok(warp::reply::json(&response)) +} + +pub fn find_providers( + ipfs: &Ipfs, +) -> impl Filter + Clone { + with_ipfs(ipfs) + .and(query::()) + .and_then(find_providers_query) +} diff --git a/src/lib.rs b/src/lib.rs index 00c82085..63299035 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ pub use libp2p::core::{ connection::ListenerId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey, }; pub use libp2p::identity::Keypair; +use libp2p::kad::record::Key; use libp2p::swarm::NetworkBehaviour; use std::path::PathBuf; use tracing::Span; @@ -268,6 +269,7 @@ enum IpfsEvent { bool, OneshotSender, SubscriptionFuture>>, ), + GetProviders(Key, OneshotSender>), Exit, } @@ -746,6 +748,29 @@ impl Ipfs { .await } + /// Performs a DHT lookup for providers of a value to the given key. + pub async fn get_providers>(&self, key: T) -> Result, Error> { + let kad_result = async move { + let (tx, rx) = oneshot_channel(); + + self.to_task + .clone() + .send(IpfsEvent::GetProviders(key.into(), tx)) + .await?; + + Ok(rx.await?).map_err(|e: String| anyhow!(e)) + } + .instrument(self.span.clone()) + .await? + .await; + + match kad_result { + Ok(KadResult::Providers(providers)) => Ok(providers), + Ok(_) => unreachable!(), + Err(e) => Err(anyhow!(e)), + } + } + /// Exit daemon. pub async fn exit_daemon(self) { // FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of @@ -1047,6 +1072,10 @@ impl Future for IpfsFuture { }; let _ = ret.send(addrs); } + IpfsEvent::GetProviders(key, ret) => { + let future = self.swarm.get_providers(key); + let _ = ret.send(future); + } IpfsEvent::Exit => { // FIXME: we could do a proper teardown return Poll::Ready(()); diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index eb8a316a..b05dd850 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -9,7 +9,7 @@ use bitswap::{Bitswap, BitswapEvent}; use cid::Cid; use libp2p::core::{Multiaddr, PeerId}; use libp2p::identify::{Identify, IdentifyEvent}; -use libp2p::kad::record::store::MemoryStore; +use libp2p::kad::record::{store::MemoryStore, Key}; use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent}; use libp2p::mdns::{MdnsEvent, TokioMdns}; use libp2p::ping::{Ping, PingEvent}; @@ -40,6 +40,7 @@ pub struct Behaviour { #[derive(Debug, Clone, PartialEq)] pub enum KadResult { Complete, + Providers(Vec), } impl NetworkBehaviourEventProcess<()> for Behaviour { @@ -78,7 +79,8 @@ impl NetworkBehaviourEventProcess for Behaviour match event { QueryResult { result, id, .. } => { - if self.kademlia.query(&id).is_none() { + // only some subscriptions return actual values + if !matches!(result, GetProviders(_)) && self.kademlia.query(&id).is_none() { self.kad_subscriptions .finish_subscription(id.into(), Ok(KadResult::Complete)); } @@ -112,23 +114,30 @@ impl NetworkBehaviourEventProcess for Behaviour } } GetProviders(Ok(GetProvidersOk { - key, + key: _, providers, - closest_peers, + closest_peers: _, })) => { - let key = multibase::encode(Base::Base32Lower, key); - if providers.is_empty() && closest_peers.is_empty() { - warn!("kad: could not find a provider for {}", key); - } else { - for peer in closest_peers.into_iter().chain(providers.into_iter()) { - debug!("kad: {} is provided by {}", key, peer); - self.bitswap.connect(peer); - } + if self.kademlia.query(&id).is_none() { + let providers = providers.into_iter().collect::>(); + + self.kad_subscriptions.finish_subscription( + id.into(), + Ok(KadResult::Providers(providers)), + ); } } GetProviders(Err(GetProvidersError::Timeout { key, .. })) => { let key = multibase::encode(Base::Base32Lower, key); warn!("kad: timed out trying to get providers for {}", key); + + if self.kademlia.query(&id).is_none() { + self.kad_subscriptions.finish_subscription( + id.into(), + Err("timed out trying to obtain providers for the given key" + .to_string()), + ); + } } StartProviding(Ok(AddProviderOk { key })) => { let key = multibase::encode(Base::Base32Lower, key); @@ -498,6 +507,11 @@ impl Behaviour { self.kad_subscriptions .create_subscription(self.kademlia.get_closest_peers(id.as_bytes()).into(), None) } + + pub fn get_providers(&mut self, key: Key) -> SubscriptionFuture { + self.kad_subscriptions + .create_subscription(self.kademlia.get_providers(key).into(), None) + } } /// Create a IPFS behaviour with the IPFS bootstrap nodes.