feat: implement /dht/findprovs
Signed-off-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
parent
a33ee5fe09
commit
3e2e09ee8c
@ -106,7 +106,10 @@ pub fn routes<T: IpfsTypes>(
|
||||
and_boxed!(warp::path!("put"), dag::put(ipfs)),
|
||||
and_boxed!(warp::path!("resolve"), dag::resolve(ipfs)),
|
||||
)),
|
||||
warp::path("dht").and(and_boxed!(warp::path!("findpeer"), dht::find_peer(ipfs))),
|
||||
warp::path("dht").and(combine!(
|
||||
and_boxed!(warp::path!("findpeer"), dht::find_peer(ipfs)),
|
||||
and_boxed!(warp::path!("findprovs"), dht::find_providers(ipfs)),
|
||||
)),
|
||||
warp::path("pubsub").and(combine!(
|
||||
and_boxed!(warp::path!("peers"), pubsub::peers(ipfs)),
|
||||
and_boxed!(warp::path!("ls"), pubsub::list_subscriptions(ipfs)),
|
||||
@ -127,7 +130,6 @@ pub fn routes<T: IpfsTypes>(
|
||||
warp::path!("bootstrap" / ..),
|
||||
warp::path!("config" / ..),
|
||||
warp::path!("dht" / "get"),
|
||||
warp::path!("dht" / "findprovs"),
|
||||
warp::path!("dht" / "provide"),
|
||||
warp::path!("dht" / "put"),
|
||||
warp::path!("dht" / "query"),
|
||||
|
@ -73,3 +73,55 @@ pub fn find_peer<T: IpfsTypes>(
|
||||
.and(query::<FindPeerQuery>())
|
||||
.and_then(find_peer_query)
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct FindProvidersQuery {
|
||||
arg: String,
|
||||
// FIXME: doesn't seem to be used at the moment
|
||||
verbose: Option<bool>,
|
||||
#[serde(rename = "num-providers")]
|
||||
num_providers: Option<usize>,
|
||||
timeout: Option<StringSerialized<humantime::Duration>>,
|
||||
}
|
||||
|
||||
async fn find_providers_query<T: IpfsTypes>(
|
||||
ipfs: Ipfs<T>,
|
||||
query: FindProvidersQuery,
|
||||
) -> Result<impl Reply, Rejection> {
|
||||
let FindProvidersQuery {
|
||||
arg,
|
||||
verbose: _,
|
||||
num_providers,
|
||||
timeout,
|
||||
} = query;
|
||||
let providers = ipfs
|
||||
.get_providers(arg.into_bytes())
|
||||
.maybe_timeout(timeout.map(StringSerialized::into_inner))
|
||||
.await
|
||||
.map_err(StringError::from)?
|
||||
.map_err(StringError::from)?
|
||||
.into_iter()
|
||||
.take(if let Some(n) = num_providers { n } else { 20 })
|
||||
.map(|peer_id| ResponsesMember {
|
||||
addrs: vec![],
|
||||
id: peer_id.to_string(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
let response = Response {
|
||||
extra: Default::default(),
|
||||
id: Default::default(),
|
||||
responses: providers,
|
||||
r#type: 2,
|
||||
};
|
||||
|
||||
Ok(warp::reply::json(&response))
|
||||
}
|
||||
|
||||
pub fn find_providers<T: IpfsTypes>(
|
||||
ipfs: &Ipfs<T>,
|
||||
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
|
||||
with_ipfs(ipfs)
|
||||
.and(query::<FindProvidersQuery>())
|
||||
.and_then(find_providers_query)
|
||||
}
|
||||
|
29
src/lib.rs
29
src/lib.rs
@ -20,6 +20,7 @@ pub use libp2p::core::{
|
||||
connection::ListenerId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey,
|
||||
};
|
||||
pub use libp2p::identity::Keypair;
|
||||
use libp2p::kad::record::Key;
|
||||
use libp2p::swarm::NetworkBehaviour;
|
||||
use std::path::PathBuf;
|
||||
use tracing::Span;
|
||||
@ -268,6 +269,7 @@ enum IpfsEvent {
|
||||
bool,
|
||||
OneshotSender<Either<Vec<Multiaddr>, SubscriptionFuture<KadResult, String>>>,
|
||||
),
|
||||
GetProviders(Key, OneshotSender<SubscriptionFuture<KadResult, String>>),
|
||||
Exit,
|
||||
}
|
||||
|
||||
@ -746,6 +748,29 @@ impl<Types: IpfsTypes> Ipfs<Types> {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Performs a DHT lookup for providers of a value to the given key.
|
||||
pub async fn get_providers<T: Into<Key>>(&self, key: T) -> Result<Vec<PeerId>, Error> {
|
||||
let kad_result = async move {
|
||||
let (tx, rx) = oneshot_channel();
|
||||
|
||||
self.to_task
|
||||
.clone()
|
||||
.send(IpfsEvent::GetProviders(key.into(), tx))
|
||||
.await?;
|
||||
|
||||
Ok(rx.await?).map_err(|e: String| anyhow!(e))
|
||||
}
|
||||
.instrument(self.span.clone())
|
||||
.await?
|
||||
.await;
|
||||
|
||||
match kad_result {
|
||||
Ok(KadResult::Providers(providers)) => Ok(providers),
|
||||
Ok(_) => unreachable!(),
|
||||
Err(e) => Err(anyhow!(e)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Exit daemon.
|
||||
pub async fn exit_daemon(self) {
|
||||
// FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of
|
||||
@ -1047,6 +1072,10 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
|
||||
};
|
||||
let _ = ret.send(addrs);
|
||||
}
|
||||
IpfsEvent::GetProviders(key, ret) => {
|
||||
let future = self.swarm.get_providers(key);
|
||||
let _ = ret.send(future);
|
||||
}
|
||||
IpfsEvent::Exit => {
|
||||
// FIXME: we could do a proper teardown
|
||||
return Poll::Ready(());
|
||||
|
@ -9,7 +9,7 @@ use bitswap::{Bitswap, BitswapEvent};
|
||||
use cid::Cid;
|
||||
use libp2p::core::{Multiaddr, PeerId};
|
||||
use libp2p::identify::{Identify, IdentifyEvent};
|
||||
use libp2p::kad::record::store::MemoryStore;
|
||||
use libp2p::kad::record::{store::MemoryStore, Key};
|
||||
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent};
|
||||
use libp2p::mdns::{MdnsEvent, TokioMdns};
|
||||
use libp2p::ping::{Ping, PingEvent};
|
||||
@ -40,6 +40,7 @@ pub struct Behaviour<Types: IpfsTypes> {
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum KadResult {
|
||||
Complete,
|
||||
Providers(Vec<PeerId>),
|
||||
}
|
||||
|
||||
impl<Types: IpfsTypes> NetworkBehaviourEventProcess<()> for Behaviour<Types> {
|
||||
@ -78,7 +79,8 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
|
||||
|
||||
match event {
|
||||
QueryResult { result, id, .. } => {
|
||||
if self.kademlia.query(&id).is_none() {
|
||||
// only some subscriptions return actual values
|
||||
if !matches!(result, GetProviders(_)) && self.kademlia.query(&id).is_none() {
|
||||
self.kad_subscriptions
|
||||
.finish_subscription(id.into(), Ok(KadResult::Complete));
|
||||
}
|
||||
@ -112,23 +114,30 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
|
||||
}
|
||||
}
|
||||
GetProviders(Ok(GetProvidersOk {
|
||||
key,
|
||||
key: _,
|
||||
providers,
|
||||
closest_peers,
|
||||
closest_peers: _,
|
||||
})) => {
|
||||
let key = multibase::encode(Base::Base32Lower, key);
|
||||
if providers.is_empty() && closest_peers.is_empty() {
|
||||
warn!("kad: could not find a provider for {}", key);
|
||||
} else {
|
||||
for peer in closest_peers.into_iter().chain(providers.into_iter()) {
|
||||
debug!("kad: {} is provided by {}", key, peer);
|
||||
self.bitswap.connect(peer);
|
||||
}
|
||||
if self.kademlia.query(&id).is_none() {
|
||||
let providers = providers.into_iter().collect::<Vec<_>>();
|
||||
|
||||
self.kad_subscriptions.finish_subscription(
|
||||
id.into(),
|
||||
Ok(KadResult::Providers(providers)),
|
||||
);
|
||||
}
|
||||
}
|
||||
GetProviders(Err(GetProvidersError::Timeout { key, .. })) => {
|
||||
let key = multibase::encode(Base::Base32Lower, key);
|
||||
warn!("kad: timed out trying to get providers for {}", key);
|
||||
|
||||
if self.kademlia.query(&id).is_none() {
|
||||
self.kad_subscriptions.finish_subscription(
|
||||
id.into(),
|
||||
Err("timed out trying to obtain providers for the given key"
|
||||
.to_string()),
|
||||
);
|
||||
}
|
||||
}
|
||||
StartProviding(Ok(AddProviderOk { key })) => {
|
||||
let key = multibase::encode(Base::Base32Lower, key);
|
||||
@ -498,6 +507,11 @@ impl<Types: IpfsTypes> Behaviour<Types> {
|
||||
self.kad_subscriptions
|
||||
.create_subscription(self.kademlia.get_closest_peers(id.as_bytes()).into(), None)
|
||||
}
|
||||
|
||||
pub fn get_providers(&mut self, key: Key) -> SubscriptionFuture<KadResult, String> {
|
||||
self.kad_subscriptions
|
||||
.create_subscription(self.kademlia.get_providers(key).into(), None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a IPFS behaviour with the IPFS bootstrap nodes.
|
||||
|
Loading…
x
Reference in New Issue
Block a user