refactor: keep subscription accounting within the registry

Signed-off-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
ljedrz 2020-07-14 16:04:28 +02:00
parent 25ee479ff3
commit 19a77616d7
3 changed files with 56 additions and 57 deletions

View File

@ -30,8 +30,6 @@ pub enum BitswapEvent {
ReceivedCancel(PeerId, Cid),
}
pub type SubscriberCount = usize;
/// Network behaviour that handles sending and receiving IPFS blocks.
#[derive(Default)]
pub struct Bitswap {
@ -42,7 +40,7 @@ pub struct Bitswap {
/// Ledger
pub connected_peers: HashMap<PeerId, Ledger>,
/// Wanted blocks
wanted_blocks: HashMap<Cid, (Priority, SubscriberCount)>,
wanted_blocks: HashMap<Cid, Priority>,
/// Blocks queued to be sent
pub queued_blocks: Arc<Mutex<Vec<(PeerId, Block)>>>,
}
@ -52,7 +50,7 @@ impl Bitswap {
pub fn local_wantlist(&self) -> Vec<(Cid, Priority)> {
self.wanted_blocks
.iter()
.map(|(cid, (prio, _))| (cid.clone(), *prio))
.map(|(cid, prio)| (cid.clone(), *prio))
.collect()
}
@ -107,7 +105,7 @@ impl Bitswap {
debug!("bitswap: send_want_list");
if !self.wanted_blocks.is_empty() {
let mut message = Message::default();
for (cid, (priority, _)) in &self.wanted_blocks {
for (cid, priority) in &self.wanted_blocks {
message.want_block(cid, *priority);
}
debug!(" queuing wanted blocks");
@ -129,13 +127,7 @@ impl Bitswap {
ledger.want_block(&cid, priority);
debug!(" queuing want for {}", peer_id.to_base58());
}
self.wanted_blocks
.entry(cid)
.and_modify(|(prio, subs)| {
*prio = priority;
*subs += 1;
})
.or_insert((priority, 1));
self.wanted_blocks.insert(cid, priority);
debug!("");
}
@ -151,16 +143,6 @@ impl Bitswap {
self.wanted_blocks.remove(cid);
debug!("");
}
pub fn cancel_subscription(&mut self, cid: &Cid) -> bool {
if let Some((_prio, ref mut subs)) = self.wanted_blocks.get_mut(cid) {
*subs -= 1;
if *subs == 0 {
return true;
}
}
false
}
}
impl NetworkBehaviour for Bitswap {

View File

@ -881,11 +881,7 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) {
match evt {
RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
RepoEvent::UnwantBlock(cid) => {
if self.swarm.bitswap().cancel_subscription(&cid) {
self.swarm.bitswap().cancel_block(&cid)
}
}
RepoEvent::UnwantBlock(cid) => self.swarm.bitswap().cancel_block(&cid),
RepoEvent::ProvideBlock(cid) => {
// TODO: consider if cancel is applicable in cases where we provide the
// associated Block ourselves

View File

@ -1,6 +1,6 @@
use crate::RepoEvent;
use async_std::future::Future;
use async_std::task::{Context, Poll, Waker};
use async_std::task::{self, Context, Poll, Waker};
use core::fmt::Debug;
use core::hash::Hash;
use core::pin::Pin;
@ -53,12 +53,12 @@ impl From<Cid> for Request {
type SubscriptionId = u64;
type Subscriptions<T> = HashMap<SubscriptionId, Subscription<T>>;
pub struct SubscriptionRegistry<TRes: Debug + Clone> {
pub struct SubscriptionRegistry<TRes: Debug + Clone + PartialEq> {
subscriptions: Arc<Mutex<Subscriptions<TRes>>>,
shutting_down: AtomicBool,
}
impl<TRes: Debug + Clone> fmt::Debug for SubscriptionRegistry<TRes> {
impl<TRes: Debug + Clone + PartialEq> fmt::Debug for SubscriptionRegistry<TRes> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
@ -69,7 +69,7 @@ impl<TRes: Debug + Clone> fmt::Debug for SubscriptionRegistry<TRes> {
}
}
impl<TRes: Debug + Clone> SubscriptionRegistry<TRes> {
impl<TRes: Debug + Clone + PartialEq> SubscriptionRegistry<TRes> {
pub fn create_subscription(
&self,
req: Request,
@ -79,12 +79,10 @@ impl<TRes: Debug + Clone> SubscriptionRegistry<TRes> {
let mut subscription = Subscription::new(req, cancel_notifier);
if self.shutting_down.load(Ordering::SeqCst) {
subscription.cancel();
subscription.cancel(true);
}
async_std::task::block_on(async {
self.subscriptions.lock().await.insert(id, subscription)
});
task::block_on(async { self.subscriptions.lock().await.insert(id, subscription) });
SubscriptionFuture {
id,
@ -93,8 +91,7 @@ impl<TRes: Debug + Clone> SubscriptionRegistry<TRes> {
}
pub fn finish_subscription(&self, req: &Request, res: TRes) {
let mut subscriptions =
async_std::task::block_on(async { self.subscriptions.lock().await });
let mut subscriptions = task::block_on(async { self.subscriptions.lock().await });
for sub in subscriptions.values_mut() {
if let Subscription::Pending { request, .. } = sub {
@ -114,19 +111,18 @@ impl<TRes: Debug + Clone> SubscriptionRegistry<TRes> {
log::debug!("Shutting down {:?}", self);
let mut cancelled = 0;
let mut subscriptions =
async_std::task::block_on(async { self.subscriptions.lock().await });
let mut subscriptions = task::block_on(async { self.subscriptions.lock().await });
for (_idx, mut sub) in subscriptions.drain() {
sub.cancel();
sub.cancel(true);
cancelled += 1;
}
log::trace!("Cancelled {} subscriptions", cancelled,);
log::trace!("Cancelled {} subscriptions", cancelled);
}
}
impl<TRes: Debug + Clone> Default for SubscriptionRegistry<TRes> {
impl<TRes: Debug + Clone + PartialEq> Default for SubscriptionRegistry<TRes> {
fn default() -> Self {
Self {
subscriptions: Default::default(),
@ -135,7 +131,7 @@ impl<TRes: Debug + Clone> Default for SubscriptionRegistry<TRes> {
}
}
impl<TRes: Debug + Clone> Drop for SubscriptionRegistry<TRes> {
impl<TRes: Debug + Clone + PartialEq> Drop for SubscriptionRegistry<TRes> {
fn drop(&mut self) {
self.shutdown();
}
@ -164,6 +160,21 @@ pub enum Subscription<TRes> {
Cancelled,
}
impl<TRes: PartialEq> PartialEq for Subscription<TRes> {
fn eq(&self, other: &Self) -> bool {
match self {
Self::Pending { request: req1, .. } => {
if let Self::Pending { request: req2, .. } = other {
req1 == req2
} else {
false
}
}
done => done == other,
}
}
}
impl<TRes: Clone> Subscription<TRes> {
fn new(request: Request, cancel_notifier: Option<Sender<RepoEvent>>) -> Self {
Self::Pending {
@ -182,7 +193,7 @@ impl<TRes: Clone> Subscription<TRes> {
}
}
fn cancel(&mut self) {
fn cancel(&mut self, is_last: bool) {
let former_self = mem::replace(self, Subscription::Cancelled);
if let Subscription::Pending {
request,
@ -190,27 +201,29 @@ impl<TRes: Clone> Subscription<TRes> {
cancel_notifier,
} = former_self
{
if is_last {
if let Some(mut sender) = cancel_notifier {
let _ = sender.try_send(RepoEvent::from(request));
}
}
if let Some(waker) = waker {
waker.wake();
}
if let Some(mut sender) = cancel_notifier {
let _ = sender.try_send(RepoEvent::from(request));
}
}
}
}
pub struct SubscriptionFuture<TRes: Clone + Debug> {
pub struct SubscriptionFuture<TRes: Clone + Debug + PartialEq> {
id: u64,
subscriptions: Arc<Mutex<Subscriptions<TRes>>>,
}
impl<TRes: Clone + Debug> Future for SubscriptionFuture<TRes> {
impl<TRes: Clone + Debug + PartialEq> Future for SubscriptionFuture<TRes> {
type Output = Result<TRes, Cancelled>;
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
let mut subscriptions =
async_std::task::block_on(async { self.subscriptions.lock().await });
let mut subscriptions = task::block_on(async { self.subscriptions.lock().await });
let subscription = if let Some(sub) = subscriptions.get_mut(&self.id) {
sub
} else {
@ -228,17 +241,25 @@ impl<TRes: Clone + Debug> Future for SubscriptionFuture<TRes> {
}
}
impl<TRes: Clone + Debug> Drop for SubscriptionFuture<TRes> {
impl<TRes: Clone + Debug + PartialEq> Drop for SubscriptionFuture<TRes> {
fn drop(&mut self) {
if let Some(mut sub) =
async_std::task::block_on(async { self.subscriptions.lock().await.remove(&self.id) })
{
sub.cancel();
let (sub, is_last) = task::block_on(async {
let mut subscriptions = self.subscriptions.lock().await;
let sub = subscriptions.remove(&self.id);
let is_last = !subscriptions.values().any(|s| Some(s) == sub.as_ref());
(sub, is_last)
});
if let Some(sub) = sub {
if let mut sub @ Subscription::Pending { .. } = sub {
sub.cancel(is_last);
}
}
}
}
impl<TRes: Clone + Debug> fmt::Debug for SubscriptionFuture<TRes> {
impl<TRes: Clone + Debug + PartialEq> fmt::Debug for SubscriptionFuture<TRes> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,