281: Some Kademlia debugging r=koivunej a=ljedrz

These changes were sparked by the following 2 observations:
- the logs sometimes indicate that a Kademlia query was executed twice
- `finish_subscription` didn't always result in futures being awoken

While the former remains a mystery (its occurrence is not correlated with subscriptions, meaning it's either some polling issue that eludes me or a bug in `libp2p`), investigating the `kad`<>`Subscription` route yielded a few improvements, namely:
- tweaked `kad` log levels
- proper `SubscriptionFuture` handling in `put_block`
- improved `SubscriptionRegistry` logs and a small fix
- simpler `SubscriptionFuture` type handling (it always returned a `Result`, so that is now the default; see the sketch below)
- a `debug_assert` checking that we don't trigger zero-wake cases in `finish_subscription` during tests
- an improved `swarm_api` test (the old one sometimes tripped the new `debug_assert`, and could be improved regardless)
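
Since the reworked `SubscriptionRegistry` surface shows up in many small pieces across the diff, here it is in one place, mirroring the `subscription.rs` tests below (a sketch meant to run inside an async test):

```rust
// The registry is now generic over both the success and the error type.
let registry = SubscriptionRegistry::<u32, ()>::default();

// Subscribers to the same request kind share one result.
let s1 = registry.create_subscription(0.into(), None);
let s2 = registry.create_subscription(0.into(), None);

// finish_subscription now always takes a Result<T, E> instead of a bare value...
registry.finish_subscription(0.into(), Ok(10));

// ...and SubscriptionFuture resolves to Result<T, SubscriptionErr<E>>, where
// SubscriptionErr<E> is either Cancelled or Failed(E).
assert_eq!(s1.await.unwrap(), 10);
assert_eq!(s2.await.unwrap(), 10);
```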

Co-authored-by: ljedrz <ljedrz@gmail.com>
Merged by bors[bot] on 2020-08-06 13:39:18 +00:00 as commit 0fb84c0b49.
5 changed files with 144 additions and 144 deletions


@@ -231,7 +231,6 @@ pub struct IpfsInner<Types: IpfsTypes> {
}
type Channel<T> = OneshotSender<Result<T, Error>>;
-type FutureSubscription<T, E> = SubscriptionFuture<Result<T, E>>;
/// Events used internally to communicate with the swarm, which is executed in the background
/// task.
@@ -240,7 +239,7 @@ enum IpfsEvent {
/// Connect
Connect(
ConnectionTarget,
-OneshotSender<FutureSubscription<(), String>>,
+OneshotSender<SubscriptionFuture<(), String>>,
),
/// Addresses
Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
@@ -261,9 +260,9 @@ enum IpfsEvent {
BitswapStats(OneshotSender<BitswapStats>),
AddListeningAddress(Multiaddr, Channel<Multiaddr>),
RemoveListeningAddress(Multiaddr, Channel<()>),
-Bootstrap(OneshotSender<Result<FutureSubscription<(), String>, Error>>),
+Bootstrap(OneshotSender<Result<SubscriptionFuture<(), String>, Error>>),
AddPeer(PeerId, Multiaddr),
-GetClosestPeers(PeerId, OneshotSender<FutureSubscription<(), String>>),
+GetClosestPeers(PeerId, OneshotSender<SubscriptionFuture<(), String>>),
Exit,
}
@@ -473,7 +472,7 @@ impl<Types: IpfsTypes> Ipfs<Types> {
.send(IpfsEvent::Connect(target.into(), tx))
.await?;
let subscription = rx.await?;
-subscription.await?.map_err(|e| format_err!("{}", e))
+subscription.await.map_err(|e| anyhow!(e))
})
.await
}
@@ -997,11 +996,11 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
match evt {
RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
RepoEvent::UnwantBlock(cid) => self.swarm.bitswap().cancel_block(&cid),
-RepoEvent::ProvideBlock(cid) => {
+RepoEvent::ProvideBlock(cid, ret) => {
// TODO: consider if cancel is applicable in cases where we provide the
// associated Block ourselves
self.swarm.bitswap().cancel_block(&cid);
-self.swarm.provide_block(cid)
+let _ = ret.send(self.swarm.provide_block(cid));
}
RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid),
}
@@ -1080,29 +1079,29 @@ mod node {
pub fn get_subscriptions(
&self,
-) -> &futures::lock::Mutex<subscription::Subscriptions<Block>> {
+) -> &futures::lock::Mutex<subscription::Subscriptions<Block, String>> {
&self.ipfs.repo.subscriptions.subscriptions
}
pub async fn get_closest_peers(&self) -> Result<(), Error> {
let self_peer = PeerId::from_public_key(self.identity().await?.0);
-let (tx, rx) = oneshot_channel::<FutureSubscription<(), String>>();
+let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::GetClosestPeers(self_peer, tx))
.await?;
-rx.await?.await?.map_err(|e| anyhow!(e))
+rx.await?.await.map_err(|e| anyhow!(e))
}
/// Initiate a query for a random key to discover peers.
pub async fn bootstrap(&self) -> Result<(), Error> {
-let (tx, rx) = oneshot_channel::<Result<FutureSubscription<(), String>, Error>>();
+let (tx, rx) = oneshot_channel();
self.to_task.clone().send(IpfsEvent::Bootstrap(tx)).await?;
-rx.await??.await?.map_err(|e| anyhow!(e))
+rx.await??.await.map_err(|e| anyhow!(e))
}
/// Add a known peer to the DHT.

@@ -28,7 +28,7 @@ pub struct Behaviour<Types: IpfsTypes> {
mdns: Toggle<Mdns>,
kademlia: Kademlia<MemoryStore>,
#[behaviour(ignore)]
-kad_subscriptions: SubscriptionRegistry<Result<(), String>>,
+kad_subscriptions: SubscriptionRegistry<(), String>,
bitswap: Bitswap,
ping: Ping,
identify: Identify,
@@ -84,15 +84,17 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
}
GetClosestPeers(Ok(GetClosestPeersOk { key: _, peers })) => {
for peer in peers {
-info!("kad: peer {} is close", peer);
+// don't mention the key here, as this is just the id of our node
+debug!("kad: peer {} is close", peer);
}
}
GetClosestPeers(Err(GetClosestPeersError::Timeout { key: _, peers })) => {
+// don't mention the key here, as this is just the id of our node
warn!(
"kad: timed out trying to find all closest peers; got the following:"
);
for peer in peers {
-info!("kad: peer {} is close", peer);
+debug!("kad: peer {} is close", peer);
}
}
GetProviders(Ok(GetProvidersOk {
@@ -105,7 +107,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
warn!("kad: could not find a provider for {}", key);
} else {
for peer in closest_peers.into_iter().chain(providers.into_iter()) {
-info!("kad: {} is provided by {}", key, peer);
+debug!("kad: {} is provided by {}", key, peer);
self.bitswap.connect(peer);
}
}
@@ -116,7 +118,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
}
StartProviding(Ok(AddProviderOk { key })) => {
let key = multibase::encode(Base::Base32Lower, key);
-info!("kad: providing {}", key);
+debug!("kad: providing {}", key);
}
StartProviding(Err(AddProviderError::Timeout { key })) => {
let key = multibase::encode(Base::Base32Lower, key);
@@ -124,7 +126,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
}
RepublishProvider(Ok(AddProviderOk { key })) => {
let key = multibase::encode(Base::Base32Lower, key);
-info!("kad: republished provider {}", key);
+debug!("kad: republished provider {}", key);
}
RepublishProvider(Err(AddProviderError::Timeout { key })) => {
let key = multibase::encode(Base::Base32Lower, key);
@@ -133,7 +135,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
GetRecord(Ok(GetRecordOk { records })) => {
for record in records {
let key = multibase::encode(Base::Base32Lower, record.record.key);
-info!("kad: got record {}:{:?}", key, record.record.value);
+debug!("kad: got record {}:{:?}", key, record.record.value);
}
}
GetRecord(Err(GetRecordError::NotFound {
@@ -156,7 +158,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
);
for record in records {
let key = multibase::encode(Base::Base32Lower, record.record.key);
-info!("kad: got record {}:{:?}", key, record.record.value);
+debug!("kad: got record {}:{:?}", key, record.record.value);
}
}
GetRecord(Err(GetRecordError::Timeout {
@@ -172,13 +174,13 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
);
for record in records {
let key = multibase::encode(Base::Base32Lower, record.record.key);
-info!("kad: got record {}:{:?}", key, record.record.value);
+debug!("kad: got record {}:{:?}", key, record.record.value);
}
}
PutRecord(Ok(PutRecordOk { key }))
| RepublishRecord(Ok(PutRecordOk { key })) => {
let key = multibase::encode(Base::Base32Lower, key);
-info!("kad: successfully put record {}", key);
+debug!("kad: successfully put record {}", key);
}
PutRecord(Err(PutRecordError::QuorumFailed {
key,
@@ -191,7 +193,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
quorum,
})) => {
let key = multibase::encode(Base::Base32Lower, key);
-info!(
+warn!(
"kad: quorum failed ({}) trying to put record {}",
quorum, key
);
@@ -207,7 +209,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<KademliaEvent> for Behaviour
quorum: _,
})) => {
let key = multibase::encode(Base::Base32Lower, key);
-info!("kad: timed out trying to put record {}", key);
+warn!("kad: timed out trying to put record {}", key);
}
}
}
@@ -369,7 +371,7 @@ impl<Types: IpfsTypes> Behaviour<Types> {
options.keypair.public(),
);
let pubsub = Pubsub::new(options.peer_id);
-let swarm = SwarmApi::new();
+let swarm = SwarmApi::default();
Behaviour {
ipfs,
@@ -411,7 +413,7 @@ impl<Types: IpfsTypes> Behaviour<Types> {
self.swarm.connections()
}
-pub fn connect(&mut self, target: ConnectionTarget) -> SubscriptionFuture<Result<(), String>> {
+pub fn connect(&mut self, target: ConnectionTarget) -> SubscriptionFuture<(), String> {
self.swarm.connect(target)
}
@@ -427,20 +429,23 @@ impl<Types: IpfsTypes> Behaviour<Types> {
self.bitswap.want_block(cid, 1);
}
-// FIXME: it would probably be best if this could return a SubscriptionFuture, so
-// that the put_block operation truly finishes only when the block is already being
-// provided; it is, however, pretty tricky in terms of internal communication between
-// Ipfs and IpfsFuture objects - it would currently require some extra back-and-forth
-pub fn provide_block(&mut self, cid: Cid) {
-let key = cid.to_bytes();
-match self.kademlia.start_providing(key.into()) {
-Ok(_id) => {
-// Ok(self.kad_subscriptions.create_subscription(id.into(), None))
-}
-Err(e) => {
-error!("kad: can't provide block {}: {:?}", cid, e);
-// Err(anyhow!("kad: can't provide block {}", key))
+pub fn provide_block(
+&mut self,
+cid: Cid,
+) -> Result<SubscriptionFuture<(), String>, anyhow::Error> {
+// currently disabled; see https://github.com/rs-ipfs/rust-ipfs/pull/281#discussion_r465583345
+// for details regarding the concerns about enabling this functionality as-is
+if false {
+let key = cid.to_bytes();
+match self.kademlia.start_providing(key.into()) {
+// Kademlia queries are marked with QueryIds, which are most fitting to
+// be used as kad Subscription keys - they are small and require no
+// conversion for the applicable finish_subscription calls
+Ok(id) => Ok(self.kad_subscriptions.create_subscription(id.into(), None)),
+Err(e) => Err(anyhow!("kad: can't provide block {}: {:?}", cid, e)),
+}
+} else {
+Err(anyhow!("providing blocks is currently unsupported"))
}
}
@@ -458,7 +463,7 @@ impl<Types: IpfsTypes> Behaviour<Types> {
&mut self.bitswap
}
-pub fn bootstrap(&mut self) -> Result<SubscriptionFuture<Result<(), String>>, anyhow::Error> {
+pub fn bootstrap(&mut self) -> Result<SubscriptionFuture<(), String>, anyhow::Error> {
match self.kademlia.bootstrap() {
Ok(id) => Ok(self.kad_subscriptions.create_subscription(id.into(), None)),
Err(e) => {
@@ -468,7 +473,7 @@ impl<Types: IpfsTypes> Behaviour<Types> {
}
}
-pub fn get_closest_peers(&mut self, id: PeerId) -> SubscriptionFuture<Result<(), String>> {
+pub fn get_closest_peers(&mut self, id: PeerId) -> SubscriptionFuture<(), String> {
let id = id.to_base58();
self.kad_subscriptions

@@ -59,17 +59,13 @@ type NetworkBehaviourAction = swarm::NetworkBehaviourAction<<<<SwarmApi as Netwo
pub struct SwarmApi {
events: VecDeque<NetworkBehaviourAction>,
peers: HashSet<PeerId>,
-connect_registry: SubscriptionRegistry<Result<(), String>>,
+connect_registry: SubscriptionRegistry<(), String>,
connections: HashMap<Multiaddr, PeerId>,
roundtrip_times: HashMap<PeerId, Duration>,
connected_peers: HashMap<PeerId, Vec<Multiaddr>>,
}
impl SwarmApi {
-pub fn new() -> Self {
-Self::default()
-}
pub fn add_peer(&mut self, peer_id: PeerId) {
self.peers.insert(peer_id);
}
@@ -105,7 +101,7 @@ impl SwarmApi {
self.roundtrip_times.insert(peer_id.clone(), rtt);
}
-pub fn connect(&mut self, target: ConnectionTarget) -> SubscriptionFuture<Result<(), String>> {
+pub fn connect(&mut self, target: ConnectionTarget) -> SubscriptionFuture<(), String> {
trace!("Connecting to {:?}", target);
self.events.push_back(match target {
@@ -211,9 +207,10 @@ impl NetworkBehaviour for SwarmApi {
self.connected_peers.remove(peer_id);
}
self.connections.remove(closed_addr);
-// FIXME: should be an error
-self.connect_registry
-.finish_subscription(closed_addr.clone().into(), Ok(()));
+self.connect_registry.finish_subscription(
+closed_addr.clone().into(),
+Err("Connection reset by peer".to_owned()),
+);
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
@@ -259,50 +256,22 @@ fn connection_point_addr(cp: &ConnectedPoint) -> &Multiaddr {
mod tests {
use super::*;
use crate::p2p::transport::{build_transport, TTransport};
-use futures::channel::mpsc;
use futures::future::{select, FutureExt};
-use futures::sink::SinkExt;
use futures::stream::StreamExt;
use libp2p::identity::Keypair;
use libp2p::swarm::Swarm;
#[async_std::test]
async fn swarm_api() {
let (peer1_id, trans) = mk_transport();
-let mut swarm1 = Swarm::new(trans, SwarmApi::new(), peer1_id);
+let mut swarm1 = Swarm::new(trans, SwarmApi::default(), peer1_id);
let (peer2_id, trans) = mk_transport();
-let mut swarm2 = Swarm::new(trans, SwarmApi::new(), peer2_id);
+let mut swarm2 = Swarm::new(trans, SwarmApi::default(), peer2_id);
-let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
-let peer1 = async move {
+while swarm1.next().now_or_never().is_some() {}
-for l in Swarm::listeners(&swarm1) {
-tx.send(l.clone()).await.unwrap();
-}
-loop {
-swarm1.next().await;
-}
-};
-let peer2 = async move {
-let future = swarm2.connect(rx.next().await.unwrap().into());
-let poll_swarm = async move {
-loop {
-swarm2.next().await;
-}
-};
-select(Box::pin(future), Box::pin(poll_swarm)).await;
-};
-let result = select(Box::pin(peer1), Box::pin(peer2));
-result.await;
+for l in Swarm::listeners(&swarm1) {
+swarm2.connect(l.to_owned().into()).await.unwrap();
+}
}
fn mk_transport() -> (PeerId, TTransport) {


@@ -1,7 +1,7 @@
//! IPFS repo
use crate::error::Error;
use crate::path::IpfsPath;
-use crate::subscription::{RequestKind, SubscriptionRegistry};
+use crate::subscription::{RequestKind, SubscriptionFuture, SubscriptionRegistry};
use crate::IpfsOptions;
use async_std::path::PathBuf;
use async_trait::async_trait;
@@ -10,7 +10,10 @@ use cid::{self, Cid};
use core::convert::TryFrom;
use core::fmt::Debug;
use core::marker::PhantomData;
-use futures::channel::mpsc::{channel, Receiver, Sender};
+use futures::channel::{
+mpsc::{channel, Receiver, Sender},
+oneshot,
+};
use futures::sink::SinkExt;
use libp2p::core::PeerId;
use std::hash::{Hash, Hasher};
@@ -121,14 +124,17 @@ pub struct Repo<TRepoTypes: RepoTypes> {
block_store: TRepoTypes::TBlockStore,
data_store: TRepoTypes::TDataStore,
events: Sender<RepoEvent>,
-pub(crate) subscriptions: SubscriptionRegistry<Block>,
+pub(crate) subscriptions: SubscriptionRegistry<Block, String>,
}
-#[derive(Clone, Debug)]
+#[derive(Debug)]
pub enum RepoEvent {
WantBlock(Cid),
UnwantBlock(Cid),
-ProvideBlock(Cid),
+ProvideBlock(
+Cid,
+oneshot::Sender<Result<SubscriptionFuture<(), String>, anyhow::Error>>,
+),
UnprovideBlock(Cid),
}
@@ -197,14 +203,24 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
let cid = block.cid.clone();
let (_cid, res) = self.block_store.put(block.clone()).await?;
self.subscriptions
-.finish_subscription(cid.clone().into(), block);
-// sending only fails if no one is listening anymore
-// and that is okay with us.
-self.events
-.clone()
-.send(RepoEvent::ProvideBlock(cid.clone()))
-.await
-.ok();
+.finish_subscription(cid.clone().into(), Ok(block));
+if let BlockPut::NewBlock = res {
+// sending only fails if no one is listening anymore
+// and that is okay with us.
+let (tx, rx) = oneshot::channel();
+self.events
+.clone()
+.send(RepoEvent::ProvideBlock(cid.clone(), tx))
+.await
+.ok();
+if let Ok(kad_subscription) = rx.await? {
+kad_subscription.await?;
+}
+}
Ok((cid, res))
}
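
The hunk above is the heart of the PR: `put_block` now round-trips through the background task with a oneshot channel and only then awaits the kad subscription. As a hypothetical, self-contained miniature of just that back-and-forth (the `Query` enum, `demo`, and the `u32` payload are illustrative stand-ins, not names from this codebase):

```rust
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};

enum Query {
    // the request carries a oneshot sender for the reply,
    // like RepoEvent::ProvideBlock(Cid, oneshot::Sender<...>)
    Provide(String, oneshot::Sender<Result<u32, String>>),
}

async fn demo() {
    let (mut events, mut task_rx) = mpsc::channel::<Query>(1);

    // "background task" side: answer each query through its oneshot sender,
    // like IpfsFuture does with `ret.send(self.swarm.provide_block(cid))`
    let background_task = async move {
        while let Some(Query::Provide(key, ret)) = task_rx.next().await {
            let _ = ret.send(Ok(key.len() as u32));
        }
    };

    // "repo" side: send the query, then await the reply on the oneshot
    let repo_side = async move {
        let (tx, rx) = oneshot::channel();
        events.send(Query::Provide("cid".into(), tx)).await.ok();
        drop(events); // close the channel so the task loop can end
        assert_eq!(rx.await.unwrap(), Ok(3));
    };

    futures::join!(background_task, repo_side);
}
```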


@@ -85,15 +85,15 @@ impl fmt::Display for RequestKind {
type SubscriptionId = u64;
/// The specific collection used to hold all the `Subscription`s.
-pub type Subscriptions<T> = HashMap<RequestKind, HashMap<SubscriptionId, Subscription<T>>>;
+pub type Subscriptions<T, E> = HashMap<RequestKind, HashMap<SubscriptionId, Subscription<T, E>>>;
/// A collection of all the live `Subscription`s.
-pub struct SubscriptionRegistry<TRes: Debug + Clone + PartialEq> {
-pub(crate) subscriptions: Arc<Mutex<Subscriptions<TRes>>>,
+pub struct SubscriptionRegistry<T: Debug + Clone + PartialEq, E: Debug + Clone> {
+pub(crate) subscriptions: Arc<Mutex<Subscriptions<T, E>>>,
shutting_down: AtomicBool,
}
-impl<TRes: Debug + Clone + PartialEq> fmt::Debug for SubscriptionRegistry<TRes> {
+impl<T: Debug + Clone + PartialEq, E: Debug + Clone> fmt::Debug for SubscriptionRegistry<T, E> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
@@ -104,13 +104,13 @@ impl<TRes: Debug + Clone + PartialEq> fmt::Debug for SubscriptionRegistry<TRes>
}
}
-impl<TRes: Debug + Clone + PartialEq> SubscriptionRegistry<TRes> {
+impl<T: Debug + Clone + PartialEq, E: Debug + Clone> SubscriptionRegistry<T, E> {
/// Creates a `Subscription` and returns its associated `Future`, the `SubscriptionFuture`.
pub fn create_subscription(
&self,
kind: RequestKind,
cancel_notifier: Option<Sender<RepoEvent>>,
-) -> SubscriptionFuture<TRes> {
+) -> SubscriptionFuture<T, E> {
let id = GLOBAL_REQ_COUNT.fetch_add(1, Ordering::SeqCst);
debug!("Creating subscription {} to {}", id, kind);
@@ -139,7 +139,7 @@ impl<TRes: Debug + Clone + PartialEq> SubscriptionRegistry<TRes> {
/// Finalizes all pending subscriptions of the specified kind with the given `result`.
///
-pub fn finish_subscription(&self, req_kind: RequestKind, result: TRes) {
+pub fn finish_subscription(&self, req_kind: RequestKind, result: Result<T, E>) {
let mut subscriptions = task::block_on(async { self.subscriptions.lock().await });
let related_subs = subscriptions.get_mut(&req_kind);
@@ -149,11 +149,19 @@ impl<TRes: Debug + Clone + PartialEq> SubscriptionRegistry<TRes> {
if let Some(related_subs) = related_subs {
debug!("Finishing the subscription to {}", req_kind);
+let mut awoken = 0;
for sub in related_subs.values_mut() {
+if let Subscription::Pending { .. } = sub {
sub.wake(result.clone());
+awoken += 1;
+}
}
+// ensure that the subscriptions are being handled correctly: normally
+// finish_subscriptions should result in some related futures being awoken
+debug_assert!(awoken != 0);
+trace!("Woke {} related subscription(s)", awoken);
}
}
@@ -181,7 +189,7 @@ impl<TRes: Debug + Clone + PartialEq> SubscriptionRegistry<TRes> {
}
}
-impl<TRes: Debug + Clone + PartialEq> Default for SubscriptionRegistry<TRes> {
+impl<T: Debug + Clone + PartialEq, E: Debug + Clone> Default for SubscriptionRegistry<T, E> {
fn default() -> Self {
Self {
subscriptions: Default::default(),
@@ -190,29 +198,33 @@ impl<TRes: Debug + Clone + PartialEq> Default for SubscriptionRegistry<TRes> {
}
}
-impl<TRes: Debug + Clone + PartialEq> Drop for SubscriptionRegistry<TRes> {
+impl<T: Debug + Clone + PartialEq, E: Debug + Clone> Drop for SubscriptionRegistry<T, E> {
fn drop(&mut self) {
self.shutdown();
}
}
-/// A value returned when a `Subscription` and it's linked `SubscriptionFuture`
-/// is cancelled before completion or when the `Future` is aborted.
-#[derive(Debug, PartialEq, Eq)]
-pub struct Cancelled;
+#[derive(Debug, PartialEq)]
+pub enum SubscriptionErr<E: Debug + PartialEq> {
+/// A value returned when a `Subscription` and its linked `SubscriptionFuture`
+/// is cancelled before completion or when the `Future` is aborted.
+Cancelled,
+/// Other errors not caused by cancellation.
+Failed(E),
+}
-impl fmt::Display for Cancelled {
+impl<E: Debug + PartialEq> fmt::Display for SubscriptionErr<E> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{:?}", self)
}
}
-impl std::error::Error for Cancelled {}
+impl<E: Debug + PartialEq> std::error::Error for SubscriptionErr<E> {}
/// Represents a request for a resource at different stages of its lifetime.
-pub enum Subscription<TRes> {
+pub enum Subscription<T, E> {
/// A finished `Subscription` containing the desired `TRes` value.
-Ready(TRes),
+Ready(Result<T, E>),
/// A standing request that hasn't been fulfilled yet.
Pending {
/// The waker of the task assigned to check if the `Subscription` is complete.
@@ -224,7 +236,7 @@ pub enum Subscription<TRes> {
Cancelled,
}
-impl<TRes> fmt::Debug for Subscription<TRes> {
+impl<T, E> fmt::Debug for Subscription<T, E> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use Subscription::*;
match self {
@@ -250,7 +262,7 @@ impl<TRes> fmt::Debug for Subscription<TRes> {
}
}
-impl<TRes> Subscription<TRes> {
+impl<T, E> Subscription<T, E> {
fn new(cancel_notifier: Option<Sender<RepoEvent>>) -> Self {
Self::Pending {
waker: Default::default(),
@@ -258,7 +270,7 @@ impl<TRes> Subscription<TRes> {
}
}
-fn wake(&mut self, result: TRes) {
+fn wake(&mut self, result: Result<T, E>) {
let former_self = mem::replace(self, Subscription::Ready(result));
if let Subscription::Pending { waker, .. } = former_self {
if let Some(waker) = waker {
@@ -279,8 +291,8 @@ impl<TRes> Subscription<TRes> {
// send a cancel notification to the repo - the wantlist needs
// to be updated
if is_last {
-trace!("Last related subscription cancelled, sending a cancel notification");
if let Some(mut sender) = cancel_notifier {
+trace!("Last related subscription cancelled, sending a cancel notification");
let _ = sender.try_send(RepoEvent::try_from(kind).unwrap());
}
}
@@ -293,20 +305,20 @@ impl<TRes> Subscription<TRes> {
}
/// A `Future` that resolves to the resource whose subscription was requested.
-pub struct SubscriptionFuture<TRes: Debug + PartialEq> {
+pub struct SubscriptionFuture<T: Debug + PartialEq, E: Debug> {
/// The unique identifier of the subscription request and the secondary
/// key in the `SubscriptionRegistry`.
id: u64,
/// The type of the request made; the primary key in the `SubscriptionRegistry`.
kind: RequestKind,
/// A reference to the subscriptions at the `SubscriptionRegistry`.
-subscriptions: Arc<Mutex<Subscriptions<TRes>>>,
+subscriptions: Arc<Mutex<Subscriptions<T, E>>>,
/// True if the cleanup is already done, false if `Drop` needs to do it
cleanup_complete: bool,
}
-impl<TRes: Debug + PartialEq> Future for SubscriptionFuture<TRes> {
-type Output = Result<TRes, Cancelled>;
+impl<T: Debug + PartialEq, E: Debug + PartialEq> Future for SubscriptionFuture<T, E> {
+type Output = Result<T, SubscriptionErr<E>>;
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
use std::collections::hash_map::Entry::*;
@@ -327,7 +339,7 @@ impl<TRes: Debug + PartialEq> Future for SubscriptionFuture<TRes> {
let (became_empty, ret) = match related_subs.entry(self.id) {
// there were no related subs, it can only mean cancellation or polling after
// Poll::Ready
-Vacant(_) => return Poll::Ready(Err(Cancelled)),
+Vacant(_) => return Poll::Ready(Err(SubscriptionErr::Cancelled)),
Occupied(mut oe) => {
let unwrapped = match oe.get_mut() {
Subscription::Pending { ref mut waker, .. } => {
@@ -337,10 +349,10 @@ impl<TRes: Debug + PartialEq> Future for SubscriptionFuture<TRes> {
}
Subscription::Cancelled => {
oe.remove();
-Err(Cancelled)
+Err(SubscriptionErr::Cancelled)
}
_ => match oe.remove() {
-Subscription::Ready(result) => Ok(result),
+Subscription::Ready(result) => result.map_err(SubscriptionErr::Failed),
_ => unreachable!("already matched"),
},
};
@@ -360,14 +372,14 @@ impl<TRes: Debug + PartialEq> Future for SubscriptionFuture<TRes> {
self.cleanup_complete = became_empty;
Poll::Ready(ret)
} else {
-Poll::Ready(Err(Cancelled))
+Poll::Ready(Err(SubscriptionErr::Cancelled))
}
}
}
-impl<TRes: Debug + PartialEq> Drop for SubscriptionFuture<TRes> {
+impl<T: Debug + PartialEq, E: Debug> Drop for SubscriptionFuture<T, E> {
fn drop(&mut self) {
-trace!("Dropping subscription {} to {}", self.id, self.kind);
+trace!("Dropping subscription future {} to {}", self.id, self.kind);
if self.cleanup_complete {
// cleaned up the easier variants already
@@ -393,22 +405,21 @@ impl<TRes: Debug + PartialEq> Drop for SubscriptionFuture<TRes> {
});
if let Some(sub) = sub {
-// don't bother updating anything that isn't `Pending`
+// don't cancel anything that isn't `Pending`
if let mut sub @ Subscription::Pending { .. } = sub {
-if is_last {
-sub.cancel(self.id, self.kind.clone(), is_last);
-}
+sub.cancel(self.id, self.kind.clone(), is_last);
}
}
}
}
-impl<TRes: Debug + PartialEq> fmt::Debug for SubscriptionFuture<TRes> {
+impl<T: Debug + PartialEq, E: Debug> fmt::Debug for SubscriptionFuture<T, E> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
-"SubscriptionFuture<Output = Result<{}, Cancelled>>",
-std::any::type_name::<TRes>()
+"SubscriptionFuture<Output = Result<{}, {}>>",
+std::any::type_name::<T>(),
+std::any::type_name::<E>(),
)
}
}
@@ -425,11 +436,11 @@ mod tests {
#[async_std::test]
async fn subscription_basics() {
-let registry = SubscriptionRegistry::<u32>::default();
+let registry = SubscriptionRegistry::<u32, ()>::default();
let s1 = registry.create_subscription(0.into(), None);
let s2 = registry.create_subscription(0.into(), None);
let s3 = registry.create_subscription(0.into(), None);
-registry.finish_subscription(0.into(), 10);
+registry.finish_subscription(0.into(), Ok(10));
assert_eq!(s1.await.unwrap(), 10);
assert_eq!(s2.await.unwrap(), 10);
assert_eq!(s3.await.unwrap(), 10);
@@ -437,26 +448,26 @@ mod tests {
#[async_std::test]
async fn subscription_cancelled_on_dropping_registry() {
-let registry = SubscriptionRegistry::<u32>::default();
+let registry = SubscriptionRegistry::<u32, ()>::default();
let s1 = registry.create_subscription(0.into(), None);
drop(registry);
-assert_eq!(s1.await, Err(Cancelled));
+assert_eq!(s1.await, Err(SubscriptionErr::Cancelled));
}
#[async_std::test]
async fn subscription_cancelled_on_shutdown() {
-let registry = SubscriptionRegistry::<u32>::default();
+let registry = SubscriptionRegistry::<u32, ()>::default();
let s1 = registry.create_subscription(0.into(), None);
registry.shutdown();
-assert_eq!(s1.await, Err(Cancelled));
+assert_eq!(s1.await, Err(SubscriptionErr::Cancelled));
}
#[async_std::test]
async fn new_subscriptions_cancelled_after_shutdown() {
-let registry = SubscriptionRegistry::<u32>::default();
+let registry = SubscriptionRegistry::<u32, ()>::default();
registry.shutdown();
let s1 = registry.create_subscription(0.into(), None);
-assert_eq!(s1.await, Err(Cancelled));
+assert_eq!(s1.await, Err(SubscriptionErr::Cancelled));
}
#[async_std::test]
@@ -464,7 +475,7 @@ mod tests {
use async_std::future::timeout;
use std::time::Duration;
-let registry = SubscriptionRegistry::<u32>::default();
+let registry = SubscriptionRegistry::<u32, ()>::default();
let s1 = timeout(
Duration::from_millis(1),
registry.create_subscription(0.into(), None),
@@ -475,7 +486,7 @@ mod tests {
s1.await.unwrap_err();
// this will cause a call to waker installed by s1, but it shouldn't be a problem.
-registry.finish_subscription(0.into(), 0);
+registry.finish_subscription(0.into(), Ok(0));
assert_eq!(s2.await.unwrap(), 0);
}