From 1a55f27f5509e565ee83490a787128d02859324c Mon Sep 17 00:00:00 2001 From: ljedrz Date: Mon, 17 Aug 2020 12:20:35 +0200 Subject: [PATCH 1/7] feat: introduce MultiaddrWithPeerId and MultihashWoPeerId Signed-off-by: ljedrz --- src/p2p/mod.rs | 2 +- src/p2p/swarm.rs | 77 ++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index be200920..0ff5e6d2 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -10,7 +10,7 @@ pub(crate) mod pubsub; mod swarm; mod transport; -pub use swarm::Connection; +pub use swarm::{Connection, MultiaddrWithPeerId, MultiaddrWoPeerId}; pub type TSwarm = Swarm>; diff --git a/src/p2p/swarm.rs b/src/p2p/swarm.rs index c0f79f1b..9e162143 100644 --- a/src/p2p/swarm.rs +++ b/src/p2p/swarm.rs @@ -1,4 +1,5 @@ use crate::subscription::{SubscriptionFuture, SubscriptionRegistry}; +use anyhow::anyhow; use core::task::{Context, Poll}; use libp2p::core::{ connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, @@ -8,7 +9,76 @@ use libp2p::swarm::protocols_handler::{ }; use libp2p::swarm::{self, DialPeerCondition, NetworkBehaviour, PollParameters, Swarm}; use std::collections::{HashMap, HashSet, VecDeque}; +use std::convert::TryFrom; use std::time::Duration; +use std::{fmt, str::FromStr}; + +/// A wrapper for `Multiaddr` that does **not** contain `Protocol::P2p`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MultiaddrWoPeerId(Multiaddr); + +impl From for MultiaddrWoPeerId { + fn from(addr: Multiaddr) -> Self { + Self( + addr.into_iter() + .filter(|p| !matches!(p, Protocol::P2p(_))) + .collect(), + ) + } +} + +impl FromStr for MultiaddrWoPeerId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let multiaddr = s.parse::()?; + Ok(multiaddr.into()) + } +} + +/// A `Multiaddr` paired with a discrete `PeerId`. The `Multiaddr` can contain a +/// `Protocol::P2p`, but it's not as easy to work with, and some functionalities +/// don't support it being contained within the `Multiaddr`. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct MultiaddrWithPeerId { + multiaddr: Multiaddr, + peer_id: PeerId, +} + +impl From<(Multiaddr, PeerId)> for MultiaddrWithPeerId { + fn from((multiaddr, peer_id): (Multiaddr, PeerId)) -> Self { + Self { multiaddr, peer_id } + } +} + +impl TryFrom for MultiaddrWithPeerId { + type Error = anyhow::Error; + + fn try_from(mut multiaddr: Multiaddr) -> Result { + if let Some(Protocol::P2p(hash)) = multiaddr.pop() { + let peer_id = PeerId::from_multihash(hash) + .map_err(|_| anyhow!("Invalid Multihash in Protocol::P2p"))?; + Ok(Self { multiaddr, peer_id }) + } else { + Err(anyhow!("Missing Protocol::P2p in the Multiaddr")) + } + } +} + +impl FromStr for MultiaddrWithPeerId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let multiaddr = s.parse::()?; + Self::try_from(multiaddr) + } +} + +impl fmt::Display for MultiaddrWithPeerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/p2p/{}", self.multiaddr, self.peer_id) + } +} /// A description of currently active connection. #[derive(Clone, Debug, PartialEq, Eq)] @@ -275,12 +345,13 @@ mod tests { #[test] fn connection_targets() { let peer_id = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"; - let multiaddr = "/ip4/104.131.131.82/tcp/4001"; - let multiaddr_with_peer = format!("{}/p2p/{}", multiaddr, peer_id); + let multiaddr_wo_peer = "/ip4/104.131.131.82/tcp/4001"; + let multiaddr_with_peer = format!("{}/p2p/{}", multiaddr_wo_peer, peer_id); let p2p_peer = format!("/p2p/{}", peer_id); // note: /ipfs/peer_id doesn't properly parse as a Multiaddr - assert!(multiaddr_with_peer.parse::().is_ok()); + assert!(multiaddr_wo_peer.parse::().is_ok()); + assert!(multiaddr_with_peer.parse::().is_ok()); assert!(p2p_peer.parse::().is_ok()); } From 6e8dff0bb7e71038d1c04f7ff57423a141de3cec Mon Sep 17 00:00:00 2001 From: ljedrz Date: Mon, 17 Aug 2020 13:20:59 +0200 Subject: [PATCH 2/7] feat: utilize the new Multiaddr wrappers --- http/src/v0/swarm.rs | 16 ++++-- src/lib.rs | 22 ++++---- src/p2p/behaviour.rs | 6 +- src/p2p/swarm.rs | 131 ++++++++++++++++++++++--------------------- src/subscription.rs | 12 ++-- tests/connect_two.rs | 5 +- 6 files changed, 103 insertions(+), 89 deletions(-) diff --git a/http/src/v0/swarm.rs b/http/src/v0/swarm.rs index 4933f96c..188af4a0 100644 --- a/http/src/v0/swarm.rs +++ b/http/src/v0/swarm.rs @@ -1,5 +1,5 @@ use super::support::{with_ipfs, StringError}; -use ipfs::{Ipfs, IpfsTypes, Multiaddr}; +use ipfs::{Ipfs, IpfsTypes, MultiaddrWithPeerId}; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::BTreeMap; @@ -16,7 +16,7 @@ async fn connect_query( ) -> Result { let target = query .arg - .parse::() + .parse::() .map_err(|e| warp::reject::custom(StringError::from(e)))?; ipfs.connect(target) .await @@ -80,8 +80,8 @@ async fn peers_query( None }; Peer { - addr: conn.address.to_string(), - peer: conn.peer_id.to_string(), + addr: conn.addr.multiaddr.as_ref().to_string(), + peer: conn.addr.peer_id.to_string(), latency, } }) @@ -162,14 +162,18 @@ pub fn addrs_local( #[derive(Debug, Deserialize)] struct DisconnectQuery { - arg: Multiaddr, + arg: String, } async fn disconnect_query( ipfs: Ipfs, query: DisconnectQuery, ) -> Result { - ipfs.disconnect(query.arg) + let target = query + .arg + .parse::() + .map_err(|e| warp::reject::custom(StringError::from(e)))?; + ipfs.disconnect(target) .await .map_err(|e| warp::reject::custom(StringError::from(e)))?; let response: &[&str] = &[]; diff --git a/src/lib.rs b/src/lib.rs index 548f97ec..e1e85888 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,8 +48,8 @@ use self::dag::IpldDag; pub use self::error::Error; use self::ipns::Ipns; pub use self::p2p::pubsub::{PubsubMessage, SubscriptionStream}; -pub use self::p2p::Connection; use self::p2p::{create_swarm, SwarmOptions, TSwarm}; +pub use self::p2p::{Connection, MultiaddrWithPeerId}; pub use self::path::IpfsPath; pub use self::repo::RepoTypes; use self::repo::{create_repo, Repo, RepoEvent, RepoOptions}; @@ -235,7 +235,7 @@ type Channel = OneshotSender>; enum IpfsEvent { /// Connect Connect( - Multiaddr, + MultiaddrWithPeerId, OneshotSender>>, ), /// Addresses @@ -245,7 +245,7 @@ enum IpfsEvent { /// Connections Connections(Channel>), /// Disconnect - Disconnect(Multiaddr, Channel<()>), + Disconnect(MultiaddrWithPeerId, Channel<()>), /// Request background task to return the listened and external addresses GetAddresses(OneshotSender>), PubsubSubscribe(String, OneshotSender>), @@ -447,11 +447,7 @@ impl Ipfs { self.ipns().cancel(key).instrument(self.span.clone()).await } - pub async fn connect(&self, target: Multiaddr) -> Result<(), Error> { - if !target.iter().any(|p| matches!(p, Protocol::P2p(_))) { - return Err(anyhow!("The target address is missing the P2p protocol")); - } - + pub async fn connect(&self, target: MultiaddrWithPeerId) -> Result<(), Error> { async move { let (tx, rx) = oneshot_channel(); self.to_task @@ -503,12 +499,12 @@ impl Ipfs { .await } - pub async fn disconnect(&self, addr: Multiaddr) -> Result<(), Error> { + pub async fn disconnect(&self, target: MultiaddrWithPeerId) -> Result<(), Error> { async move { let (tx, rx) = oneshot_channel(); self.to_task .clone() - .send(IpfsEvent::Disconnect(addr, tx)) + .send(IpfsEvent::Disconnect(target, tx)) .await?; rx.await? } @@ -1058,6 +1054,7 @@ pub use node::Node; mod node { use super::*; + use std::convert::TryFrom; /// Node encapsulates everything to setup a testing instance so that multi-node tests become /// easier. @@ -1074,6 +1071,11 @@ mod node { .await } + pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> { + let addr = MultiaddrWithPeerId::try_from(addr).unwrap(); + self.ipfs.connect(addr).await + } + pub async fn with_options(opts: IpfsOptions) -> Self { let span = Some(Span::current()); diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index c41c331b..2eb9ab4b 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -1,6 +1,6 @@ use super::pubsub::Pubsub; use super::swarm::{Connection, Disconnector, SwarmApi}; -use crate::p2p::SwarmOptions; +use crate::p2p::{MultiaddrWithPeerId, SwarmOptions}; use crate::repo::BlockPut; use crate::subscription::{SubscriptionFuture, SubscriptionRegistry}; use crate::{Ipfs, IpfsTypes}; @@ -414,11 +414,11 @@ impl Behaviour { self.swarm.connections() } - pub fn connect(&mut self, addr: Multiaddr) -> Option> { + pub fn connect(&mut self, addr: MultiaddrWithPeerId) -> Option> { self.swarm.connect(addr) } - pub fn disconnect(&mut self, addr: Multiaddr) -> Option { + pub fn disconnect(&mut self, addr: MultiaddrWithPeerId) -> Option { self.swarm.disconnect(addr) } diff --git a/src/p2p/swarm.rs b/src/p2p/swarm.rs index 9e162143..a145515c 100644 --- a/src/p2p/swarm.rs +++ b/src/p2p/swarm.rs @@ -7,14 +7,14 @@ use libp2p::core::{ use libp2p::swarm::protocols_handler::{ DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler, }; -use libp2p::swarm::{self, DialPeerCondition, NetworkBehaviour, PollParameters, Swarm}; +use libp2p::swarm::{self, NetworkBehaviour, PollParameters, Swarm}; use std::collections::{HashMap, HashSet, VecDeque}; use std::convert::TryFrom; use std::time::Duration; use std::{fmt, str::FromStr}; /// A wrapper for `Multiaddr` that does **not** contain `Protocol::P2p`. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct MultiaddrWoPeerId(Multiaddr); impl From for MultiaddrWoPeerId { @@ -27,6 +27,19 @@ impl From for MultiaddrWoPeerId { } } +impl From for Multiaddr { + fn from(addr: MultiaddrWoPeerId) -> Self { + let MultiaddrWoPeerId(multiaddr) = addr; + multiaddr + } +} + +impl AsRef for MultiaddrWoPeerId { + fn as_ref(&self) -> &Multiaddr { + &self.0 + } +} + impl FromStr for MultiaddrWoPeerId { type Err = anyhow::Error; @@ -41,12 +54,12 @@ impl FromStr for MultiaddrWoPeerId { /// don't support it being contained within the `Multiaddr`. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct MultiaddrWithPeerId { - multiaddr: Multiaddr, - peer_id: PeerId, + pub multiaddr: MultiaddrWoPeerId, + pub peer_id: PeerId, } -impl From<(Multiaddr, PeerId)> for MultiaddrWithPeerId { - fn from((multiaddr, peer_id): (Multiaddr, PeerId)) -> Self { +impl From<(MultiaddrWoPeerId, PeerId)> for MultiaddrWithPeerId { + fn from((multiaddr, peer_id): (MultiaddrWoPeerId, PeerId)) -> Self { Self { multiaddr, peer_id } } } @@ -56,6 +69,7 @@ impl TryFrom for MultiaddrWithPeerId { fn try_from(mut multiaddr: Multiaddr) -> Result { if let Some(Protocol::P2p(hash)) = multiaddr.pop() { + let multiaddr = MultiaddrWoPeerId(multiaddr); let peer_id = PeerId::from_multihash(hash) .map_err(|_| anyhow!("Invalid Multihash in Protocol::P2p"))?; Ok(Self { multiaddr, peer_id }) @@ -76,17 +90,15 @@ impl FromStr for MultiaddrWithPeerId { impl fmt::Display for MultiaddrWithPeerId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}/p2p/{}", self.multiaddr, self.peer_id) + write!(f, "{}/p2p/{}", self.multiaddr.as_ref(), self.peer_id) } } /// A description of currently active connection. #[derive(Clone, Debug, PartialEq, Eq)] pub struct Connection { - /// The connected peer. - pub peer_id: PeerId, - /// Any connecting address of the peer as peers can have multiple connections to - pub address: Multiaddr, + /// The connected peer along with its address. + pub addr: MultiaddrWithPeerId, /// Latest ping report on any of the connections pub rtt: Option, } @@ -114,9 +126,9 @@ pub struct SwarmApi { events: VecDeque, peers: HashSet, connect_registry: SubscriptionRegistry<(), String>, - connections: HashMap, + connections: HashMap, roundtrip_times: HashMap, - connected_peers: HashMap>, + connected_peers: HashMap>, } impl SwarmApi { @@ -140,8 +152,7 @@ impl SwarmApi { if let Some(any) = conns.first() { Some(Connection { - peer_id: peer.clone(), - address: any.clone(), + addr: MultiaddrWithPeerId::from((any.clone(), peer.clone())), rtt, }) } else { @@ -155,43 +166,32 @@ impl SwarmApi { self.roundtrip_times.insert(peer_id.clone(), rtt); } - pub fn connect(&mut self, mut address: Multiaddr) -> Option> { - if self.connections.contains_key(&address) { + pub fn connect(&mut self, addr: MultiaddrWithPeerId) -> Option> { + if self.connections.contains_key(&addr.multiaddr) { return None; } - trace!("Connecting to {:?}", address); + trace!("Connecting to {:?}", addr); let subscription = self .connect_registry - .create_subscription(address.clone().into(), None); + .create_subscription(addr.clone().into(), None); - // libp2p currently doesn't support dialing with the P2p protocol - let peer_id = if let Some(Protocol::P2p(peer_id)) = address.pop() { - PeerId::from_multihash(peer_id).ok()? - } else { - return None; - }; + // libp2p currently doesn't support dialing with the P2p protocol, so only consider the + // "bare" Multiaddr + let MultiaddrWithPeerId { multiaddr, .. } = addr; - if address.iter().next().is_some() { - self.events - .push_back(NetworkBehaviourAction::DialAddress { address }); - } else { - self.events.push_back(NetworkBehaviourAction::DialPeer { - peer_id, - condition: DialPeerCondition::Disconnected, - }); - } + self.events.push_back(NetworkBehaviourAction::DialAddress { + address: multiaddr.into(), + }); Some(subscription) } - pub fn disconnect(&mut self, address: Multiaddr) -> Option { - trace!("disconnect {}", address); + pub fn disconnect(&mut self, addr: MultiaddrWithPeerId) -> Option { + trace!("disconnect {}", addr); // FIXME: closing a single specific connection would be allowed for ProtocolHandlers - let peer_id = self.connections.remove(&address); - - if let Some(peer_id) = peer_id { + if let Some(peer_id) = self.connections.remove(&addr.multiaddr) { // wasted some time wondering if the peer should be removed here or not; it should. the // API is a bit ackward since we can't tolerate the Disconnector::disconnect **not** // being called. @@ -227,6 +227,7 @@ impl NetworkBehaviour for SwarmApi { self.connected_peers .get(peer_id) .cloned() + .map(|addrs| addrs.into_iter().map(From::from).collect()) .unwrap_or_default() } @@ -238,24 +239,21 @@ impl NetworkBehaviour for SwarmApi { ) { // TODO: could be that the connection is not yet fully established at this point trace!("inject_connected {} {:?}", peer_id, cp); - let mut addr = connection_point_addr(cp).to_owned(); - - if !addr - .iter() - .any(|protocol| matches!(protocol, Protocol::P2p(_))) - { - let protocol = Protocol::P2p(peer_id.to_owned().into()); - addr.push(protocol); - } + let addr = connection_point_addr(cp).to_owned(); self.peers.insert(peer_id.clone()); let connections = self.connected_peers.entry(peer_id.clone()).or_default(); + connections.push(addr.clone().into()); - connections.push(addr.clone()); - - self.connections.insert(addr.clone(), peer_id.clone()); + self.connections + .insert(addr.clone().into(), peer_id.clone()); if let ConnectedPoint::Dialer { .. } = cp { + let addr = MultiaddrWithPeerId { + multiaddr: addr.into(), + peer_id: peer_id.clone(), + }; + self.connect_registry .finish_subscription(addr.into(), Ok(())); } @@ -272,8 +270,7 @@ impl NetworkBehaviour for SwarmApi { cp: &ConnectedPoint, ) { trace!("inject_connection_closed {} {:?}", peer_id, cp); - let mut closed_addr = connection_point_addr(cp).to_owned(); - closed_addr.push(Protocol::P2p(peer_id.to_owned().into())); + let closed_addr = connection_point_addr(cp).to_owned().into(); let became_empty = if let Some(connections) = self.connected_peers.get_mut(peer_id) { if let Some(index) = connections.iter().position(|addr| *addr == closed_addr) { @@ -289,10 +286,10 @@ impl NetworkBehaviour for SwarmApi { self.connections.remove(&closed_addr); if let ConnectedPoint::Dialer { .. } = cp { - self.connect_registry.finish_subscription( - closed_addr.into(), - Err("Connection reset by peer".to_owned()), - ); + let addr = MultiaddrWithPeerId::from((closed_addr, peer_id.to_owned())); + + self.connect_registry + .finish_subscription(addr.into(), Err("Connection reset by peer".to_owned())); } } @@ -306,13 +303,17 @@ impl NetworkBehaviour for SwarmApi { fn inject_addr_reach_failure( &mut self, - _peer_id: Option<&PeerId>, + peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error, ) { trace!("inject_addr_reach_failure {} {}", addr, error); - self.connect_registry - .finish_subscription(addr.clone().into(), Err(error.to_string())); + if let Some(peer_id) = peer_id { + let ma: MultiaddrWoPeerId = addr.clone().into(); + let addr = MultiaddrWithPeerId::from((ma, peer_id.to_owned())); + self.connect_registry + .finish_subscription(addr.into(), Err(error.to_string())); + } } fn poll( @@ -340,7 +341,7 @@ mod tests { use super::*; use crate::p2p::transport::{build_transport, TTransport}; use libp2p::identity::Keypair; - use libp2p::swarm::Swarm; + use libp2p::{multihash::Multihash, swarm::Swarm}; #[test] fn connection_targets() { @@ -358,7 +359,7 @@ mod tests { #[tokio::test(max_threads = 1)] async fn swarm_api() { let (peer1_id, trans) = mk_transport(); - let mut swarm1 = Swarm::new(trans, SwarmApi::default(), peer1_id); + let mut swarm1 = Swarm::new(trans, SwarmApi::default(), peer1_id.clone()); let (peer2_id, trans) = mk_transport(); let mut swarm2 = Swarm::new(trans, SwarmApi::default(), peer2_id); @@ -366,7 +367,11 @@ mod tests { Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); for l in Swarm::listeners(&swarm1) { - if let Some(fut) = swarm2.connect(l.to_owned()) { + let mut addr = l.to_owned(); + addr.push(Protocol::P2p( + Multihash::from_bytes(peer1_id.clone().into_bytes()).unwrap(), + )); + if let Some(fut) = swarm2.connect(MultiaddrWithPeerId::try_from(addr).unwrap()) { fut.await.unwrap(); } } diff --git a/src/subscription.rs b/src/subscription.rs index 302b641b..e3402cbd 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -3,14 +3,14 @@ //! that contains them. `SubscriptionFuture` is the `Future` bound to pending `Subscription`s and //! sharing the same unique numeric identifier, the `SubscriptionId`. -use crate::RepoEvent; +use crate::{p2p::MultiaddrWithPeerId, RepoEvent}; use cid::Cid; use core::fmt::Debug; use core::hash::Hash; use core::pin::Pin; use futures::channel::mpsc::Sender; use futures::future::Future; -use libp2p::{kad::QueryId, Multiaddr}; +use libp2p::kad::QueryId; use std::collections::HashMap; use std::convert::TryFrom; use std::fmt; @@ -28,8 +28,8 @@ static GLOBAL_REQ_COUNT: AtomicU64 = AtomicU64::new(0); /// The type of a request for subscription. #[derive(Debug, PartialEq, Eq, Hash, Clone)] pub enum RequestKind { - /// A request to connect to the given `Multiaddr` or `PeerId`. - Connect(Multiaddr), + /// A request to connect to the given `Multiaddr`+`PeerId` pair. + Connect(MultiaddrWithPeerId), /// A request to obtain a `Block` with a specific `Cid`. GetBlock(Cid), /// A DHT request to Kademlia. @@ -38,8 +38,8 @@ pub enum RequestKind { Num(u32), } -impl From for RequestKind { - fn from(addr: Multiaddr) -> Self { +impl From for RequestKind { + fn from(addr: MultiaddrWithPeerId) -> Self { Self::Connect(addr) } } diff --git a/tests/connect_two.rs b/tests/connect_two.rs index 4d36ebb7..fc266455 100644 --- a/tests/connect_two.rs +++ b/tests/connect_two.rs @@ -20,6 +20,9 @@ async fn connect_two_nodes_by_addr() { // Make sure only a `Multiaddr` with `/p2p/` can be used to connect. #[tokio::test(max_threads = 1)] +#[should_panic( + expected = "called `Result::unwrap()` on an `Err` value: Missing Protocol::P2p in the Multiaddr" +)] async fn dont_connect_without_p2p() { let node_a = Node::new("a").await; let node_b = Node::new("b").await; @@ -120,7 +123,7 @@ async fn connect_two_nodes_with_two_connections_doesnt_panic() { // peer.. node_a - .disconnect(peers.remove(0).address) + .disconnect(peers.remove(0).addr) .await .expect("failed to disconnect peer_b at peer_a"); From c4435c8c94b895736b9510444065e33c377c82ae Mon Sep 17 00:00:00 2001 From: ljedrz Date: Mon, 17 Aug 2020 16:02:16 +0200 Subject: [PATCH 3/7] chore: rename MultiaddrWoPeerId to MultiaddrWithoutPeerId Signed-off-by: ljedrz --- src/p2p/mod.rs | 2 +- src/p2p/swarm.rs | 30 +++++++++++++++--------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index 0ff5e6d2..e9622318 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -10,7 +10,7 @@ pub(crate) mod pubsub; mod swarm; mod transport; -pub use swarm::{Connection, MultiaddrWithPeerId, MultiaddrWoPeerId}; +pub use swarm::{Connection, MultiaddrWithPeerId, MultiaddrWithoutPeerId}; pub type TSwarm = Swarm>; diff --git a/src/p2p/swarm.rs b/src/p2p/swarm.rs index a145515c..9e5ad2fa 100644 --- a/src/p2p/swarm.rs +++ b/src/p2p/swarm.rs @@ -15,9 +15,9 @@ use std::{fmt, str::FromStr}; /// A wrapper for `Multiaddr` that does **not** contain `Protocol::P2p`. #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct MultiaddrWoPeerId(Multiaddr); +pub struct MultiaddrWithoutPeerId(Multiaddr); -impl From for MultiaddrWoPeerId { +impl From for MultiaddrWithoutPeerId { fn from(addr: Multiaddr) -> Self { Self( addr.into_iter() @@ -27,20 +27,20 @@ impl From for MultiaddrWoPeerId { } } -impl From for Multiaddr { - fn from(addr: MultiaddrWoPeerId) -> Self { - let MultiaddrWoPeerId(multiaddr) = addr; +impl From for Multiaddr { + fn from(addr: MultiaddrWithoutPeerId) -> Self { + let MultiaddrWithoutPeerId(multiaddr) = addr; multiaddr } } -impl AsRef for MultiaddrWoPeerId { +impl AsRef for MultiaddrWithoutPeerId { fn as_ref(&self) -> &Multiaddr { &self.0 } } -impl FromStr for MultiaddrWoPeerId { +impl FromStr for MultiaddrWithoutPeerId { type Err = anyhow::Error; fn from_str(s: &str) -> Result { @@ -54,12 +54,12 @@ impl FromStr for MultiaddrWoPeerId { /// don't support it being contained within the `Multiaddr`. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct MultiaddrWithPeerId { - pub multiaddr: MultiaddrWoPeerId, + pub multiaddr: MultiaddrWithoutPeerId, pub peer_id: PeerId, } -impl From<(MultiaddrWoPeerId, PeerId)> for MultiaddrWithPeerId { - fn from((multiaddr, peer_id): (MultiaddrWoPeerId, PeerId)) -> Self { +impl From<(MultiaddrWithoutPeerId, PeerId)> for MultiaddrWithPeerId { + fn from((multiaddr, peer_id): (MultiaddrWithoutPeerId, PeerId)) -> Self { Self { multiaddr, peer_id } } } @@ -69,7 +69,7 @@ impl TryFrom for MultiaddrWithPeerId { fn try_from(mut multiaddr: Multiaddr) -> Result { if let Some(Protocol::P2p(hash)) = multiaddr.pop() { - let multiaddr = MultiaddrWoPeerId(multiaddr); + let multiaddr = MultiaddrWithoutPeerId(multiaddr); let peer_id = PeerId::from_multihash(hash) .map_err(|_| anyhow!("Invalid Multihash in Protocol::P2p"))?; Ok(Self { multiaddr, peer_id }) @@ -126,9 +126,9 @@ pub struct SwarmApi { events: VecDeque, peers: HashSet, connect_registry: SubscriptionRegistry<(), String>, - connections: HashMap, + connections: HashMap, roundtrip_times: HashMap, - connected_peers: HashMap>, + connected_peers: HashMap>, } impl SwarmApi { @@ -309,7 +309,7 @@ impl NetworkBehaviour for SwarmApi { ) { trace!("inject_addr_reach_failure {} {}", addr, error); if let Some(peer_id) = peer_id { - let ma: MultiaddrWoPeerId = addr.clone().into(); + let ma: MultiaddrWithoutPeerId = addr.clone().into(); let addr = MultiaddrWithPeerId::from((ma, peer_id.to_owned())); self.connect_registry .finish_subscription(addr.into(), Err(error.to_string())); @@ -351,7 +351,7 @@ mod tests { let p2p_peer = format!("/p2p/{}", peer_id); // note: /ipfs/peer_id doesn't properly parse as a Multiaddr - assert!(multiaddr_wo_peer.parse::().is_ok()); + assert!(multiaddr_wo_peer.parse::().is_ok()); assert!(multiaddr_with_peer.parse::().is_ok()); assert!(p2p_peer.parse::().is_ok()); } From c0389bf68426c1c4c34a48ff2f22e33f9f1d7094 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Mon, 17 Aug 2020 16:13:57 +0200 Subject: [PATCH 4/7] chore: move the Multiaddr wrappers to their own module Signed-off-by: ljedrz --- src/p2p/addr.rs | 84 +++++++++++++++++++++++++++++++++++++++++++++ src/p2p/mod.rs | 4 ++- src/p2p/swarm.rs | 89 ++---------------------------------------------- 3 files changed, 89 insertions(+), 88 deletions(-) create mode 100644 src/p2p/addr.rs diff --git a/src/p2p/addr.rs b/src/p2p/addr.rs new file mode 100644 index 00000000..a4a6475d --- /dev/null +++ b/src/p2p/addr.rs @@ -0,0 +1,84 @@ +use anyhow::anyhow; +use libp2p::{multiaddr::Protocol, Multiaddr, PeerId}; +use std::{convert::TryFrom, fmt, str::FromStr}; + +/// A wrapper for `Multiaddr` that does **not** contain `Protocol::P2p`. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct MultiaddrWithoutPeerId(Multiaddr); + +impl From for MultiaddrWithoutPeerId { + fn from(addr: Multiaddr) -> Self { + Self( + addr.into_iter() + .filter(|p| !matches!(p, Protocol::P2p(_))) + .collect(), + ) + } +} + +impl From for Multiaddr { + fn from(addr: MultiaddrWithoutPeerId) -> Self { + let MultiaddrWithoutPeerId(multiaddr) = addr; + multiaddr + } +} + +impl AsRef for MultiaddrWithoutPeerId { + fn as_ref(&self) -> &Multiaddr { + &self.0 + } +} + +impl FromStr for MultiaddrWithoutPeerId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let multiaddr = s.parse::()?; + Ok(multiaddr.into()) + } +} + +/// A `Multiaddr` paired with a discrete `PeerId`. The `Multiaddr` can contain a +/// `Protocol::P2p`, but it's not as easy to work with, and some functionalities +/// don't support it being contained within the `Multiaddr`. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct MultiaddrWithPeerId { + pub multiaddr: MultiaddrWithoutPeerId, + pub peer_id: PeerId, +} + +impl From<(MultiaddrWithoutPeerId, PeerId)> for MultiaddrWithPeerId { + fn from((multiaddr, peer_id): (MultiaddrWithoutPeerId, PeerId)) -> Self { + Self { multiaddr, peer_id } + } +} + +impl TryFrom for MultiaddrWithPeerId { + type Error = anyhow::Error; + + fn try_from(mut multiaddr: Multiaddr) -> Result { + if let Some(Protocol::P2p(hash)) = multiaddr.pop() { + let multiaddr = MultiaddrWithoutPeerId(multiaddr); + let peer_id = PeerId::from_multihash(hash) + .map_err(|_| anyhow!("Invalid Multihash in Protocol::P2p"))?; + Ok(Self { multiaddr, peer_id }) + } else { + Err(anyhow!("Missing Protocol::P2p in the Multiaddr")) + } + } +} + +impl FromStr for MultiaddrWithPeerId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let multiaddr = s.parse::()?; + Self::try_from(multiaddr) + } +} + +impl fmt::Display for MultiaddrWithPeerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/p2p/{}", self.multiaddr.as_ref(), self.peer_id) + } +} diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index e9622318..9a7c404f 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -5,12 +5,14 @@ use libp2p::Swarm; use libp2p::{Multiaddr, PeerId}; use tracing::Span; +mod addr; mod behaviour; pub(crate) mod pubsub; mod swarm; mod transport; -pub use swarm::{Connection, MultiaddrWithPeerId, MultiaddrWithoutPeerId}; +pub use addr::{MultiaddrWithPeerId, MultiaddrWithoutPeerId}; +pub use swarm::Connection; pub type TSwarm = Swarm>; diff --git a/src/p2p/swarm.rs b/src/p2p/swarm.rs index 9e5ad2fa..80756fef 100644 --- a/src/p2p/swarm.rs +++ b/src/p2p/swarm.rs @@ -1,98 +1,13 @@ +use crate::p2p::{MultiaddrWithPeerId, MultiaddrWithoutPeerId}; use crate::subscription::{SubscriptionFuture, SubscriptionRegistry}; -use anyhow::anyhow; use core::task::{Context, Poll}; -use libp2p::core::{ - connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, -}; +use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; use libp2p::swarm::protocols_handler::{ DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler, }; use libp2p::swarm::{self, NetworkBehaviour, PollParameters, Swarm}; use std::collections::{HashMap, HashSet, VecDeque}; -use std::convert::TryFrom; use std::time::Duration; -use std::{fmt, str::FromStr}; - -/// A wrapper for `Multiaddr` that does **not** contain `Protocol::P2p`. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct MultiaddrWithoutPeerId(Multiaddr); - -impl From for MultiaddrWithoutPeerId { - fn from(addr: Multiaddr) -> Self { - Self( - addr.into_iter() - .filter(|p| !matches!(p, Protocol::P2p(_))) - .collect(), - ) - } -} - -impl From for Multiaddr { - fn from(addr: MultiaddrWithoutPeerId) -> Self { - let MultiaddrWithoutPeerId(multiaddr) = addr; - multiaddr - } -} - -impl AsRef for MultiaddrWithoutPeerId { - fn as_ref(&self) -> &Multiaddr { - &self.0 - } -} - -impl FromStr for MultiaddrWithoutPeerId { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - let multiaddr = s.parse::()?; - Ok(multiaddr.into()) - } -} - -/// A `Multiaddr` paired with a discrete `PeerId`. The `Multiaddr` can contain a -/// `Protocol::P2p`, but it's not as easy to work with, and some functionalities -/// don't support it being contained within the `Multiaddr`. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct MultiaddrWithPeerId { - pub multiaddr: MultiaddrWithoutPeerId, - pub peer_id: PeerId, -} - -impl From<(MultiaddrWithoutPeerId, PeerId)> for MultiaddrWithPeerId { - fn from((multiaddr, peer_id): (MultiaddrWithoutPeerId, PeerId)) -> Self { - Self { multiaddr, peer_id } - } -} - -impl TryFrom for MultiaddrWithPeerId { - type Error = anyhow::Error; - - fn try_from(mut multiaddr: Multiaddr) -> Result { - if let Some(Protocol::P2p(hash)) = multiaddr.pop() { - let multiaddr = MultiaddrWithoutPeerId(multiaddr); - let peer_id = PeerId::from_multihash(hash) - .map_err(|_| anyhow!("Invalid Multihash in Protocol::P2p"))?; - Ok(Self { multiaddr, peer_id }) - } else { - Err(anyhow!("Missing Protocol::P2p in the Multiaddr")) - } - } -} - -impl FromStr for MultiaddrWithPeerId { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - let multiaddr = s.parse::()?; - Self::try_from(multiaddr) - } -} - -impl fmt::Display for MultiaddrWithPeerId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}/p2p/{}", self.multiaddr.as_ref(), self.peer_id) - } -} /// A description of currently active connection. #[derive(Clone, Debug, PartialEq, Eq)] From 4599fff709843598828cdf444ea41e6ac49a071b Mon Sep 17 00:00:00 2001 From: ljedrz Date: Mon, 17 Aug 2020 16:59:17 +0200 Subject: [PATCH 5/7] feat: introduce a custom error for the Multiaddr wrappers Signed-off-by: ljedrz --- src/p2p/addr.rs | 41 ++++++++++++++++++++++++++++++++--------- src/p2p/swarm.rs | 5 +++-- tests/connect_two.rs | 4 +--- 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/src/p2p/addr.rs b/src/p2p/addr.rs index a4a6475d..c1eacc73 100644 --- a/src/p2p/addr.rs +++ b/src/p2p/addr.rs @@ -1,7 +1,26 @@ -use anyhow::anyhow; use libp2p::{multiaddr::Protocol, Multiaddr, PeerId}; use std::{convert::TryFrom, fmt, str::FromStr}; +/// An error that can be thrown when converting to `MultiaddrWithPeerId` and +/// `MultiaddrWithoutPeerId`. +#[derive(Debug, Clone)] +pub enum MultiaddrWrapperError { + /// The provided `Multiaddr` is invalid. + InvalidMultiaddr, + /// The `Protocol::P2p` is missing from the source `Multiaddr`. + MissingProtocolP2p, + /// The `PeerId` created based on the `Protocol::P2p` is invalid. + InvalidPeerId, +} + +impl fmt::Display for MultiaddrWrapperError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +impl std::error::Error for MultiaddrWrapperError {} + /// A wrapper for `Multiaddr` that does **not** contain `Protocol::P2p`. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct MultiaddrWithoutPeerId(Multiaddr); @@ -30,10 +49,12 @@ impl AsRef for MultiaddrWithoutPeerId { } impl FromStr for MultiaddrWithoutPeerId { - type Err = anyhow::Error; + type Err = MultiaddrWrapperError; fn from_str(s: &str) -> Result { - let multiaddr = s.parse::()?; + let multiaddr = s + .parse::() + .map_err(|_| MultiaddrWrapperError::InvalidMultiaddr)?; Ok(multiaddr.into()) } } @@ -54,25 +75,27 @@ impl From<(MultiaddrWithoutPeerId, PeerId)> for MultiaddrWithPeerId { } impl TryFrom for MultiaddrWithPeerId { - type Error = anyhow::Error; + type Error = MultiaddrWrapperError; fn try_from(mut multiaddr: Multiaddr) -> Result { if let Some(Protocol::P2p(hash)) = multiaddr.pop() { let multiaddr = MultiaddrWithoutPeerId(multiaddr); - let peer_id = PeerId::from_multihash(hash) - .map_err(|_| anyhow!("Invalid Multihash in Protocol::P2p"))?; + let peer_id = + PeerId::from_multihash(hash).map_err(|_| MultiaddrWrapperError::InvalidPeerId)?; Ok(Self { multiaddr, peer_id }) } else { - Err(anyhow!("Missing Protocol::P2p in the Multiaddr")) + Err(MultiaddrWrapperError::MissingProtocolP2p) } } } impl FromStr for MultiaddrWithPeerId { - type Err = anyhow::Error; + type Err = MultiaddrWrapperError; fn from_str(s: &str) -> Result { - let multiaddr = s.parse::()?; + let multiaddr = s + .parse::() + .map_err(|_| MultiaddrWrapperError::InvalidMultiaddr)?; Self::try_from(multiaddr) } } diff --git a/src/p2p/swarm.rs b/src/p2p/swarm.rs index 80756fef..231d2c8e 100644 --- a/src/p2p/swarm.rs +++ b/src/p2p/swarm.rs @@ -256,7 +256,8 @@ mod tests { use super::*; use crate::p2p::transport::{build_transport, TTransport}; use libp2p::identity::Keypair; - use libp2p::{multihash::Multihash, swarm::Swarm}; + use libp2p::{multiaddr::Protocol, multihash::Multihash, swarm::Swarm}; + use std::convert::TryInto; #[test] fn connection_targets() { @@ -286,7 +287,7 @@ mod tests { addr.push(Protocol::P2p( Multihash::from_bytes(peer1_id.clone().into_bytes()).unwrap(), )); - if let Some(fut) = swarm2.connect(MultiaddrWithPeerId::try_from(addr).unwrap()) { + if let Some(fut) = swarm2.connect(addr.try_into().unwrap()) { fut.await.unwrap(); } } diff --git a/tests/connect_two.rs b/tests/connect_two.rs index fc266455..703105f0 100644 --- a/tests/connect_two.rs +++ b/tests/connect_two.rs @@ -20,9 +20,7 @@ async fn connect_two_nodes_by_addr() { // Make sure only a `Multiaddr` with `/p2p/` can be used to connect. #[tokio::test(max_threads = 1)] -#[should_panic( - expected = "called `Result::unwrap()` on an `Err` value: Missing Protocol::P2p in the Multiaddr" -)] +#[should_panic(expected = "called `Result::unwrap()` on an `Err` value: MissingProtocolP2p")] async fn dont_connect_without_p2p() { let node_a = Node::new("a").await; let node_b = Node::new("b").await; From 10639a614a78bd00cf746e33f10a61ae98390163 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Mon, 17 Aug 2020 17:27:26 +0200 Subject: [PATCH 6/7] chore: move a test between modules Signed-off-by: ljedrz --- src/p2p/addr.rs | 18 ++++++++++++++++++ src/p2p/swarm.rs | 13 ------------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/p2p/addr.rs b/src/p2p/addr.rs index c1eacc73..d97bb4d3 100644 --- a/src/p2p/addr.rs +++ b/src/p2p/addr.rs @@ -105,3 +105,21 @@ impl fmt::Display for MultiaddrWithPeerId { write!(f, "{}/p2p/{}", self.multiaddr.as_ref(), self.peer_id) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn connection_targets() { + let peer_id = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"; + let multiaddr_wo_peer = "/ip4/104.131.131.82/tcp/4001"; + let multiaddr_with_peer = format!("{}/p2p/{}", multiaddr_wo_peer, peer_id); + let p2p_peer = format!("/p2p/{}", peer_id); + // note: /ipfs/peer_id doesn't properly parse as a Multiaddr + + assert!(multiaddr_wo_peer.parse::().is_ok()); + assert!(multiaddr_with_peer.parse::().is_ok()); + assert!(p2p_peer.parse::().is_ok()); + } +} diff --git a/src/p2p/swarm.rs b/src/p2p/swarm.rs index 231d2c8e..8643541d 100644 --- a/src/p2p/swarm.rs +++ b/src/p2p/swarm.rs @@ -259,19 +259,6 @@ mod tests { use libp2p::{multiaddr::Protocol, multihash::Multihash, swarm::Swarm}; use std::convert::TryInto; - #[test] - fn connection_targets() { - let peer_id = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"; - let multiaddr_wo_peer = "/ip4/104.131.131.82/tcp/4001"; - let multiaddr_with_peer = format!("{}/p2p/{}", multiaddr_wo_peer, peer_id); - let p2p_peer = format!("/p2p/{}", peer_id); - // note: /ipfs/peer_id doesn't properly parse as a Multiaddr - - assert!(multiaddr_wo_peer.parse::().is_ok()); - assert!(multiaddr_with_peer.parse::().is_ok()); - assert!(p2p_peer.parse::().is_ok()); - } - #[tokio::test(max_threads = 1)] async fn swarm_api() { let (peer1_id, trans) = mk_transport(); From dcae04f6e26522a18241a8e416c992e0d5b5f9f6 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Tue, 18 Aug 2020 15:59:27 +0200 Subject: [PATCH 7/7] feat: extended Multiaddr wrapper error handling Signed-off-by: ljedrz --- src/p2p/addr.rs | 43 +++++++++++++++++++++++++++---------------- src/p2p/swarm.rs | 17 ++++++++--------- 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/src/p2p/addr.rs b/src/p2p/addr.rs index d97bb4d3..5e2b4b2e 100644 --- a/src/p2p/addr.rs +++ b/src/p2p/addr.rs @@ -1,16 +1,25 @@ -use libp2p::{multiaddr::Protocol, Multiaddr, PeerId}; -use std::{convert::TryFrom, fmt, str::FromStr}; +use libp2p::{ + multiaddr::{self, Protocol}, + Multiaddr, PeerId, +}; +use std::{ + convert::{TryFrom, TryInto}, + fmt, + str::FromStr, +}; /// An error that can be thrown when converting to `MultiaddrWithPeerId` and /// `MultiaddrWithoutPeerId`. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum MultiaddrWrapperError { + /// The source `Multiaddr` unexpectedly contains `Protocol::P2p`. + ContainsProtocolP2p, /// The provided `Multiaddr` is invalid. - InvalidMultiaddr, - /// The `Protocol::P2p` is missing from the source `Multiaddr`. - MissingProtocolP2p, + InvalidMultiaddr(multiaddr::Error), /// The `PeerId` created based on the `Protocol::P2p` is invalid. InvalidPeerId, + /// The `Protocol::P2p` is unexpectedly missing from the source `Multiaddr`. + MissingProtocolP2p, } impl fmt::Display for MultiaddrWrapperError { @@ -25,13 +34,15 @@ impl std::error::Error for MultiaddrWrapperError {} #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct MultiaddrWithoutPeerId(Multiaddr); -impl From for MultiaddrWithoutPeerId { - fn from(addr: Multiaddr) -> Self { - Self( - addr.into_iter() - .filter(|p| !matches!(p, Protocol::P2p(_))) - .collect(), - ) +impl TryFrom for MultiaddrWithoutPeerId { + type Error = MultiaddrWrapperError; + + fn try_from(addr: Multiaddr) -> Result { + if addr.iter().any(|p| matches!(p, Protocol::P2p(_))) { + Err(MultiaddrWrapperError::ContainsProtocolP2p) + } else { + Ok(Self(addr)) + } } } @@ -54,8 +65,8 @@ impl FromStr for MultiaddrWithoutPeerId { fn from_str(s: &str) -> Result { let multiaddr = s .parse::() - .map_err(|_| MultiaddrWrapperError::InvalidMultiaddr)?; - Ok(multiaddr.into()) + .map_err(MultiaddrWrapperError::InvalidMultiaddr)?; + multiaddr.try_into() } } @@ -95,7 +106,7 @@ impl FromStr for MultiaddrWithPeerId { fn from_str(s: &str) -> Result { let multiaddr = s .parse::() - .map_err(|_| MultiaddrWrapperError::InvalidMultiaddr)?; + .map_err(MultiaddrWrapperError::InvalidMultiaddr)?; Self::try_from(multiaddr) } } diff --git a/src/p2p/swarm.rs b/src/p2p/swarm.rs index 8643541d..61960aaa 100644 --- a/src/p2p/swarm.rs +++ b/src/p2p/swarm.rs @@ -7,6 +7,7 @@ use libp2p::swarm::protocols_handler::{ }; use libp2p::swarm::{self, NetworkBehaviour, PollParameters, Swarm}; use std::collections::{HashMap, HashSet, VecDeque}; +use std::convert::TryInto; use std::time::Duration; /// A description of currently active connection. @@ -154,18 +155,17 @@ impl NetworkBehaviour for SwarmApi { ) { // TODO: could be that the connection is not yet fully established at this point trace!("inject_connected {} {:?}", peer_id, cp); - let addr = connection_point_addr(cp).to_owned(); + let addr: MultiaddrWithoutPeerId = connection_point_addr(cp).to_owned().try_into().unwrap(); self.peers.insert(peer_id.clone()); let connections = self.connected_peers.entry(peer_id.clone()).or_default(); - connections.push(addr.clone().into()); + connections.push(addr.clone()); - self.connections - .insert(addr.clone().into(), peer_id.clone()); + self.connections.insert(addr.clone(), peer_id.clone()); if let ConnectedPoint::Dialer { .. } = cp { let addr = MultiaddrWithPeerId { - multiaddr: addr.into(), + multiaddr: addr, peer_id: peer_id.clone(), }; @@ -185,7 +185,7 @@ impl NetworkBehaviour for SwarmApi { cp: &ConnectedPoint, ) { trace!("inject_connection_closed {} {:?}", peer_id, cp); - let closed_addr = connection_point_addr(cp).to_owned().into(); + let closed_addr = connection_point_addr(cp).to_owned().try_into().unwrap(); let became_empty = if let Some(connections) = self.connected_peers.get_mut(peer_id) { if let Some(index) = connections.iter().position(|addr| *addr == closed_addr) { @@ -223,9 +223,8 @@ impl NetworkBehaviour for SwarmApi { error: &dyn std::error::Error, ) { trace!("inject_addr_reach_failure {} {}", addr, error); - if let Some(peer_id) = peer_id { - let ma: MultiaddrWithoutPeerId = addr.clone().into(); - let addr = MultiaddrWithPeerId::from((ma, peer_id.to_owned())); + if peer_id.is_some() { + let addr: MultiaddrWithPeerId = addr.clone().try_into().unwrap(); self.connect_registry .finish_subscription(addr.into(), Err(error.to_string())); }