refactor: enum-ify the Subscription object

Signed-off-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
ljedrz 2020-07-07 18:26:42 +02:00
parent 29e8a855bc
commit 9e03b9aac6

View File

@ -5,6 +5,7 @@ use core::hash::Hash;
use core::pin::Pin;
use std::collections::HashMap;
use std::fmt;
use std::mem;
use std::sync::{Arc, Mutex};
pub struct SubscriptionRegistry<TReq: Debug + Eq + Hash, TRes: Debug> {
@ -112,60 +113,37 @@ impl fmt::Display for Cancelled {
impl std::error::Error for Cancelled {}
pub struct Subscription<TResult> {
result: Option<TResult>,
wakers: Vec<Waker>,
cancelled: bool,
}
impl<TResult> fmt::Debug for Subscription<TResult> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
"Subscription<{}>(result: {}, wakers: {}, cancelled: {})",
std::any::type_name::<TResult>(),
if self.result.is_some() {
"Some(_)"
} else {
"None"
},
self.wakers.len(),
self.cancelled
)
}
#[derive(Debug)]
pub enum Subscription<TResult> {
Ready(TResult),
Pending { wakers: Vec<Waker> },
Cancelled,
}
impl<TResult> Subscription<TResult> {
pub fn add_waker(&mut self, waker: &Waker) {
if !self.wakers.iter().any(|w| w.will_wake(waker)) {
self.wakers.push(waker.clone());
}
}
pub fn wake(&mut self, result: TResult) {
self.result = Some(result);
for waker in self.wakers.drain(..) {
waker.wake();
let mut former_self = mem::replace(self, Subscription::Ready(result));
if let Subscription::Pending { ref mut wakers } = former_self {
for waker in wakers.drain(..) {
waker.wake();
}
}
}
pub fn cancel(&mut self) {
if self.cancelled {
return;
}
self.cancelled = true;
for waker in self.wakers.drain(..) {
waker.wake();
let mut former_self = mem::replace(self, Subscription::Cancelled);
if let Subscription::Pending { ref mut wakers } = former_self {
for waker in wakers.drain(..) {
waker.wake();
}
}
}
}
impl<TResult> Default for Subscription<TResult> {
fn default() -> Self {
Self {
result: Default::default(),
Self::Pending {
wakers: Default::default(),
cancelled: false,
}
}
}
@ -179,15 +157,17 @@ impl<TResult: Clone> Future for SubscriptionFuture<TResult> {
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
let mut subscription = self.subscription.lock().unwrap();
if subscription.cancelled {
return Poll::Ready(Err(Cancelled));
}
if let Some(ref result) = subscription.result {
Poll::Ready(Ok(result.clone()))
} else {
subscription.add_waker(&context.waker());
Poll::Pending
match &mut *subscription {
Subscription::Cancelled => Poll::Ready(Err(Cancelled)),
Subscription::Pending { ref mut wakers } => {
let waker = context.waker();
if !wakers.iter().any(|w| w.will_wake(waker)) {
wakers.push(waker.clone());
}
Poll::Pending
}
Subscription::Ready(result) => Poll::Ready(Ok(result.clone())),
}
}
}