320: Implement Ipfs::find_peer and /dht/findpeer r=koivunej a=ljedrz

Builds on https://github.com/rs-ipfs/rust-ipfs/pull/319, only the last 2 commits are new.

Introduces `Ipfs::find_peer` and `/dht/findpeer`, which return the addresses known for the given `PeerId`; if the peer is already connected, its addresses are returned right away, otherwise the local DHT records are checked.

Blocked by https://github.com/rs-ipfs/rust-ipfs/pull/319.



Co-authored-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
bors[bot] 2020-08-20 10:18:15 +00:00 committed by GitHub
commit 140a86bdcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 204 additions and 5 deletions

View File

@ -4,6 +4,7 @@ use warp::{query, Filter};
pub mod bitswap;
pub mod block;
pub mod dag;
pub mod dht;
pub mod id;
pub mod pubsub;
pub mod refs;
@ -105,6 +106,7 @@ pub fn routes<T: IpfsTypes>(
and_boxed!(warp::path!("put"), dag::put(ipfs)),
and_boxed!(warp::path!("resolve"), dag::resolve(ipfs)),
)),
and_boxed!(warp::path!("dht" / "findpeer"), dht::find_peer(ipfs)),
warp::path("pubsub").and(combine!(
and_boxed!(warp::path!("peers"), pubsub::peers(ipfs)),
and_boxed!(warp::path!("ls"), pubsub::list_subscriptions(ipfs)),
@ -124,7 +126,11 @@ pub fn routes<T: IpfsTypes>(
combine_unify!(
warp::path!("bootstrap" / ..),
warp::path!("config" / ..),
warp::path!("dht" / ..),
warp::path!("dht" / "get"),
warp::path!("dht" / "findprovs"),
warp::path!("dht" / "provide"),
warp::path!("dht" / "put"),
warp::path!("dht" / "query"),
warp::path!("key" / ..),
warp::path!("name" / ..),
warp::path!("object" / ..),

75
http/src/v0/dht.rs Normal file
View File

@ -0,0 +1,75 @@
use crate::v0::support::{with_ipfs, MaybeTimeoutExt, StringError, StringSerialized};
use ipfs::{Ipfs, IpfsTypes, PeerId};
use serde::{Deserialize, Serialize};
use warp::{query, Filter, Rejection, Reply};
/// Wire format of the `/dht/findpeer` JSON response.
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
struct Response {
    // always left empty by this endpoint (see `find_peer_query`)
    extra: String,
    // always left empty by this endpoint (see `find_peer_query`)
    #[serde(rename = "ID")]
    id: String,
    // the actual response: the found peer(s) and their addresses
    responses: Vec<ResponsesMember>,
    // NOTE(review): hard-coded to 2 by the handler; presumably mirrors
    // go-ipfs's routing response type enum — confirm against its API docs
    r#type: usize,
}
/// A single peer entry inside `Response::responses`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
struct ResponsesMember {
    // the peer's known multiaddresses, in string form
    addrs: Vec<String>,
    // the peer's `PeerId`, in string form
    #[serde(rename = "ID")]
    id: String,
}
/// Query-string parameters accepted by `/dht/findpeer`.
#[derive(Debug, Deserialize)]
pub struct FindPeerQuery {
    // the `PeerId` to look up, as a string
    arg: String,
    // FIXME: accepted for API compatibility but currently ignored by the handler
    verbose: Option<bool>,
    // optional overall timeout applied to the lookup (e.g. "5s")
    timeout: Option<StringSerialized<humantime::Duration>>,
}
/// Handler for `GET /dht/findpeer`: parses the `arg` query parameter into a
/// `PeerId` and responds with the addresses `Ipfs::find_peer` discovers for it.
///
/// Errors (bad peer id, timeout, failed lookup) are surfaced as
/// `StringError` rejections.
async fn find_peer_query<T: IpfsTypes>(
    ipfs: Ipfs<T>,
    query: FindPeerQuery,
) -> Result<impl Reply, Rejection> {
    let FindPeerQuery {
        arg,
        verbose: _,
        timeout,
    } = query;
    let peer_id = arg.parse::<PeerId>().map_err(StringError::from)?;
    // render the id up front so `peer_id` can be moved into `find_peer`
    // without the clone the original code paid for
    let id = peer_id.to_string();

    let addrs = ipfs
        .find_peer(peer_id)
        .maybe_timeout(timeout.map(StringSerialized::into_inner))
        .await
        .map_err(StringError::from)? // the optional timeout elapsed
        .map_err(StringError::from)? // the lookup itself failed
        .into_iter()
        .map(|addr| addr.to_string())
        .collect();

    let response = Response {
        extra: Default::default(),
        id: Default::default(),
        responses: vec![ResponsesMember { addrs, id }],
        // NOTE(review): presumably the go-ipfs "FinalPeer" response type — confirm
        r#type: 2,
    };

    Ok(warp::reply::json(&response))
}
/// Builds the warp filter serving `/dht/findpeer`: injects the shared `Ipfs`
/// handle, deserializes the query string into a `FindPeerQuery` and delegates
/// to `find_peer_query`.
pub fn find_peer<T: IpfsTypes>(
    ipfs: &Ipfs<T>,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
    with_ipfs(ipfs)
        .and(query::<FindPeerQuery>())
        .and_then(find_peer_query)
}

View File

@ -19,6 +19,7 @@ pub use libp2p::core::{
connection::ListenerId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey,
};
pub use libp2p::identity::Keypair;
use libp2p::swarm::NetworkBehaviour;
use std::path::PathBuf;
use tracing::Span;
use tracing_futures::Instrument;
@ -49,7 +50,7 @@ pub use self::error::Error;
use self::ipns::Ipns;
pub use self::p2p::pubsub::{PubsubMessage, SubscriptionStream};
use self::p2p::{create_swarm, SwarmOptions, TSwarm};
pub use self::p2p::{Connection, KadResult, MultiaddrWithPeerId};
pub use self::p2p::{Connection, KadResult, MultiaddrWithPeerId, MultiaddrWithoutPeerId};
pub use self::path::IpfsPath;
pub use self::repo::RepoTypes;
use self::repo::{create_repo, Repo, RepoEvent, RepoOptions};
@ -261,6 +262,7 @@ enum IpfsEvent {
AddPeer(PeerId, Multiaddr),
GetClosestPeers(PeerId, OneshotSender<SubscriptionFuture<KadResult, String>>),
GetBitswapPeers(OneshotSender<Vec<PeerId>>),
FindPeer(PeerId, OneshotSender<Vec<Multiaddr>>),
Exit,
}
@ -705,6 +707,23 @@ impl<Types: IpfsTypes> Ipfs<Types> {
.await
}
/// Obtain the addresses associated with the given `PeerId`; they are first searched for locally
/// and the DHT is used as a fallback.
pub async fn find_peer(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>, Error> {
    let span = self.span.clone();
    async move {
        // hand the request over to the background task and await its answer
        let (response_tx, response_rx) = oneshot_channel();
        self.to_task
            .clone()
            .send(IpfsEvent::FindPeer(peer_id, response_tx))
            .await?;
        Ok(response_rx.await?)
    }
    .instrument(span)
    .await
}
/// Exit daemon.
pub async fn exit_daemon(self) {
// FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of
@ -992,6 +1011,15 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
.collect();
let _ = ret.send(peers);
}
IpfsEvent::FindPeer(peer_id, ret) => {
let known_addrs = self.swarm.swarm.addresses_of_peer(&peer_id);
let addrs = if !known_addrs.is_empty() {
known_addrs
} else {
self.swarm.kademlia().addresses_of_peer(&peer_id)
};
let _ = ret.send(addrs);
}
IpfsEvent::Exit => {
// FIXME: we could do a proper teardown
return Poll::Ready(());

View File

@ -46,6 +46,13 @@ impl TryFrom<Multiaddr> for MultiaddrWithoutPeerId {
}
}
impl From<MultiaddrWithPeerId> for MultiaddrWithoutPeerId {
    /// Dropping the peer id component is a plain field projection, so this
    /// conversion is infallible.
    fn from(addr: MultiaddrWithPeerId) -> Self {
        MultiaddrWithoutPeerId(addr.multiaddr.into())
    }
}
impl From<MultiaddrWithoutPeerId> for Multiaddr {
fn from(addr: MultiaddrWithoutPeerId) -> Self {
let MultiaddrWithoutPeerId(multiaddr) = addr;

View File

@ -33,7 +33,7 @@ pub struct Behaviour<Types: IpfsTypes> {
ping: Ping,
identify: Identify,
pubsub: Pubsub,
swarm: SwarmApi,
pub swarm: SwarmApi,
}
/// Represents the result of a Kademlia query.
@ -488,6 +488,10 @@ impl<Types: IpfsTypes> Behaviour<Types> {
}
}
/// Mutable access to the underlying Kademlia behaviour, e.g. to consult
/// its view of a peer's addresses.
pub fn kademlia(&mut self) -> &mut Kademlia<MemoryStore> {
    &mut self.kademlia
}
pub fn get_closest_peers(&mut self, id: PeerId) -> SubscriptionFuture<KadResult, String> {
let id = id.to_base58();

View File

@ -1,9 +1,88 @@
use cid::Cid;
use ipfs::{IpfsOptions, Node};
use ipfs::{p2p::MultiaddrWithPeerId, IpfsOptions, Node};
use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};
use std::time::Duration;
use std::{convert::TryInto, time::Duration};
use tokio::time::timeout;
fn strip_peer_id(addr: Multiaddr) -> Multiaddr {
let MultiaddrWithPeerId { multiaddr, .. } = addr.try_into().unwrap();
multiaddr.into()
}
/// Check if `Ipfs::find_peer` works without DHT involvement.
#[tokio::test(max_threads = 1)]
async fn find_peer_local() {
    let node_a = Node::new("a").await;
    let node_b = Node::new("b").await;

    let (b_key, mut b_addrs) = node_b.identity().await.unwrap();
    let b_peer_id = b_key.into_peer_id();

    node_a.connect(b_addrs[0].clone()).await.unwrap();

    // while connected, the nodes know each other's addresses directly,
    // so the lookup resolves without touching the DHT
    let discovered = node_a.find_peer(b_peer_id).await.unwrap();

    // drop the trailing Protocol::P2p component before comparing
    for addr in b_addrs.iter_mut() {
        assert!(matches!(addr.pop(), Some(Protocol::P2p(_))));
    }

    assert_eq!(discovered, b_addrs);
}
/// Check if `Ipfs::find_peer` works using DHT.
///
/// Topology: a knows b, b knows c, c knows b — so a can only learn of c
/// through the Kademlia bootstrap, and after disconnecting from c must fall
/// back to its DHT records to resolve c's addresses.
#[tokio::test(max_threads = 1)]
async fn find_peer_dht() {
    let node_a = Node::new("a").await;
    let node_b = Node::new("b").await;
    let node_c = Node::new("c").await;

    let (b_id, b_addrs) = node_b.identity().await.unwrap();
    let b_id = b_id.into_peer_id();
    let (c_id, mut c_addrs) = node_c.identity().await.unwrap();
    let c_id = c_id.into_peer_id();

    // register the nodes' addresses so they can bootstrap against
    // one another; at this point node_a is not aware of node_c's
    // existence
    node_a
        .add_peer(b_id.clone(), strip_peer_id(b_addrs[0].clone()))
        .await
        .unwrap();
    node_b
        .add_peer(c_id.clone(), strip_peer_id(c_addrs[0].clone()))
        .await
        .unwrap();
    node_c
        .add_peer(b_id, strip_peer_id(b_addrs[0].clone()))
        .await
        .unwrap();

    // the bootstrap populates each node's routing table transitively
    node_a.bootstrap().await.unwrap();
    node_b.bootstrap().await.unwrap();
    node_c.bootstrap().await.unwrap();

    // after the Kademlia bootstrap node_a is auto-connected to node_c, so
    // disconnect it in order to remove its addresses from the ones known
    // outside of the DHT.
    node_a
        .disconnect(c_addrs[0].clone().try_into().unwrap())
        .await
        .unwrap();

    // with the direct connection gone, this must be answered from DHT records
    let found_addrs = node_a.find_peer(c_id).await.unwrap();

    // remove Protocol::P2p from c_addrs before comparing
    for addr in &mut c_addrs {
        assert!(matches!(addr.pop(), Some(Protocol::P2p(_))));
    }

    assert_eq!(found_addrs, c_addrs);
}
// TODO: split into separate, more advanced bootstrap and closest_peers
#[tokio::test(max_threads = 1)]
async fn kademlia_local_peer_discovery() {
const BOOTSTRAPPER_COUNT: usize = 20;