diff --git a/src/lib.rs b/src/lib.rs index e7dddd03..e4f2645e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -231,7 +231,6 @@ pub struct IpfsInner { } type Channel = OneshotSender>; -type FutureSubscription = SubscriptionFuture>; /// Events used internally to communicate with the swarm, which is executed in the the background /// task. @@ -240,7 +239,7 @@ enum IpfsEvent { /// Connect Connect( ConnectionTarget, - OneshotSender>, + OneshotSender>, ), /// Addresses Addresses(Channel)>>), @@ -261,9 +260,9 @@ enum IpfsEvent { BitswapStats(OneshotSender), AddListeningAddress(Multiaddr, Channel), RemoveListeningAddress(Multiaddr, Channel<()>), - Bootstrap(OneshotSender, Error>>), + Bootstrap(OneshotSender, Error>>), AddPeer(PeerId, Multiaddr), - GetClosestPeers(PeerId, OneshotSender>), + GetClosestPeers(PeerId, OneshotSender>), Exit, } @@ -473,7 +472,7 @@ impl Ipfs { .send(IpfsEvent::Connect(target.into(), tx)) .await?; let subscription = rx.await?; - subscription.await?.map_err(|e| format_err!("{}", e)) + subscription.await.map_err(|e| anyhow!(e)) }) .await } @@ -997,11 +996,11 @@ impl Future for IpfsFuture { match evt { RepoEvent::WantBlock(cid) => self.swarm.want_block(cid), RepoEvent::UnwantBlock(cid) => self.swarm.bitswap().cancel_block(&cid), - RepoEvent::ProvideBlock(cid) => { + RepoEvent::ProvideBlock(cid, ret) => { // TODO: consider if cancel is applicable in cases where we provide the // associated Block ourselves self.swarm.bitswap().cancel_block(&cid); - self.swarm.provide_block(cid) + let _ = ret.send(self.swarm.provide_block(cid)); } RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid), } @@ -1080,29 +1079,29 @@ mod node { pub fn get_subscriptions( &self, - ) -> &futures::lock::Mutex> { + ) -> &futures::lock::Mutex> { &self.ipfs.repo.subscriptions.subscriptions } pub async fn get_closest_peers(&self) -> Result<(), Error> { let self_peer = PeerId::from_public_key(self.identity().await?.0); - let (tx, rx) = oneshot_channel::>(); + let (tx, rx) = oneshot_channel(); self.to_task .clone() .send(IpfsEvent::GetClosestPeers(self_peer, tx)) .await?; - rx.await?.await?.map_err(|e| anyhow!(e)) + rx.await?.await.map_err(|e| anyhow!(e)) } /// Initiate a query for random key to discover peers. pub async fn bootstrap(&self) -> Result<(), Error> { - let (tx, rx) = oneshot_channel::, Error>>(); + let (tx, rx) = oneshot_channel(); self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?; - rx.await??.await?.map_err(|e| anyhow!(e)) + rx.await??.await.map_err(|e| anyhow!(e)) } /// Add a known peer to the DHT. diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 41651b93..b133eb0d 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -28,7 +28,7 @@ pub struct Behaviour { mdns: Toggle, kademlia: Kademlia, #[behaviour(ignore)] - kad_subscriptions: SubscriptionRegistry>, + kad_subscriptions: SubscriptionRegistry<(), String>, bitswap: Bitswap, ping: Ping, identify: Identify, @@ -84,15 +84,17 @@ impl NetworkBehaviourEventProcess for Behaviour } GetClosestPeers(Ok(GetClosestPeersOk { key: _, peers })) => { for peer in peers { - info!("kad: peer {} is close", peer); + // don't mention the key here, as this is just the id of our node + debug!("kad: peer {} is close", peer); } } GetClosestPeers(Err(GetClosestPeersError::Timeout { key: _, peers })) => { + // don't mention the key here, as this is just the id of our node warn!( "kad: timed out trying to find all closest peers; got the following:" ); for peer in peers { - info!("kad: peer {} is close", peer); + debug!("kad: peer {} is close", peer); } } GetProviders(Ok(GetProvidersOk { @@ -105,7 +107,7 @@ impl NetworkBehaviourEventProcess for Behaviour warn!("kad: could not find a provider for {}", key); } else { for peer in closest_peers.into_iter().chain(providers.into_iter()) { - info!("kad: {} is provided by {}", key, peer); + debug!("kad: {} is provided by {}", key, peer); self.bitswap.connect(peer); } } @@ -116,7 +118,7 @@ impl NetworkBehaviourEventProcess for Behaviour } StartProviding(Ok(AddProviderOk { key })) => { let key = multibase::encode(Base::Base32Lower, key); - info!("kad: providing {}", key); + debug!("kad: providing {}", key); } StartProviding(Err(AddProviderError::Timeout { key })) => { let key = multibase::encode(Base::Base32Lower, key); @@ -124,7 +126,7 @@ impl NetworkBehaviourEventProcess for Behaviour } RepublishProvider(Ok(AddProviderOk { key })) => { let key = multibase::encode(Base::Base32Lower, key); - info!("kad: republished provider {}", key); + debug!("kad: republished provider {}", key); } RepublishProvider(Err(AddProviderError::Timeout { key })) => { let key = multibase::encode(Base::Base32Lower, key); @@ -133,7 +135,7 @@ impl NetworkBehaviourEventProcess for Behaviour GetRecord(Ok(GetRecordOk { records })) => { for record in records { let key = multibase::encode(Base::Base32Lower, record.record.key); - info!("kad: got record {}:{:?}", key, record.record.value); + debug!("kad: got record {}:{:?}", key, record.record.value); } } GetRecord(Err(GetRecordError::NotFound { @@ -156,7 +158,7 @@ impl NetworkBehaviourEventProcess for Behaviour ); for record in records { let key = multibase::encode(Base::Base32Lower, record.record.key); - info!("kad: got record {}:{:?}", key, record.record.value); + debug!("kad: got record {}:{:?}", key, record.record.value); } } GetRecord(Err(GetRecordError::Timeout { @@ -172,13 +174,13 @@ impl NetworkBehaviourEventProcess for Behaviour ); for record in records { let key = multibase::encode(Base::Base32Lower, record.record.key); - info!("kad: got record {}:{:?}", key, record.record.value); + debug!("kad: got record {}:{:?}", key, record.record.value); } } PutRecord(Ok(PutRecordOk { key })) | RepublishRecord(Ok(PutRecordOk { key })) => { let key = multibase::encode(Base::Base32Lower, key); - info!("kad: successfully put record {}", key); + debug!("kad: successfully put record {}", key); } PutRecord(Err(PutRecordError::QuorumFailed { key, @@ -191,7 +193,7 @@ impl NetworkBehaviourEventProcess for Behaviour quorum, })) => { let key = multibase::encode(Base::Base32Lower, key); - info!( + warn!( "kad: quorum failed ({}) trying to put record {}", quorum, key ); @@ -207,7 +209,7 @@ impl NetworkBehaviourEventProcess for Behaviour quorum: _, })) => { let key = multibase::encode(Base::Base32Lower, key); - info!("kad: timed out trying to put record {}", key); + warn!("kad: timed out trying to put record {}", key); } } } @@ -369,7 +371,7 @@ impl Behaviour { options.keypair.public(), ); let pubsub = Pubsub::new(options.peer_id); - let swarm = SwarmApi::new(); + let swarm = SwarmApi::default(); Behaviour { ipfs, @@ -411,7 +413,7 @@ impl Behaviour { self.swarm.connections() } - pub fn connect(&mut self, target: ConnectionTarget) -> SubscriptionFuture> { + pub fn connect(&mut self, target: ConnectionTarget) -> SubscriptionFuture<(), String> { self.swarm.connect(target) } @@ -427,20 +429,23 @@ impl Behaviour { self.bitswap.want_block(cid, 1); } - // FIXME: it would probably be best if this could return a SubscriptionFuture, so - // that the put_block operation truly finishes only when the block is already being - // provided; it is, however, pretty tricky in terms of internal communication between - // Ipfs and IpfsFuture objects - it would currently require some extra back-and-forth - pub fn provide_block(&mut self, cid: Cid) { - let key = cid.to_bytes(); - match self.kademlia.start_providing(key.into()) { - Ok(_id) => { - // Ok(self.kad_subscriptions.create_subscription(id.into(), None)) - } - Err(e) => { - error!("kad: can't provide block {}: {:?}", cid, e); - // Err(anyhow!("kad: can't provide block {}", key)) + pub fn provide_block( + &mut self, + cid: Cid, + ) -> Result, anyhow::Error> { + // currently disabled; see https://github.com/rs-ipfs/rust-ipfs/pull/281#discussion_r465583345 + // for details regarding the concerns about enabling this functionality as-is + if false { + let key = cid.to_bytes(); + match self.kademlia.start_providing(key.into()) { + // Kademlia queries are marked with QueryIds, which are most fitting to + // be used as kad Subscription keys - they are small and require no + // conversion for the applicable finish_subscription calls + Ok(id) => Ok(self.kad_subscriptions.create_subscription(id.into(), None)), + Err(e) => Err(anyhow!("kad: can't provide block {}: {:?}", cid, e)), } + } else { + Err(anyhow!("providing blocks is currently unsupported")) } } @@ -458,7 +463,7 @@ impl Behaviour { &mut self.bitswap } - pub fn bootstrap(&mut self) -> Result>, anyhow::Error> { + pub fn bootstrap(&mut self) -> Result, anyhow::Error> { match self.kademlia.bootstrap() { Ok(id) => Ok(self.kad_subscriptions.create_subscription(id.into(), None)), Err(e) => { @@ -468,7 +473,7 @@ impl Behaviour { } } - pub fn get_closest_peers(&mut self, id: PeerId) -> SubscriptionFuture> { + pub fn get_closest_peers(&mut self, id: PeerId) -> SubscriptionFuture<(), String> { let id = id.to_base58(); self.kad_subscriptions diff --git a/src/p2p/swarm.rs b/src/p2p/swarm.rs index 74dd6b17..2187c440 100644 --- a/src/p2p/swarm.rs +++ b/src/p2p/swarm.rs @@ -59,17 +59,13 @@ type NetworkBehaviourAction = swarm::NetworkBehaviourAction<<<, peers: HashSet, - connect_registry: SubscriptionRegistry>, + connect_registry: SubscriptionRegistry<(), String>, connections: HashMap, roundtrip_times: HashMap, connected_peers: HashMap>, } impl SwarmApi { - pub fn new() -> Self { - Self::default() - } - pub fn add_peer(&mut self, peer_id: PeerId) { self.peers.insert(peer_id); } @@ -105,7 +101,7 @@ impl SwarmApi { self.roundtrip_times.insert(peer_id.clone(), rtt); } - pub fn connect(&mut self, target: ConnectionTarget) -> SubscriptionFuture> { + pub fn connect(&mut self, target: ConnectionTarget) -> SubscriptionFuture<(), String> { trace!("Connecting to {:?}", target); self.events.push_back(match target { @@ -211,9 +207,10 @@ impl NetworkBehaviour for SwarmApi { self.connected_peers.remove(peer_id); } self.connections.remove(closed_addr); - // FIXME: should be an error - self.connect_registry - .finish_subscription(closed_addr.clone().into(), Ok(())); + self.connect_registry.finish_subscription( + closed_addr.clone().into(), + Err("Connection reset by peer".to_owned()), + ); } fn inject_disconnected(&mut self, peer_id: &PeerId) { @@ -259,50 +256,22 @@ fn connection_point_addr(cp: &ConnectedPoint) -> &Multiaddr { mod tests { use super::*; use crate::p2p::transport::{build_transport, TTransport}; - use futures::channel::mpsc; - use futures::future::{select, FutureExt}; - use futures::sink::SinkExt; - use futures::stream::StreamExt; use libp2p::identity::Keypair; use libp2p::swarm::Swarm; #[async_std::test] async fn swarm_api() { let (peer1_id, trans) = mk_transport(); - let mut swarm1 = Swarm::new(trans, SwarmApi::new(), peer1_id); + let mut swarm1 = Swarm::new(trans, SwarmApi::default(), peer1_id); let (peer2_id, trans) = mk_transport(); - let mut swarm2 = Swarm::new(trans, SwarmApi::new(), peer2_id); + let mut swarm2 = Swarm::new(trans, SwarmApi::default(), peer2_id); - let (mut tx, mut rx) = mpsc::channel::(1); Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - let peer1 = async move { - while swarm1.next().now_or_never().is_some() {} - - for l in Swarm::listeners(&swarm1) { - tx.send(l.clone()).await.unwrap(); - } - - loop { - swarm1.next().await; - } - }; - - let peer2 = async move { - let future = swarm2.connect(rx.next().await.unwrap().into()); - - let poll_swarm = async move { - loop { - swarm2.next().await; - } - }; - - select(Box::pin(future), Box::pin(poll_swarm)).await; - }; - - let result = select(Box::pin(peer1), Box::pin(peer2)); - result.await; + for l in Swarm::listeners(&swarm1) { + swarm2.connect(l.to_owned().into()).await.unwrap(); + } } fn mk_transport() -> (PeerId, TTransport) { diff --git a/src/repo/mod.rs b/src/repo/mod.rs index cf0bc83a..453e5010 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -1,7 +1,7 @@ //! IPFS repo use crate::error::Error; use crate::path::IpfsPath; -use crate::subscription::{RequestKind, SubscriptionRegistry}; +use crate::subscription::{RequestKind, SubscriptionFuture, SubscriptionRegistry}; use crate::IpfsOptions; use async_std::path::PathBuf; use async_trait::async_trait; @@ -10,7 +10,10 @@ use cid::{self, Cid}; use core::convert::TryFrom; use core::fmt::Debug; use core::marker::PhantomData; -use futures::channel::mpsc::{channel, Receiver, Sender}; +use futures::channel::{ + mpsc::{channel, Receiver, Sender}, + oneshot, +}; use futures::sink::SinkExt; use libp2p::core::PeerId; use std::hash::{Hash, Hasher}; @@ -121,14 +124,17 @@ pub struct Repo { block_store: TRepoTypes::TBlockStore, data_store: TRepoTypes::TDataStore, events: Sender, - pub(crate) subscriptions: SubscriptionRegistry, + pub(crate) subscriptions: SubscriptionRegistry, } -#[derive(Clone, Debug)] +#[derive(Debug)] pub enum RepoEvent { WantBlock(Cid), UnwantBlock(Cid), - ProvideBlock(Cid), + ProvideBlock( + Cid, + oneshot::Sender, anyhow::Error>>, + ), UnprovideBlock(Cid), } @@ -197,14 +203,24 @@ impl Repo { let cid = block.cid.clone(); let (_cid, res) = self.block_store.put(block.clone()).await?; self.subscriptions - .finish_subscription(cid.clone().into(), block); - // sending only fails if no one is listening anymore - // and that is okay with us. - self.events - .clone() - .send(RepoEvent::ProvideBlock(cid.clone())) - .await - .ok(); + .finish_subscription(cid.clone().into(), Ok(block)); + + if let BlockPut::NewBlock = res { + // sending only fails if no one is listening anymore + // and that is okay with us. + let (tx, rx) = oneshot::channel(); + + self.events + .clone() + .send(RepoEvent::ProvideBlock(cid.clone(), tx)) + .await + .ok(); + + if let Ok(kad_subscription) = rx.await? { + kad_subscription.await?; + } + } + Ok((cid, res)) } diff --git a/src/subscription.rs b/src/subscription.rs index b96d2216..a8e2cc52 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -85,15 +85,15 @@ impl fmt::Display for RequestKind { type SubscriptionId = u64; /// The specific collection used to hold all the `Subscription`s. -pub type Subscriptions = HashMap>>; +pub type Subscriptions = HashMap>>; /// A collection of all the live `Subscription`s. -pub struct SubscriptionRegistry { - pub(crate) subscriptions: Arc>>, +pub struct SubscriptionRegistry { + pub(crate) subscriptions: Arc>>, shutting_down: AtomicBool, } -impl fmt::Debug for SubscriptionRegistry { +impl fmt::Debug for SubscriptionRegistry { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!( fmt, @@ -104,13 +104,13 @@ impl fmt::Debug for SubscriptionRegistry } } -impl SubscriptionRegistry { +impl SubscriptionRegistry { /// Creates a `Subscription` and returns its associated `Future`, the `SubscriptionFuture`. pub fn create_subscription( &self, kind: RequestKind, cancel_notifier: Option>, - ) -> SubscriptionFuture { + ) -> SubscriptionFuture { let id = GLOBAL_REQ_COUNT.fetch_add(1, Ordering::SeqCst); debug!("Creating subscription {} to {}", id, kind); @@ -139,7 +139,7 @@ impl SubscriptionRegistry { /// Finalizes all pending subscriptions of the specified kind with the given `result`. /// - pub fn finish_subscription(&self, req_kind: RequestKind, result: TRes) { + pub fn finish_subscription(&self, req_kind: RequestKind, result: Result) { let mut subscriptions = task::block_on(async { self.subscriptions.lock().await }); let related_subs = subscriptions.get_mut(&req_kind); @@ -149,11 +149,19 @@ impl SubscriptionRegistry { if let Some(related_subs) = related_subs { debug!("Finishing the subscription to {}", req_kind); + let mut awoken = 0; for sub in related_subs.values_mut() { if let Subscription::Pending { .. } = sub { sub.wake(result.clone()); + awoken += 1; } } + + // ensure that the subscriptions are being handled correctly: normally + // finish_subscriptions should result in some related futures being awoken + debug_assert!(awoken != 0); + + trace!("Woke {} related subscription(s)", awoken); } } @@ -181,7 +189,7 @@ impl SubscriptionRegistry { } } -impl Default for SubscriptionRegistry { +impl Default for SubscriptionRegistry { fn default() -> Self { Self { subscriptions: Default::default(), @@ -190,29 +198,33 @@ impl Default for SubscriptionRegistry { } } -impl Drop for SubscriptionRegistry { +impl Drop for SubscriptionRegistry { fn drop(&mut self) { self.shutdown(); } } -/// A value returned when a `Subscription` and it's linked `SubscriptionFuture` -/// is cancelled before completion or when the `Future` is aborted. -#[derive(Debug, PartialEq, Eq)] -pub struct Cancelled; +#[derive(Debug, PartialEq)] +pub enum SubscriptionErr { + /// A value returned when a `Subscription` and it's linked `SubscriptionFuture` + /// is cancelled before completion or when the `Future` is aborted. + Cancelled, + /// Other errors not caused by cancellation. + Failed(E), +} -impl fmt::Display for Cancelled { +impl fmt::Display for SubscriptionErr { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "{:?}", self) } } -impl std::error::Error for Cancelled {} +impl std::error::Error for SubscriptionErr {} /// Represents a request for a resource at different stages of its lifetime. -pub enum Subscription { +pub enum Subscription { /// A finished `Subscription` containing the desired `TRes` value. - Ready(TRes), + Ready(Result), /// A standing request that hasn't been fulfilled yet. Pending { /// The waker of the task assigned to check if the `Subscription` is complete. @@ -224,7 +236,7 @@ pub enum Subscription { Cancelled, } -impl fmt::Debug for Subscription { +impl fmt::Debug for Subscription { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { use Subscription::*; match self { @@ -250,7 +262,7 @@ impl fmt::Debug for Subscription { } } -impl Subscription { +impl Subscription { fn new(cancel_notifier: Option>) -> Self { Self::Pending { waker: Default::default(), @@ -258,7 +270,7 @@ impl Subscription { } } - fn wake(&mut self, result: TRes) { + fn wake(&mut self, result: Result) { let former_self = mem::replace(self, Subscription::Ready(result)); if let Subscription::Pending { waker, .. } = former_self { if let Some(waker) = waker { @@ -279,8 +291,8 @@ impl Subscription { // send a cancel notification to the repo - the wantlist needs // to be updated if is_last { - trace!("Last related subscription cancelled, sending a cancel notification"); if let Some(mut sender) = cancel_notifier { + trace!("Last related subscription cancelled, sending a cancel notification"); let _ = sender.try_send(RepoEvent::try_from(kind).unwrap()); } } @@ -293,20 +305,20 @@ impl Subscription { } /// A `Future` that resolves to the resource whose subscription was requested. -pub struct SubscriptionFuture { +pub struct SubscriptionFuture { /// The unique identifier of the subscription request and the secondary /// key in the `SubscriptionRegistry`. id: u64, /// The type of the request made; the primary key in the `SubscriptionRegistry`. kind: RequestKind, /// A reference to the subscriptions at the `SubscriptionRegistry`. - subscriptions: Arc>>, + subscriptions: Arc>>, /// True if the cleanup is already done, false if `Drop` needs to do it cleanup_complete: bool, } -impl Future for SubscriptionFuture { - type Output = Result; +impl Future for SubscriptionFuture { + type Output = Result>; fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll { use std::collections::hash_map::Entry::*; @@ -327,7 +339,7 @@ impl Future for SubscriptionFuture { let (became_empty, ret) = match related_subs.entry(self.id) { // there were no related subs, it can only mean cancellation or polling after // Poll::Ready - Vacant(_) => return Poll::Ready(Err(Cancelled)), + Vacant(_) => return Poll::Ready(Err(SubscriptionErr::Cancelled)), Occupied(mut oe) => { let unwrapped = match oe.get_mut() { Subscription::Pending { ref mut waker, .. } => { @@ -337,10 +349,10 @@ impl Future for SubscriptionFuture { } Subscription::Cancelled => { oe.remove(); - Err(Cancelled) + Err(SubscriptionErr::Cancelled) } _ => match oe.remove() { - Subscription::Ready(result) => Ok(result), + Subscription::Ready(result) => result.map_err(SubscriptionErr::Failed), _ => unreachable!("already matched"), }, }; @@ -360,14 +372,14 @@ impl Future for SubscriptionFuture { self.cleanup_complete = became_empty; Poll::Ready(ret) } else { - Poll::Ready(Err(Cancelled)) + Poll::Ready(Err(SubscriptionErr::Cancelled)) } } } -impl Drop for SubscriptionFuture { +impl Drop for SubscriptionFuture { fn drop(&mut self) { - trace!("Dropping subscription {} to {}", self.id, self.kind); + trace!("Dropping subscription future {} to {}", self.id, self.kind); if self.cleanup_complete { // cleaned up the easier variants already @@ -393,22 +405,21 @@ impl Drop for SubscriptionFuture { }); if let Some(sub) = sub { - // don't bother updating anything that isn't `Pending` + // don't cancel anything that isn't `Pending` if let mut sub @ Subscription::Pending { .. } = sub { - if is_last { - sub.cancel(self.id, self.kind.clone(), is_last); - } + sub.cancel(self.id, self.kind.clone(), is_last); } } } } -impl fmt::Debug for SubscriptionFuture { +impl fmt::Debug for SubscriptionFuture { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!( fmt, - "SubscriptionFuture>", - std::any::type_name::() + "SubscriptionFuture>", + std::any::type_name::(), + std::any::type_name::(), ) } } @@ -425,11 +436,11 @@ mod tests { #[async_std::test] async fn subscription_basics() { - let registry = SubscriptionRegistry::::default(); + let registry = SubscriptionRegistry::::default(); let s1 = registry.create_subscription(0.into(), None); let s2 = registry.create_subscription(0.into(), None); let s3 = registry.create_subscription(0.into(), None); - registry.finish_subscription(0.into(), 10); + registry.finish_subscription(0.into(), Ok(10)); assert_eq!(s1.await.unwrap(), 10); assert_eq!(s2.await.unwrap(), 10); assert_eq!(s3.await.unwrap(), 10); @@ -437,26 +448,26 @@ mod tests { #[async_std::test] async fn subscription_cancelled_on_dropping_registry() { - let registry = SubscriptionRegistry::::default(); + let registry = SubscriptionRegistry::::default(); let s1 = registry.create_subscription(0.into(), None); drop(registry); - assert_eq!(s1.await, Err(Cancelled)); + assert_eq!(s1.await, Err(SubscriptionErr::Cancelled)); } #[async_std::test] async fn subscription_cancelled_on_shutdown() { - let registry = SubscriptionRegistry::::default(); + let registry = SubscriptionRegistry::::default(); let s1 = registry.create_subscription(0.into(), None); registry.shutdown(); - assert_eq!(s1.await, Err(Cancelled)); + assert_eq!(s1.await, Err(SubscriptionErr::Cancelled)); } #[async_std::test] async fn new_subscriptions_cancelled_after_shutdown() { - let registry = SubscriptionRegistry::::default(); + let registry = SubscriptionRegistry::::default(); registry.shutdown(); let s1 = registry.create_subscription(0.into(), None); - assert_eq!(s1.await, Err(Cancelled)); + assert_eq!(s1.await, Err(SubscriptionErr::Cancelled)); } #[async_std::test] @@ -464,7 +475,7 @@ mod tests { use async_std::future::timeout; use std::time::Duration; - let registry = SubscriptionRegistry::::default(); + let registry = SubscriptionRegistry::::default(); let s1 = timeout( Duration::from_millis(1), registry.create_subscription(0.into(), None), @@ -475,7 +486,7 @@ mod tests { s1.await.unwrap_err(); // this will cause a call to waker installed by s1, but it shouldn't be a problem. - registry.finish_subscription(0.into(), 0); + registry.finish_subscription(0.into(), Ok(0)); assert_eq!(s2.await.unwrap(), 0); }