Merge pull request #108 from eqlabs/add_connect_integration_test

Add connect integration test
This commit is contained in:
Joonas Koivunen 2020-03-22 19:02:17 +02:00 committed by GitHub
commit 290fcfa3ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 191 additions and 77 deletions

View File

@ -105,6 +105,18 @@ impl<Types: IpfsTypes> fmt::Debug for IpfsOptions<Types> {
}
}
impl IpfsOptions<TestTypes> {
/// Creates an inmemory store backed node for tests
pub fn inmemory_with_generated_keys(mdns: bool) -> Self {
Self::new(
std::env::temp_dir().into(),
Keypair::generate_ed25519(),
vec![],
mdns,
)
}
}
/// Workaround for libp2p::identity::Keypair missing a Debug impl, works with references and owned
/// keypairs.
#[derive(Clone)]
@ -407,84 +419,98 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
use futures::Stream;
use libp2p::Swarm;
use libp2p::{swarm::SwarmEvent, Swarm};
// begin by polling the swarm so that initially it'll first have chance to bind listeners
// and such. TODO: this no longer needs to be a swarm event but perhaps we should
// consolidate logging of these events here, if necessary?
loop {
// temporary pinning of the receivers should be safe as we are pinning through the
// already pinned self. with the receivers we can also safely ignore exhaustion
// as those are fused.
loop {
let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) {
Poll::Ready(Some(evt)) => evt,
// doing teardown also after the `Ipfs` has been dropped
Poll::Ready(None) => IpfsEvent::Exit,
let inner = {
let next = self.swarm.next_event();
futures::pin_mut!(next);
match next.poll(ctx) {
Poll::Ready(inner) => inner,
Poll::Pending => break,
};
match inner {
IpfsEvent::Connect(addr, ret) => {
let fut = self.swarm.connect(addr);
task::spawn(async move {
let res = fut.await.map_err(|err| format_err!("{}", err));
ret.send(res).ok();
});
}
IpfsEvent::Addresses(ret) => {
let addrs = self.swarm.addrs();
ret.send(Ok(addrs)).ok();
}
IpfsEvent::Listeners(ret) => {
let listeners = Swarm::listeners(&self.swarm).cloned().collect();
ret.send(Ok(listeners)).ok();
}
IpfsEvent::Connections(ret) => {
let connections = self.swarm.connections();
ret.send(Ok(connections)).ok();
}
IpfsEvent::Disconnect(addr, ret) => {
if let Some(disconnector) = self.swarm.disconnect(addr) {
disconnector.disconnect(&mut self.swarm);
}
ret.send(Ok(())).ok();
}
IpfsEvent::GetAddresses(ret) => {
// perhaps this could be moved under `IpfsEvent` or free functions?
let mut addresses = Vec::new();
addresses.extend(Swarm::listeners(&self.swarm).cloned());
addresses.extend(Swarm::external_addresses(&self.swarm).cloned());
// ignore error, perhaps caller went away already
let _ = ret.send(addresses);
}
IpfsEvent::Exit => {
// FIXME: we could do a proper teardown
return Poll::Ready(());
}
}
};
match inner {
SwarmEvent::Behaviour(()) => {}
SwarmEvent::Connected(_peer_id) => {}
SwarmEvent::Disconnected(_peer_id) => {}
SwarmEvent::NewListenAddr(_addr) => {}
SwarmEvent::ExpiredListenAddr(_addr) => {}
SwarmEvent::UnreachableAddr {
peer_id: _peer_id,
address: _address,
error: _error,
} => {}
SwarmEvent::StartConnect(_peer_id) => {}
}
}
// Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy
// wants this to be written with a `while let`.
while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) {
match evt {
RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid),
RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid),
// temporary pinning of the receivers should be safe as we are pinning through the
// already pinned self. with the receivers we can also safely ignore exhaustion
// as those are fused.
loop {
let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) {
Poll::Ready(Some(evt)) => evt,
// doing teardown also after the `Ipfs` has been dropped
Poll::Ready(None) => IpfsEvent::Exit,
Poll::Pending => break,
};
match inner {
IpfsEvent::Connect(addr, ret) => {
let fut = self.swarm.connect(addr);
task::spawn(async move {
let res = fut.await.map_err(|err| format_err!("{}", err));
ret.send(res).ok();
});
}
}
{
let poll = Pin::new(&mut self.swarm).poll_next(ctx);
match poll {
Poll::Ready(Some(_)) => {}
Poll::Ready(None) => {
// this should never happen with libp2p swarm
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
IpfsEvent::Addresses(ret) => {
let addrs = self.swarm.addrs();
ret.send(Ok(addrs)).ok();
}
IpfsEvent::Listeners(ret) => {
let listeners = Swarm::listeners(&self.swarm).cloned().collect();
ret.send(Ok(listeners)).ok();
}
IpfsEvent::Connections(ret) => {
let connections = self.swarm.connections();
ret.send(Ok(connections)).ok();
}
IpfsEvent::Disconnect(addr, ret) => {
if let Some(disconnector) = self.swarm.disconnect(addr) {
disconnector.disconnect(&mut self.swarm);
}
ret.send(Ok(())).ok();
}
IpfsEvent::GetAddresses(ret) => {
// perhaps this could be moved under `IpfsEvent` or free functions?
let mut addresses = Vec::new();
addresses.extend(Swarm::listeners(&self.swarm).cloned());
addresses.extend(Swarm::external_addresses(&self.swarm).cloned());
// ignore error, perhaps caller went away already
let _ = ret.send(addresses);
}
IpfsEvent::Exit => {
// FIXME: we could do a proper teardown
return Poll::Ready(());
}
}
}
// Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy
// wants this to be written with a `while let`.
while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) {
match evt {
RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid),
RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid),
}
}
Poll::Pending
}
}

View File

@ -4,8 +4,9 @@ use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p::swarm::protocols_handler::{
DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler,
};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, Swarm};
use libp2p::swarm::{self, NetworkBehaviour, PollParameters, Swarm};
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::Waker;
use std::time::Duration;
#[derive(Clone, Debug)]
@ -26,13 +27,18 @@ impl Disconnector {
}
}
// Currently this is swarm::NetworkBehaviourAction<Void, Void>
type NetworkBehaviourAction = swarm::NetworkBehaviourAction<<<<SwarmApi as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, <SwarmApi as NetworkBehaviour>::OutEvent>;
#[derive(Debug, Default)]
pub struct SwarmApi {
events: VecDeque<NetworkBehaviourAction<void::Void, void::Void>>,
events: VecDeque<NetworkBehaviourAction>,
peers: HashSet<PeerId>,
connect_registry: SubscriptionRegistry<Multiaddr, Result<(), String>>,
connections: HashMap<Multiaddr, Connection>,
connected_peers: HashMap<PeerId, Multiaddr>,
/// The waker of the last polled task, if any.
waker: Option<Waker>,
}
impl SwarmApi {
@ -65,15 +71,23 @@ impl SwarmApi {
}
pub fn connect(&mut self, address: Multiaddr) -> SubscriptionFuture<Result<(), String>> {
log::trace!("connect {}", address.to_string());
self.events.push_back(NetworkBehaviourAction::DialAddress {
log::trace!("starting to connect to {}", address);
self.push_action(NetworkBehaviourAction::DialAddress {
address: address.clone(),
});
self.connect_registry.create_subscription(address)
}
fn push_action(&mut self, action: NetworkBehaviourAction) {
self.events.push_back(action);
if let Some(waker) = self.waker.as_ref() {
waker.wake_by_ref();
}
}
pub fn disconnect(&mut self, address: Multiaddr) -> Option<Disconnector> {
log::trace!("disconnect {}", address.to_string());
log::trace!("disconnect {}", address);
self.connections.remove(&address);
let peer_id = self
.connections
@ -144,10 +158,13 @@ impl NetworkBehaviour for SwarmApi {
.finish_subscription(addr, Err(format!("{}", error)));
}
#[allow(clippy::type_complexity)]
fn poll(&mut self, _ctx: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<<<
Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>{
log::trace!("poll");
fn poll(
&mut self,
ctx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction> {
// store the poller so that we can wake the task on next push_action
self.waker = Some(ctx.waker().clone());
if let Some(event) = self.events.pop_front() {
Poll::Ready(event)
} else {

71
tests/connect_two.rs Normal file
View File

@ -0,0 +1,71 @@
use async_std::task;
/// Make sure two instances of ipfs can be connected.
#[test]
fn connect_two_nodes() {
// env_logger::init();
// make sure the connection will only happen through explicit connect
let mdns = false;
let (tx, rx) = futures::channel::oneshot::channel();
let node_a = task::spawn(async move {
let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(mdns);
let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts)
.await
.start()
.await
.unwrap();
let jh = task::spawn(fut);
let (pk, addrs) = ipfs
.identity()
.await
.expect("failed to read identity() on node_a");
assert!(!addrs.is_empty());
tx.send((pk, addrs, ipfs, jh)).unwrap();
});
task::block_on(async move {
let (other_pk, other_addrs, other_ipfs, other_jh) = rx.await.unwrap();
println!("got back from the other node: {:?}", other_addrs);
let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(mdns);
let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts)
.await
.start()
.await
.unwrap();
let jh = task::spawn(fut);
let _other_peerid = other_pk.into_peer_id();
let mut connected = None;
for addr in other_addrs {
println!("trying {}", addr);
match ipfs.connect(addr.clone()).await {
Ok(_) => {
connected = Some(addr);
break;
}
Err(e) => {
println!("Failed connecting to {}: {}", addr, e);
}
}
}
let connected = connected.expect("Failed to connect to anything");
println!("connected to {}", connected);
other_ipfs.exit_daemon().await;
other_jh.await;
node_a.await;
ipfs.exit_daemon().await;
jh.await;
});
}