315: Use explicit Multiaddr wrappers r=koivunej a=ljedrz

Whether a `Multiaddr` contains `Protocol::P2p` (i.e. essentially `/p2p/PeerId`) or not has a lot of bearing on conformance tests, the internal object and APIs and error-handling. Since it is easy to introduce related human errors, it would be a good idea to enforce some type safety here by introducing 2 wrapper objects: `MultiaddrWoPeerId` and `MultiaddrWithPeerId`.

The extent to which they should be applied (in lieu of `Multiaddr`) is a matter for discussion - in my initial attempt I decided to apply them quite liberally, as it results in more type safety (there are still a few plain `Multiaddr`s left though).

Co-authored-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
bors[bot] 2020-08-18 16:53:28 +00:00 committed by GitHub
commit 34205806a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 219 additions and 96 deletions

View File

@ -1,5 +1,5 @@
use super::support::{with_ipfs, StringError};
use ipfs::{Ipfs, IpfsTypes, Multiaddr};
use ipfs::{Ipfs, IpfsTypes, MultiaddrWithPeerId};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::collections::BTreeMap;
@ -16,7 +16,7 @@ async fn connect_query<T: IpfsTypes>(
) -> Result<impl warp::Reply, warp::Rejection> {
let target = query
.arg
.parse::<Multiaddr>()
.parse::<MultiaddrWithPeerId>()
.map_err(|e| warp::reject::custom(StringError::from(e)))?;
ipfs.connect(target)
.await
@ -80,8 +80,8 @@ async fn peers_query<T: IpfsTypes>(
None
};
Peer {
addr: conn.address.to_string(),
peer: conn.peer_id.to_string(),
addr: conn.addr.multiaddr.as_ref().to_string(),
peer: conn.addr.peer_id.to_string(),
latency,
}
})
@ -162,14 +162,18 @@ pub fn addrs_local<T: IpfsTypes>(
#[derive(Debug, Deserialize)]
struct DisconnectQuery {
arg: Multiaddr,
arg: String,
}
async fn disconnect_query<T: IpfsTypes>(
ipfs: Ipfs<T>,
query: DisconnectQuery,
) -> Result<impl warp::Reply, warp::Rejection> {
ipfs.disconnect(query.arg)
let target = query
.arg
.parse::<MultiaddrWithPeerId>()
.map_err(|e| warp::reject::custom(StringError::from(e)))?;
ipfs.disconnect(target)
.await
.map_err(|e| warp::reject::custom(StringError::from(e)))?;
let response: &[&str] = &[];

View File

@ -48,8 +48,8 @@ use self::dag::IpldDag;
pub use self::error::Error;
use self::ipns::Ipns;
pub use self::p2p::pubsub::{PubsubMessage, SubscriptionStream};
pub use self::p2p::Connection;
use self::p2p::{create_swarm, SwarmOptions, TSwarm};
pub use self::p2p::{Connection, MultiaddrWithPeerId};
pub use self::path::IpfsPath;
pub use self::repo::RepoTypes;
use self::repo::{create_repo, Repo, RepoEvent, RepoOptions};
@ -235,7 +235,7 @@ type Channel<T> = OneshotSender<Result<T, Error>>;
enum IpfsEvent {
/// Connect
Connect(
Multiaddr,
MultiaddrWithPeerId,
OneshotSender<Option<SubscriptionFuture<(), String>>>,
),
/// Addresses
@ -245,7 +245,7 @@ enum IpfsEvent {
/// Connections
Connections(Channel<Vec<Connection>>),
/// Disconnect
Disconnect(Multiaddr, Channel<()>),
Disconnect(MultiaddrWithPeerId, Channel<()>),
/// Request background task to return the listened and external addresses
GetAddresses(OneshotSender<Vec<Multiaddr>>),
PubsubSubscribe(String, OneshotSender<Option<SubscriptionStream>>),
@ -447,11 +447,7 @@ impl<Types: IpfsTypes> Ipfs<Types> {
self.ipns().cancel(key).instrument(self.span.clone()).await
}
pub async fn connect(&self, target: Multiaddr) -> Result<(), Error> {
if !target.iter().any(|p| matches!(p, Protocol::P2p(_))) {
return Err(anyhow!("The target address is missing the P2p protocol"));
}
pub async fn connect(&self, target: MultiaddrWithPeerId) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
@ -503,12 +499,12 @@ impl<Types: IpfsTypes> Ipfs<Types> {
.await
}
pub async fn disconnect(&self, addr: Multiaddr) -> Result<(), Error> {
pub async fn disconnect(&self, target: MultiaddrWithPeerId) -> Result<(), Error> {
async move {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::Disconnect(addr, tx))
.send(IpfsEvent::Disconnect(target, tx))
.await?;
rx.await?
}
@ -1058,6 +1054,7 @@ pub use node::Node;
mod node {
use super::*;
use std::convert::TryFrom;
/// Node encapsulates everything to setup a testing instance so that multi-node tests become
/// easier.
@ -1074,6 +1071,11 @@ mod node {
.await
}
pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> {
let addr = MultiaddrWithPeerId::try_from(addr).unwrap();
self.ipfs.connect(addr).await
}
pub async fn with_options(opts: IpfsOptions) -> Self {
let span = Some(Span::current());

136
src/p2p/addr.rs Normal file
View File

@ -0,0 +1,136 @@
use libp2p::{
multiaddr::{self, Protocol},
Multiaddr, PeerId,
};
use std::{
convert::{TryFrom, TryInto},
fmt,
str::FromStr,
};
/// An error that can be thrown when converting to `MultiaddrWithPeerId` and
/// `MultiaddrWithoutPeerId`.
#[derive(Debug)]
pub enum MultiaddrWrapperError {
/// The source `Multiaddr` unexpectedly contains `Protocol::P2p`.
ContainsProtocolP2p,
/// The provided `Multiaddr` is invalid.
InvalidMultiaddr(multiaddr::Error),
/// The `PeerId` created based on the `Protocol::P2p` is invalid.
InvalidPeerId,
/// The `Protocol::P2p` is unexpectedly missing from the source `Multiaddr`.
MissingProtocolP2p,
}
impl fmt::Display for MultiaddrWrapperError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for MultiaddrWrapperError {}
/// A wrapper for `Multiaddr` that does **not** contain `Protocol::P2p`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MultiaddrWithoutPeerId(Multiaddr);
impl TryFrom<Multiaddr> for MultiaddrWithoutPeerId {
type Error = MultiaddrWrapperError;
fn try_from(addr: Multiaddr) -> Result<Self, Self::Error> {
if addr.iter().any(|p| matches!(p, Protocol::P2p(_))) {
Err(MultiaddrWrapperError::ContainsProtocolP2p)
} else {
Ok(Self(addr))
}
}
}
impl From<MultiaddrWithoutPeerId> for Multiaddr {
fn from(addr: MultiaddrWithoutPeerId) -> Self {
let MultiaddrWithoutPeerId(multiaddr) = addr;
multiaddr
}
}
impl AsRef<Multiaddr> for MultiaddrWithoutPeerId {
fn as_ref(&self) -> &Multiaddr {
&self.0
}
}
impl FromStr for MultiaddrWithoutPeerId {
type Err = MultiaddrWrapperError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let multiaddr = s
.parse::<Multiaddr>()
.map_err(MultiaddrWrapperError::InvalidMultiaddr)?;
multiaddr.try_into()
}
}
/// A `Multiaddr` paired with a discrete `PeerId`. The `Multiaddr` can contain a
/// `Protocol::P2p`, but it's not as easy to work with, and some functionalities
/// don't support it being contained within the `Multiaddr`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MultiaddrWithPeerId {
pub multiaddr: MultiaddrWithoutPeerId,
pub peer_id: PeerId,
}
impl From<(MultiaddrWithoutPeerId, PeerId)> for MultiaddrWithPeerId {
fn from((multiaddr, peer_id): (MultiaddrWithoutPeerId, PeerId)) -> Self {
Self { multiaddr, peer_id }
}
}
impl TryFrom<Multiaddr> for MultiaddrWithPeerId {
type Error = MultiaddrWrapperError;
fn try_from(mut multiaddr: Multiaddr) -> Result<Self, Self::Error> {
if let Some(Protocol::P2p(hash)) = multiaddr.pop() {
let multiaddr = MultiaddrWithoutPeerId(multiaddr);
let peer_id =
PeerId::from_multihash(hash).map_err(|_| MultiaddrWrapperError::InvalidPeerId)?;
Ok(Self { multiaddr, peer_id })
} else {
Err(MultiaddrWrapperError::MissingProtocolP2p)
}
}
}
impl FromStr for MultiaddrWithPeerId {
type Err = MultiaddrWrapperError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let multiaddr = s
.parse::<Multiaddr>()
.map_err(MultiaddrWrapperError::InvalidMultiaddr)?;
Self::try_from(multiaddr)
}
}
impl fmt::Display for MultiaddrWithPeerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/p2p/{}", self.multiaddr.as_ref(), self.peer_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn connection_targets() {
let peer_id = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ";
let multiaddr_wo_peer = "/ip4/104.131.131.82/tcp/4001";
let multiaddr_with_peer = format!("{}/p2p/{}", multiaddr_wo_peer, peer_id);
let p2p_peer = format!("/p2p/{}", peer_id);
// note: /ipfs/peer_id doesn't properly parse as a Multiaddr
assert!(multiaddr_wo_peer.parse::<MultiaddrWithoutPeerId>().is_ok());
assert!(multiaddr_with_peer.parse::<MultiaddrWithPeerId>().is_ok());
assert!(p2p_peer.parse::<Multiaddr>().is_ok());
}
}

View File

@ -1,6 +1,6 @@
use super::pubsub::Pubsub;
use super::swarm::{Connection, Disconnector, SwarmApi};
use crate::p2p::SwarmOptions;
use crate::p2p::{MultiaddrWithPeerId, SwarmOptions};
use crate::repo::BlockPut;
use crate::subscription::{SubscriptionFuture, SubscriptionRegistry};
use crate::{Ipfs, IpfsTypes};
@ -414,11 +414,11 @@ impl<Types: IpfsTypes> Behaviour<Types> {
self.swarm.connections()
}
pub fn connect(&mut self, addr: Multiaddr) -> Option<SubscriptionFuture<(), String>> {
pub fn connect(&mut self, addr: MultiaddrWithPeerId) -> Option<SubscriptionFuture<(), String>> {
self.swarm.connect(addr)
}
pub fn disconnect(&mut self, addr: Multiaddr) -> Option<Disconnector> {
pub fn disconnect(&mut self, addr: MultiaddrWithPeerId) -> Option<Disconnector> {
self.swarm.disconnect(addr)
}

View File

@ -5,11 +5,13 @@ use libp2p::Swarm;
use libp2p::{Multiaddr, PeerId};
use tracing::Span;
mod addr;
mod behaviour;
pub(crate) mod pubsub;
mod swarm;
mod transport;
pub use addr::{MultiaddrWithPeerId, MultiaddrWithoutPeerId};
pub use swarm::Connection;
pub type TSwarm<T> = Swarm<behaviour::Behaviour<T>>;

View File

@ -1,22 +1,20 @@
use crate::p2p::{MultiaddrWithPeerId, MultiaddrWithoutPeerId};
use crate::subscription::{SubscriptionFuture, SubscriptionRegistry};
use core::task::{Context, Poll};
use libp2p::core::{
connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId,
};
use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p::swarm::protocols_handler::{
DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler,
};
use libp2p::swarm::{self, DialPeerCondition, NetworkBehaviour, PollParameters, Swarm};
use libp2p::swarm::{self, NetworkBehaviour, PollParameters, Swarm};
use std::collections::{HashMap, HashSet, VecDeque};
use std::convert::TryInto;
use std::time::Duration;
/// A description of currently active connection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Connection {
/// The connected peer.
pub peer_id: PeerId,
/// Any connecting address of the peer as peers can have multiple connections to
pub address: Multiaddr,
/// The connected peer along with its address.
pub addr: MultiaddrWithPeerId,
/// Latest ping report on any of the connections
pub rtt: Option<Duration>,
}
@ -44,9 +42,9 @@ pub struct SwarmApi {
events: VecDeque<NetworkBehaviourAction>,
peers: HashSet<PeerId>,
connect_registry: SubscriptionRegistry<(), String>,
connections: HashMap<Multiaddr, PeerId>,
connections: HashMap<MultiaddrWithoutPeerId, PeerId>,
roundtrip_times: HashMap<PeerId, Duration>,
connected_peers: HashMap<PeerId, Vec<Multiaddr>>,
connected_peers: HashMap<PeerId, Vec<MultiaddrWithoutPeerId>>,
}
impl SwarmApi {
@ -70,8 +68,7 @@ impl SwarmApi {
if let Some(any) = conns.first() {
Some(Connection {
peer_id: peer.clone(),
address: any.clone(),
addr: MultiaddrWithPeerId::from((any.clone(), peer.clone())),
rtt,
})
} else {
@ -85,43 +82,32 @@ impl SwarmApi {
self.roundtrip_times.insert(peer_id.clone(), rtt);
}
pub fn connect(&mut self, mut address: Multiaddr) -> Option<SubscriptionFuture<(), String>> {
if self.connections.contains_key(&address) {
pub fn connect(&mut self, addr: MultiaddrWithPeerId) -> Option<SubscriptionFuture<(), String>> {
if self.connections.contains_key(&addr.multiaddr) {
return None;
}
trace!("Connecting to {:?}", address);
trace!("Connecting to {:?}", addr);
let subscription = self
.connect_registry
.create_subscription(address.clone().into(), None);
.create_subscription(addr.clone().into(), None);
// libp2p currently doesn't support dialing with the P2p protocol
let peer_id = if let Some(Protocol::P2p(peer_id)) = address.pop() {
PeerId::from_multihash(peer_id).ok()?
} else {
return None;
};
// libp2p currently doesn't support dialing with the P2p protocol, so only consider the
// "bare" Multiaddr
let MultiaddrWithPeerId { multiaddr, .. } = addr;
if address.iter().next().is_some() {
self.events
.push_back(NetworkBehaviourAction::DialAddress { address });
} else {
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
});
}
self.events.push_back(NetworkBehaviourAction::DialAddress {
address: multiaddr.into(),
});
Some(subscription)
}
pub fn disconnect(&mut self, address: Multiaddr) -> Option<Disconnector> {
trace!("disconnect {}", address);
pub fn disconnect(&mut self, addr: MultiaddrWithPeerId) -> Option<Disconnector> {
trace!("disconnect {}", addr);
// FIXME: closing a single specific connection would be allowed for ProtocolHandlers
let peer_id = self.connections.remove(&address);
if let Some(peer_id) = peer_id {
if let Some(peer_id) = self.connections.remove(&addr.multiaddr) {
// wasted some time wondering if the peer should be removed here or not; it should. the
// API is a bit ackward since we can't tolerate the Disconnector::disconnect **not**
// being called.
@ -157,6 +143,7 @@ impl NetworkBehaviour for SwarmApi {
self.connected_peers
.get(peer_id)
.cloned()
.map(|addrs| addrs.into_iter().map(From::from).collect())
.unwrap_or_default()
}
@ -168,24 +155,20 @@ impl NetworkBehaviour for SwarmApi {
) {
// TODO: could be that the connection is not yet fully established at this point
trace!("inject_connected {} {:?}", peer_id, cp);
let mut addr = connection_point_addr(cp).to_owned();
if !addr
.iter()
.any(|protocol| matches!(protocol, Protocol::P2p(_)))
{
let protocol = Protocol::P2p(peer_id.to_owned().into());
addr.push(protocol);
}
let addr: MultiaddrWithoutPeerId = connection_point_addr(cp).to_owned().try_into().unwrap();
self.peers.insert(peer_id.clone());
let connections = self.connected_peers.entry(peer_id.clone()).or_default();
connections.push(addr.clone());
self.connections.insert(addr.clone(), peer_id.clone());
if let ConnectedPoint::Dialer { .. } = cp {
let addr = MultiaddrWithPeerId {
multiaddr: addr,
peer_id: peer_id.clone(),
};
self.connect_registry
.finish_subscription(addr.into(), Ok(()));
}
@ -202,8 +185,7 @@ impl NetworkBehaviour for SwarmApi {
cp: &ConnectedPoint,
) {
trace!("inject_connection_closed {} {:?}", peer_id, cp);
let mut closed_addr = connection_point_addr(cp).to_owned();
closed_addr.push(Protocol::P2p(peer_id.to_owned().into()));
let closed_addr = connection_point_addr(cp).to_owned().try_into().unwrap();
let became_empty = if let Some(connections) = self.connected_peers.get_mut(peer_id) {
if let Some(index) = connections.iter().position(|addr| *addr == closed_addr) {
@ -219,10 +201,10 @@ impl NetworkBehaviour for SwarmApi {
self.connections.remove(&closed_addr);
if let ConnectedPoint::Dialer { .. } = cp {
self.connect_registry.finish_subscription(
closed_addr.into(),
Err("Connection reset by peer".to_owned()),
);
let addr = MultiaddrWithPeerId::from((closed_addr, peer_id.to_owned()));
self.connect_registry
.finish_subscription(addr.into(), Err("Connection reset by peer".to_owned()));
}
}
@ -236,13 +218,16 @@ impl NetworkBehaviour for SwarmApi {
fn inject_addr_reach_failure(
&mut self,
_peer_id: Option<&PeerId>,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
) {
trace!("inject_addr_reach_failure {} {}", addr, error);
self.connect_registry
.finish_subscription(addr.clone().into(), Err(error.to_string()));
if peer_id.is_some() {
let addr: MultiaddrWithPeerId = addr.clone().try_into().unwrap();
self.connect_registry
.finish_subscription(addr.into(), Err(error.to_string()));
}
}
fn poll(
@ -270,24 +255,13 @@ mod tests {
use super::*;
use crate::p2p::transport::{build_transport, TTransport};
use libp2p::identity::Keypair;
use libp2p::swarm::Swarm;
#[test]
fn connection_targets() {
let peer_id = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ";
let multiaddr = "/ip4/104.131.131.82/tcp/4001";
let multiaddr_with_peer = format!("{}/p2p/{}", multiaddr, peer_id);
let p2p_peer = format!("/p2p/{}", peer_id);
// note: /ipfs/peer_id doesn't properly parse as a Multiaddr
assert!(multiaddr_with_peer.parse::<Multiaddr>().is_ok());
assert!(p2p_peer.parse::<Multiaddr>().is_ok());
}
use libp2p::{multiaddr::Protocol, multihash::Multihash, swarm::Swarm};
use std::convert::TryInto;
#[tokio::test(max_threads = 1)]
async fn swarm_api() {
let (peer1_id, trans) = mk_transport();
let mut swarm1 = Swarm::new(trans, SwarmApi::default(), peer1_id);
let mut swarm1 = Swarm::new(trans, SwarmApi::default(), peer1_id.clone());
let (peer2_id, trans) = mk_transport();
let mut swarm2 = Swarm::new(trans, SwarmApi::default(), peer2_id);
@ -295,7 +269,11 @@ mod tests {
Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
for l in Swarm::listeners(&swarm1) {
if let Some(fut) = swarm2.connect(l.to_owned()) {
let mut addr = l.to_owned();
addr.push(Protocol::P2p(
Multihash::from_bytes(peer1_id.clone().into_bytes()).unwrap(),
));
if let Some(fut) = swarm2.connect(addr.try_into().unwrap()) {
fut.await.unwrap();
}
}

View File

@ -3,14 +3,14 @@
//! that contains them. `SubscriptionFuture` is the `Future` bound to pending `Subscription`s and
//! sharing the same unique numeric identifier, the `SubscriptionId`.
use crate::RepoEvent;
use crate::{p2p::MultiaddrWithPeerId, RepoEvent};
use cid::Cid;
use core::fmt::Debug;
use core::hash::Hash;
use core::pin::Pin;
use futures::channel::mpsc::Sender;
use futures::future::Future;
use libp2p::{kad::QueryId, Multiaddr};
use libp2p::kad::QueryId;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt;
@ -28,8 +28,8 @@ static GLOBAL_REQ_COUNT: AtomicU64 = AtomicU64::new(0);
/// The type of a request for subscription.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum RequestKind {
/// A request to connect to the given `Multiaddr` or `PeerId`.
Connect(Multiaddr),
/// A request to connect to the given `Multiaddr`+`PeerId` pair.
Connect(MultiaddrWithPeerId),
/// A request to obtain a `Block` with a specific `Cid`.
GetBlock(Cid),
/// A DHT request to Kademlia.
@ -38,8 +38,8 @@ pub enum RequestKind {
Num(u32),
}
impl From<Multiaddr> for RequestKind {
fn from(addr: Multiaddr) -> Self {
impl From<MultiaddrWithPeerId> for RequestKind {
fn from(addr: MultiaddrWithPeerId) -> Self {
Self::Connect(addr)
}
}

View File

@ -20,6 +20,7 @@ async fn connect_two_nodes_by_addr() {
// Make sure only a `Multiaddr` with `/p2p/` can be used to connect.
#[tokio::test(max_threads = 1)]
#[should_panic(expected = "called `Result::unwrap()` on an `Err` value: MissingProtocolP2p")]
async fn dont_connect_without_p2p() {
let node_a = Node::new("a").await;
let node_b = Node::new("b").await;
@ -120,7 +121,7 @@ async fn connect_two_nodes_with_two_connections_doesnt_panic() {
// peer..
node_a
.disconnect(peers.remove(0).address)
.disconnect(peers.remove(0).addr)
.await
.expect("failed to disconnect peer_b at peer_a");