diff --git a/src/lib.rs b/src/lib.rs index fe233ea2..2efb3f4e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -414,84 +414,94 @@ impl Future for IpfsFuture { fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll { use futures::Stream; - use libp2p::Swarm; + use libp2p::{Swarm, swarm::SwarmEvent}; + + // begin by polling the swarm so that initially it'll first have chance to bind listeners + // and such. TODO: this no longer needs to be a swarm event but perhaps we should + // consolidate logging of these events here, if necessary? loop { - // temporary pinning of the receivers should be safe as we are pinning through the - // already pinned self. with the receivers we can also safely ignore exhaustion - // as those are fused. - loop { - let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) { - Poll::Ready(Some(evt)) => evt, - // doing teardown also after the `Ipfs` has been dropped - Poll::Ready(None) => IpfsEvent::Exit, + let inner = { + let next = self.swarm.next_event(); + futures::pin_mut!(next); + match next.poll(ctx) { + Poll::Ready(inner) => inner, Poll::Pending => break, - }; - - match inner { - IpfsEvent::Connect(addr, ret) => { - let fut = self.swarm.connect(addr); - task::spawn(async move { - let res = fut.await.map_err(|err| format_err!("{}", err)); - ret.send(res).ok(); - }); - } - IpfsEvent::Addresses(ret) => { - let addrs = self.swarm.addrs(); - ret.send(Ok(addrs)).ok(); - } - IpfsEvent::Listeners(ret) => { - let listeners = Swarm::listeners(&self.swarm).cloned().collect(); - ret.send(Ok(listeners)).ok(); - } - IpfsEvent::Connections(ret) => { - let connections = self.swarm.connections(); - ret.send(Ok(connections)).ok(); - } - IpfsEvent::Disconnect(addr, ret) => { - if let Some(disconnector) = self.swarm.disconnect(addr) { - disconnector.disconnect(&mut self.swarm); - } - ret.send(Ok(())).ok(); - } - IpfsEvent::GetAddresses(ret) => { - // perhaps this could be moved under `IpfsEvent` or free functions? - let mut addresses = Vec::new(); - addresses.extend(Swarm::listeners(&self.swarm).cloned()); - addresses.extend(Swarm::external_addresses(&self.swarm).cloned()); - // ignore error, perhaps caller went away already - let _ = ret.send(addresses); - } - IpfsEvent::Exit => { - // FIXME: we could do a proper teardown - return Poll::Ready(()); - } } + }; + match inner { + SwarmEvent::Behaviour(()) => {}, + SwarmEvent::Connected(_peer_id) => {}, + SwarmEvent::Disconnected(_peer_id) => {}, + SwarmEvent::NewListenAddr(_addr) => {}, + SwarmEvent::ExpiredListenAddr(_addr) => {}, + SwarmEvent::UnreachableAddr { peer_id: _peer_id, address: _address, error: _error } => {}, + SwarmEvent::StartConnect(_peer_id) => {} } + } - // Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy - // wants this to be written with a `while let`. - while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) { - match evt { - RepoEvent::WantBlock(cid) => self.swarm.want_block(cid), - RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid), - RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid), + // temporary pinning of the receivers should be safe as we are pinning through the + // already pinned self. with the receivers we can also safely ignore exhaustion + // as those are fused. + loop { + let inner = match Pin::new(&mut self.from_facade).poll_next(ctx) { + Poll::Ready(Some(evt)) => evt, + // doing teardown also after the `Ipfs` has been dropped + Poll::Ready(None) => IpfsEvent::Exit, + Poll::Pending => break, + }; + + match inner { + IpfsEvent::Connect(addr, ret) => { + let fut = self.swarm.connect(addr); + task::spawn(async move { + let res = fut.await.map_err(|err| format_err!("{}", err)); + ret.send(res).ok(); + }); } - } - - { - let poll = Pin::new(&mut self.swarm).poll_next(ctx); - match poll { - Poll::Ready(Some(_)) => {} - Poll::Ready(None) => { - // this should never happen with libp2p swarm - return Poll::Ready(()); - } - Poll::Pending => { - return Poll::Pending; + IpfsEvent::Addresses(ret) => { + let addrs = self.swarm.addrs(); + ret.send(Ok(addrs)).ok(); + } + IpfsEvent::Listeners(ret) => { + let listeners = Swarm::listeners(&self.swarm).cloned().collect(); + ret.send(Ok(listeners)).ok(); + } + IpfsEvent::Connections(ret) => { + let connections = self.swarm.connections(); + ret.send(Ok(connections)).ok(); + } + IpfsEvent::Disconnect(addr, ret) => { + if let Some(disconnector) = self.swarm.disconnect(addr) { + disconnector.disconnect(&mut self.swarm); } + ret.send(Ok(())).ok(); + } + IpfsEvent::GetAddresses(ret) => { + // perhaps this could be moved under `IpfsEvent` or free functions? + let mut addresses = Vec::new(); + addresses.extend(Swarm::listeners(&self.swarm).cloned()); + addresses.extend(Swarm::external_addresses(&self.swarm).cloned()); + // ignore error, perhaps caller went away already + let _ = ret.send(addresses); + } + IpfsEvent::Exit => { + // FIXME: we could do a proper teardown + return Poll::Ready(()); } } } + + // Poll::Ready(None) and Poll::Pending can be used to break out of the loop, clippy + // wants this to be written with a `while let`. + while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) { + match evt { + RepoEvent::WantBlock(cid) => self.swarm.want_block(cid), + RepoEvent::ProvideBlock(cid) => self.swarm.provide_block(cid), + RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid), + } + } + + Poll::Pending } }