fix: poll swarm first, use SwarmEvents

polling swarm first seems important, otherwise there'll be a race to
first listening endpoint getting up at least.
This commit is contained in:
Joonas Koivunen 2020-03-17 15:51:08 +02:00
parent cd7a416528
commit 307a724b1a

View File

@ -414,84 +414,94 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
use futures::Stream;
use libp2p::Swarm;
use libp2p::{Swarm, swarm::SwarmEvent};
// begin by polling the swarm so that initially it'll first have chance to bind listeners
// and such. TODO: this no longer needs to be a swarm event but perhaps we should
// consolidate logging of these events here, if necessary?
loop {
// temporary pinning of the receivers should be safe as we are pinning through the
// already pinned self. with the receivers we can also safely ignore exhaustion
// as those are fused.
loop {
let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) {
Poll::Ready(Some(evt)) => evt,
// doing teardown also after the `Ipfs` has been dropped
Poll::Ready(None) => IpfsEvent::Exit,
let inner = {
let next = self.swarm.next_event();
futures::pin_mut!(next);
match next.poll(ctx) {
Poll::Ready(inner) => inner,
Poll::Pending => break,
};
match inner {
IpfsEvent::Connect(addr, ret) => {
let fut = self.swarm.connect(addr);
task::spawn(async move {
let res = fut.await.map_err(|err| format_err!("{}", err));
ret.send(res).ok();
});
}
IpfsEvent::Addresses(ret) => {
let addrs = self.swarm.addrs();
ret.send(Ok(addrs)).ok();
}
IpfsEvent::Listeners(ret) => {
let listeners = Swarm::listeners(&self.swarm).cloned().collect();
ret.send(Ok(listeners)).ok();
}
IpfsEvent::Connections(ret) => {
let connections = self.swarm.connections();
ret.send(Ok(connections)).ok();
}
IpfsEvent::Disconnect(addr, ret) => {
if let Some(disconnector) = self.swarm.disconnect(addr) {
disconnector.disconnect(&mut self.swarm);
}
ret.send(Ok(())).ok();
}
IpfsEvent::GetAddresses(ret) => {
// perhaps this could be moved under `IpfsEvent` or free functions?
let mut addresses = Vec::new();
addresses.extend(Swarm::listeners(&self.swarm).cloned());
addresses.extend(Swarm::external_addresses(&self.swarm).cloned());
// ignore error, perhaps caller went away already
let _ = ret.send(addresses);
}
IpfsEvent::Exit => {
// FIXME: we could do a proper teardown
return Poll::Ready(());
}
}
};
match inner {
SwarmEvent::Behaviour(()) => {},
SwarmEvent::Connected(_peer_id) => {},
SwarmEvent::Disconnected(_peer_id) => {},
SwarmEvent::NewListenAddr(_addr) => {},
SwarmEvent::ExpiredListenAddr(_addr) => {},
SwarmEvent::UnreachableAddr { peer_id: _peer_id, address: _address, error: _error } => {},
SwarmEvent::StartConnect(_peer_id) => {}
}
}
// Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy
// wants this to be written with a `while let`.
while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) {
match evt {
RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid),
RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid),
// temporary pinning of the receivers should be safe as we are pinning through the
// already pinned self. with the receivers we can also safely ignore exhaustion
// as those are fused.
loop {
let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) {
Poll::Ready(Some(evt)) => evt,
// doing teardown also after the `Ipfs` has been dropped
Poll::Ready(None) => IpfsEvent::Exit,
Poll::Pending => break,
};
match inner {
IpfsEvent::Connect(addr, ret) => {
let fut = self.swarm.connect(addr);
task::spawn(async move {
let res = fut.await.map_err(|err| format_err!("{}", err));
ret.send(res).ok();
});
}
}
{
let poll = Pin::new(&mut self.swarm).poll_next(ctx);
match poll {
Poll::Ready(Some(_)) => {}
Poll::Ready(None) => {
// this should never happen with libp2p swarm
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
IpfsEvent::Addresses(ret) => {
let addrs = self.swarm.addrs();
ret.send(Ok(addrs)).ok();
}
IpfsEvent::Listeners(ret) => {
let listeners = Swarm::listeners(&self.swarm).cloned().collect();
ret.send(Ok(listeners)).ok();
}
IpfsEvent::Connections(ret) => {
let connections = self.swarm.connections();
ret.send(Ok(connections)).ok();
}
IpfsEvent::Disconnect(addr, ret) => {
if let Some(disconnector) = self.swarm.disconnect(addr) {
disconnector.disconnect(&mut self.swarm);
}
ret.send(Ok(())).ok();
}
IpfsEvent::GetAddresses(ret) => {
// perhaps this could be moved under `IpfsEvent` or free functions?
let mut addresses = Vec::new();
addresses.extend(Swarm::listeners(&self.swarm).cloned());
addresses.extend(Swarm::external_addresses(&self.swarm).cloned());
// ignore error, perhaps caller went away already
let _ = ret.send(addresses);
}
IpfsEvent::Exit => {
// FIXME: we could do a proper teardown
return Poll::Ready(());
}
}
}
// Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy
// wants this to be written with a `while let`.
while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) {
match evt {
RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid),
RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid),
}
}
Poll::Pending
}
}