fix: wait for Kademlia in put_block

Signed-off-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
ljedrz 2020-08-04 14:06:28 +02:00
parent ed20a189e7
commit 58c904072f
3 changed files with 25 additions and 19 deletions

View File

@ -1002,11 +1002,11 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
match evt {
RepoEvent::WantBlock(cid) => self.swarm.want_block(cid),
RepoEvent::UnwantBlock(cid) => self.swarm.bitswap().cancel_block(&cid),
RepoEvent::ProvideBlock(cid) => {
RepoEvent::ProvideBlock(cid, ret) => {
// TODO: consider if cancel is applicable in cases where we provide the
// associated Block ourselves
self.swarm.bitswap().cancel_block(&cid);
self.swarm.provide_block(cid)
let _ = ret.send(self.swarm.provide_block(cid));
}
RepoEvent::UnprovideBlock(cid) => self.swarm.stop_providing_block(&cid),
}

View File

@ -427,20 +427,14 @@ impl<Types: IpfsTypes> Behaviour<Types> {
self.bitswap.want_block(cid, 1);
}
// FIXME: it would probably be best if this could return a SubscriptionFuture, so
// that the put_block operation truly finishes only when the block is already being
// provided; it is, however, pretty tricky in terms of internal communication between
// Ipfs and IpfsFuture objects - it would currently require some extra back-and-forth
pub fn provide_block(&mut self, cid: Cid) {
pub fn provide_block(
&mut self,
cid: Cid,
) -> Result<SubscriptionFuture<Result<(), String>>, anyhow::Error> {
let key = cid.to_bytes();
match self.kademlia.start_providing(key.into()) {
Ok(_id) => {
// Ok(self.kad_subscriptions.create_subscription(id.into(), None))
}
Err(e) => {
error!("kad: can't provide block {}: {:?}", cid, e);
// Err(anyhow!("kad: can't provide block {}", key))
}
Ok(id) => Ok(self.kad_subscriptions.create_subscription(id.into(), None)),
Err(e) => Err(anyhow!("kad: can't provide block {}: {:?}", cid, e)),
}
}

View File

@ -1,8 +1,9 @@
//! IPFS repo
use crate::error::Error;
use crate::path::IpfsPath;
use crate::subscription::{RequestKind, SubscriptionRegistry};
use crate::subscription::{RequestKind, SubscriptionFuture, SubscriptionRegistry};
use crate::IpfsOptions;
use anyhow::anyhow;
use async_std::path::PathBuf;
use async_trait::async_trait;
use bitswap::Block;
@ -10,7 +11,10 @@ use cid::{self, Cid};
use core::convert::TryFrom;
use core::fmt::Debug;
use core::marker::PhantomData;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::channel::{
mpsc::{channel, Receiver, Sender},
oneshot,
};
use futures::sink::SinkExt;
use libp2p::core::PeerId;
use std::hash::{Hash, Hasher};
@ -124,11 +128,14 @@ pub struct Repo<TRepoTypes: RepoTypes> {
pub(crate) subscriptions: SubscriptionRegistry<Block>,
}
#[derive(Clone, Debug)]
#[derive(Debug)]
pub enum RepoEvent {
WantBlock(Cid),
UnwantBlock(Cid),
ProvideBlock(Cid),
ProvideBlock(
Cid,
oneshot::Sender<Result<SubscriptionFuture<Result<(), String>>, anyhow::Error>>,
),
UnprovideBlock(Cid),
}
@ -200,11 +207,16 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
.finish_subscription(cid.clone().into(), block);
// sending only fails if no one is listening anymore
// and that is okay with us.
let (tx, rx) = oneshot::channel();
self.events
.clone()
.send(RepoEvent::ProvideBlock(cid.clone()))
.send(RepoEvent::ProvideBlock(cid.clone(), tx))
.await
.ok();
rx.await??.await?.map_err(|e| anyhow!(e))?;
Ok((cid, res))
}