feat: bitswap stats with repo changes
this adds a BlockPut result in two places which is inferior to the more complete bitswap refactoring. BlockPut will let the caller know if the block was added or if it existed already. This is poorly implemented in the fs blockstore.
This commit is contained in:
parent
868f4a8722
commit
608ca9f174
@ -32,6 +32,16 @@ pub struct Bitswap<TStrategy> {
|
||||
strategy: TStrategy,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct Stats {
|
||||
pub sent_blocks: u64,
|
||||
pub sent_data: u64,
|
||||
pub received_blocks: u64,
|
||||
pub received_data: u64,
|
||||
pub duplicate_blocks: u64,
|
||||
pub duplicate_data: u64
|
||||
}
|
||||
|
||||
impl<TStrategy> Bitswap<TStrategy> {
|
||||
/// Creates a `Bitswap`.
|
||||
pub fn new(strategy: TStrategy) -> Self {
|
||||
@ -55,6 +65,23 @@ impl<TStrategy> Bitswap<TStrategy> {
|
||||
self.connected_peers.get(peer).map(Ledger::wantlist)
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> Stats {
|
||||
// we currently do not remove ledgers so this is ... good enough
|
||||
self.connected_peers.values().fold(Stats::default(), |mut acc, ledger| {
|
||||
acc.sent_blocks += ledger.sent_blocks;
|
||||
acc.sent_data += ledger.sent_data;
|
||||
acc.received_blocks += ledger.received_blocks;
|
||||
acc.received_data += ledger.received_data;
|
||||
acc.duplicate_blocks += ledger.duplicate_blocks;
|
||||
acc.duplicate_data += ledger.duplicate_data;
|
||||
acc
|
||||
})
|
||||
}
|
||||
|
||||
pub fn peers(&self) -> Vec<PeerId> {
|
||||
self.connected_peers.keys().cloned().collect()
|
||||
}
|
||||
|
||||
/// Connect to peer.
|
||||
///
|
||||
/// Called from Kademlia behaviour.
|
||||
@ -182,7 +209,6 @@ impl<TStrategy: Strategy> NetworkBehaviour for Bitswap<TStrategy> {
|
||||
};
|
||||
debug!(" received message");
|
||||
|
||||
// Update the ledger.
|
||||
let ledger = self
|
||||
.connected_peers
|
||||
.get_mut(&source)
|
||||
@ -219,6 +245,8 @@ impl<TStrategy: Strategy> NetworkBehaviour for Bitswap<TStrategy> {
|
||||
.push_back(NetworkBehaviourAction::SendEvent { peer_id, event })
|
||||
}
|
||||
Some(ref mut ledger) => {
|
||||
// FIXME: this is a bit early to update stats as the block hasn't been sent
|
||||
// to anywhere at this point.
|
||||
ledger.update_outgoing_stats(&event);
|
||||
debug!(" send_message to {}", peer_id.to_base58());
|
||||
return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event });
|
||||
@ -231,9 +259,22 @@ impl<TStrategy: Strategy> NetworkBehaviour for Bitswap<TStrategy> {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(StrategyEvent::Send { peer_id, block }) = self.strategy.poll() {
|
||||
self.send_block(peer_id, block);
|
||||
ctx.waker().wake_by_ref();
|
||||
let inner = match self.strategy.poll(ctx) {
|
||||
Poll::Ready(Some(inner)) => inner,
|
||||
Poll::Ready(None) => return Poll::Pending,
|
||||
Poll::Pending => {
|
||||
return Poll::Pending;
|
||||
}
|
||||
};
|
||||
|
||||
match inner {
|
||||
StrategyEvent::Send { peer_id, block } => self.send_block(peer_id, block),
|
||||
StrategyEvent::NewBlockStored { source, bytes } => if let Some(ledger) = self.connected_peers.get_mut(&source) {
|
||||
ledger.update_incoming_stored(bytes)
|
||||
},
|
||||
StrategyEvent::DuplicateBlockReceived { source, bytes } => if let Some(ledger) = self.connected_peers.get_mut(&source) {
|
||||
ledger.update_incoming_duplicate(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
|
@ -14,9 +14,16 @@ pub type Priority = i32;
|
||||
#[derive(Debug, Default)]
|
||||
pub struct Ledger {
|
||||
/// The number of blocks sent to the peer.
|
||||
sent_blocks: usize,
|
||||
pub(crate) sent_blocks: u64,
|
||||
pub(crate) sent_data: u64,
|
||||
|
||||
/// The number of blocks received from the peer.
|
||||
received_blocks: usize,
|
||||
pub(crate) received_blocks: u64,
|
||||
pub(crate) received_data: u64,
|
||||
|
||||
pub(crate) duplicate_blocks: u64,
|
||||
pub(crate) duplicate_data: u64,
|
||||
|
||||
/// The list of wanted blocks sent to the peer.
|
||||
sent_want_list: HashMap<Cid, Priority>,
|
||||
/// The list of wanted blocks received from the peer.
|
||||
@ -52,7 +59,7 @@ impl Ledger {
|
||||
}
|
||||
|
||||
pub fn update_outgoing_stats(&mut self, message: &Message<O>) {
|
||||
self.sent_blocks += message.blocks.len();
|
||||
self.sent_blocks += message.blocks.len() as u64;
|
||||
for cid in message.cancel() {
|
||||
self.sent_want_list.remove(cid);
|
||||
}
|
||||
@ -62,7 +69,6 @@ impl Ledger {
|
||||
}
|
||||
|
||||
pub fn update_incoming_stats(&mut self, message: &Message<I>) {
|
||||
self.received_blocks += message.blocks.len();
|
||||
for cid in message.cancel() {
|
||||
self.received_want_list.remove(cid);
|
||||
}
|
||||
@ -71,6 +77,16 @@ impl Ledger {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn update_incoming_stored(&mut self, bytes: u64) {
|
||||
self.received_blocks += 1;
|
||||
self.received_data += bytes;
|
||||
}
|
||||
|
||||
pub(crate) fn update_incoming_duplicate(&mut self, bytes: u64) {
|
||||
self.duplicate_blocks += 1;
|
||||
self.duplicate_data += bytes;
|
||||
}
|
||||
|
||||
/// Returns the blocks wanted by the peer in unspecified order
|
||||
pub fn wantlist(&self) -> Vec<(Cid, Priority)> {
|
||||
self.received_want_list.iter().map(|(cid, prio)| (cid.clone(), *prio)).collect()
|
||||
|
@ -10,11 +10,11 @@ mod prefix;
|
||||
mod protocol;
|
||||
mod strategy;
|
||||
|
||||
pub use self::behaviour::Bitswap;
|
||||
pub use self::behaviour::{Bitswap, Stats};
|
||||
pub use self::block::Block;
|
||||
pub use self::error::BitswapError;
|
||||
pub use self::ledger::Priority;
|
||||
pub use self::strategy::{AltruisticStrategy, BitswapStore, Strategy};
|
||||
pub use self::strategy::{AltruisticStrategy, BitswapStore, Strategy, BlockPut};
|
||||
|
||||
mod bitswap_pb {
|
||||
include!(concat!(env!("OUT_DIR"), "/bitswap_pb.rs"));
|
||||
|
@ -4,38 +4,50 @@ use async_std::task;
|
||||
use async_trait::async_trait;
|
||||
use libipld::cid::Cid;
|
||||
use libp2p_core::PeerId;
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use futures::channel::mpsc::{unbounded, UnboundedSender as Sender, UnboundedReceiver as Receiver};
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum BlockPut {
|
||||
Stored,
|
||||
Duplicate,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait BitswapStore: Send + Sync {
|
||||
async fn get_block(&self, cid: &Cid) -> Result<Option<Block>, anyhow::Error>;
|
||||
|
||||
async fn put_block(&self, block: Block) -> Result<(), anyhow::Error>;
|
||||
async fn put_block(&self, block: Block) -> Result<BlockPut, anyhow::Error>;
|
||||
}
|
||||
|
||||
pub trait Strategy: Send + Unpin + 'static {
|
||||
fn new(store: Arc<dyn BitswapStore>) -> Self;
|
||||
fn process_want(&self, source: PeerId, cid: Cid, priority: Priority);
|
||||
fn process_block(&self, source: PeerId, block: Block);
|
||||
fn poll(&self) -> Option<StrategyEvent>;
|
||||
fn poll(&self, ctx: &mut Context<'_>) -> Poll<Option<StrategyEvent>>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum StrategyEvent {
|
||||
Send { peer_id: PeerId, block: Block },
|
||||
DuplicateBlockReceived { source: PeerId, bytes: u64 },
|
||||
NewBlockStored { source: PeerId, bytes: u64 },
|
||||
}
|
||||
|
||||
pub struct AltruisticStrategy {
|
||||
store: Arc<dyn BitswapStore>,
|
||||
events: (Sender<StrategyEvent>, Receiver<StrategyEvent>),
|
||||
event_sender: Sender<StrategyEvent>,
|
||||
events: Mutex<Receiver<StrategyEvent>>,
|
||||
}
|
||||
|
||||
impl Strategy for AltruisticStrategy {
|
||||
fn new(store: Arc<dyn BitswapStore>) -> Self {
|
||||
let (tx, rx) = unbounded();
|
||||
AltruisticStrategy {
|
||||
store,
|
||||
events: channel::<StrategyEvent>(),
|
||||
event_sender: tx,
|
||||
events: Mutex::new(rx),
|
||||
}
|
||||
}
|
||||
|
||||
@ -46,7 +58,7 @@ impl Strategy for AltruisticStrategy {
|
||||
cid.to_string(),
|
||||
priority
|
||||
);
|
||||
let events = self.events.0.clone();
|
||||
let sender = self.event_sender.clone();
|
||||
let store = self.store.clone();
|
||||
|
||||
task::spawn(async move {
|
||||
@ -69,14 +81,7 @@ impl Strategy for AltruisticStrategy {
|
||||
block,
|
||||
};
|
||||
|
||||
if let Err(e) = events.send(req) {
|
||||
warn!(
|
||||
"Peer {} wanted block {} we failed start sending it: {}",
|
||||
source.to_base58(),
|
||||
cid,
|
||||
e
|
||||
);
|
||||
}
|
||||
let _ = sender.unbounded_send(req);
|
||||
});
|
||||
}
|
||||
|
||||
@ -85,22 +90,36 @@ impl Strategy for AltruisticStrategy {
|
||||
info!("Received block {} from peer {}", cid, source.to_base58());
|
||||
|
||||
let store = self.store.clone();
|
||||
let sender = self.event_sender.clone();
|
||||
|
||||
task::spawn(async move {
|
||||
let bytes = block.data().len() as u64;
|
||||
let res = store.put_block(block).await;
|
||||
if let Err(e) = res {
|
||||
debug!(
|
||||
"Got block {} from peer {} but failed to store it: {}",
|
||||
cid,
|
||||
source.to_base58(),
|
||||
e
|
||||
);
|
||||
}
|
||||
let evt = match res {
|
||||
Ok(BlockPut::Stored) => StrategyEvent::NewBlockStored { source, bytes },
|
||||
Ok(BlockPut::Duplicate) => StrategyEvent::DuplicateBlockReceived { source, bytes },
|
||||
Err(e) => {
|
||||
debug!(
|
||||
"Got block {} from peer {} but failed to store it: {}",
|
||||
cid,
|
||||
source.to_base58(),
|
||||
e
|
||||
);
|
||||
return;
|
||||
},
|
||||
};
|
||||
|
||||
let _ = sender.unbounded_send(evt);
|
||||
});
|
||||
}
|
||||
|
||||
fn poll(&self) -> Option<StrategyEvent> {
|
||||
self.events.1.try_recv().ok()
|
||||
/// Can return Poll::Ready(None) multiple times, Poll::Pending
|
||||
fn poll(&self, ctx: &mut Context) -> Poll<Option<StrategyEvent>> {
|
||||
use futures::stream::StreamExt;
|
||||
|
||||
let mut g = self.events.try_lock().expect("Failed to acquire the uncontended mutex right away");
|
||||
|
||||
g.poll_next_unpin(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ impl<Types: RepoTypes> IpldDag<Types> {
|
||||
};
|
||||
let cid = Cid::new(version, codec, hash)?;
|
||||
let block = Block::new(bytes, cid);
|
||||
let cid = self.repo.put_block(block).await?;
|
||||
let (cid, _) = self.repo.put_block(block).await?;
|
||||
Ok(cid)
|
||||
}
|
||||
|
||||
|
22
src/lib.rs
22
src/lib.rs
@ -307,7 +307,7 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
|
||||
impl<Types: IpfsTypes> Ipfs<Types> {
|
||||
/// Puts a block into the ipfs repo.
|
||||
pub async fn put_block(&mut self, block: Block) -> Result<Cid, Error> {
|
||||
Ok(self.repo.put_block(block).await?)
|
||||
Ok(self.repo.put_block(block).await?.0)
|
||||
}
|
||||
|
||||
/// Retrives a block from the ipfs repo.
|
||||
@ -643,7 +643,10 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
|
||||
let _ = ret.send(list);
|
||||
}
|
||||
IpfsEvent::BitswapStats(ret) => {
|
||||
todo!()
|
||||
let stats = self.swarm.bitswap().stats();
|
||||
let peers = self.swarm.bitswap().peers();
|
||||
let wantlist = self.swarm.bitswap().local_wantlist();
|
||||
let _ = ret.send((stats, peers, wantlist).into());
|
||||
}
|
||||
IpfsEvent::Exit => {
|
||||
// FIXME: we could do a proper teardown
|
||||
@ -679,6 +682,21 @@ pub struct BitswapStats {
|
||||
pub wantlist: Vec<(Cid, bitswap::Priority)>,
|
||||
}
|
||||
|
||||
impl From<(bitswap::Stats, Vec<PeerId>, Vec<(Cid, bitswap::Priority)>)> for BitswapStats {
|
||||
fn from((stats, peers, wantlist): (bitswap::Stats, Vec<PeerId>, Vec<(Cid, bitswap::Priority)>)) -> Self {
|
||||
BitswapStats {
|
||||
blocks_sent: stats.sent_blocks,
|
||||
data_sent: stats.sent_data,
|
||||
blocks_received: stats.received_blocks,
|
||||
data_received: stats.received_data,
|
||||
dup_blks_received: stats.duplicate_blocks,
|
||||
dup_data_received: stats.duplicate_data,
|
||||
peers,
|
||||
wantlist,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use node::Node;
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
//! Persistent fs backed repo
|
||||
use crate::error::Error;
|
||||
use crate::repo::BlockStore;
|
||||
use crate::repo::{BlockStore, BlockPut};
|
||||
#[cfg(feature = "rocksdb")]
|
||||
use crate::repo::{Column, DataStore};
|
||||
use async_std::fs;
|
||||
@ -18,6 +18,7 @@ use std::sync::{Arc, Mutex};
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct FsBlockStore {
|
||||
path: PathBuf,
|
||||
// FIXME: this lock is not from futures
|
||||
cids: Arc<Mutex<HashSet<Cid>>>,
|
||||
}
|
||||
|
||||
@ -86,14 +87,23 @@ impl BlockStore for FsBlockStore {
|
||||
Ok(Some(block))
|
||||
}
|
||||
|
||||
async fn put(&self, block: Block) -> Result<Cid, Error> {
|
||||
async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
|
||||
let path = block_path(self.path.clone(), &block.cid());
|
||||
let cids = self.cids.clone();
|
||||
let mut file = fs::File::create(path).await?;
|
||||
let data = block.data();
|
||||
let mut file = fs::File::create(path).await?;
|
||||
file.write_all(&*data).await?;
|
||||
cids.lock().unwrap().insert(block.cid().to_owned());
|
||||
Ok(block.cid().to_owned())
|
||||
file.flush().await?;
|
||||
let retval = if cids.lock().unwrap().insert(block.cid().to_owned()) {
|
||||
BlockPut::NewBlock
|
||||
} else {
|
||||
BlockPut::Existed
|
||||
};
|
||||
// FIXME: checking if the file existed already while creating complicates this function a
|
||||
// lot; might be better to just guard with mutex to enforce single task file access.. the
|
||||
// current implementation will write over the same file multiple times, each time believing
|
||||
// it was the first.
|
||||
Ok((block.cid().to_owned(), retval))
|
||||
}
|
||||
|
||||
async fn remove(&self, cid: &Cid) -> Result<(), Error> {
|
||||
@ -111,6 +121,7 @@ impl BlockStore for FsBlockStore {
|
||||
#[cfg(feature = "rocksdb")]
|
||||
pub struct RocksDataStore {
|
||||
path: PathBuf,
|
||||
// FIXME: this lock is not from futures
|
||||
db: Arc<Mutex<Option<rocksdb::DB>>>,
|
||||
}
|
||||
|
||||
@ -238,7 +249,7 @@ mod tests {
|
||||
assert_eq!(remove.await.unwrap(), ());
|
||||
|
||||
let put = store.put(block.clone());
|
||||
assert_eq!(put.await.unwrap(), cid.to_owned());
|
||||
assert_eq!(put.await.unwrap().0, cid.to_owned());
|
||||
let contains = store.contains(&cid);
|
||||
assert_eq!(contains.await.unwrap(), true);
|
||||
let get = store.get(&cid);
|
||||
|
@ -1,6 +1,6 @@
|
||||
//! Volatile memory backed repo
|
||||
use crate::error::Error;
|
||||
use crate::repo::{BlockStore, Column, DataStore};
|
||||
use crate::repo::{BlockStore, BlockPut, Column, DataStore};
|
||||
use async_std::path::PathBuf;
|
||||
use async_std::sync::{Arc, Mutex};
|
||||
use async_trait::async_trait;
|
||||
@ -44,10 +44,17 @@ impl BlockStore for MemBlockStore {
|
||||
Ok(block)
|
||||
}
|
||||
|
||||
async fn put(&self, block: Block) -> Result<Cid, Error> {
|
||||
let cid = block.cid().to_owned();
|
||||
self.blocks.lock().await.insert(cid.clone(), block);
|
||||
Ok(cid)
|
||||
async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
|
||||
use std::collections::hash_map::Entry;
|
||||
let mut g = self.blocks.lock().await;
|
||||
match g.entry(block.cid.clone()) {
|
||||
Entry::Occupied(_) => Ok((block.cid, BlockPut::Existed)),
|
||||
Entry::Vacant(ve) => {
|
||||
let cid = ve.key().clone();
|
||||
ve.insert(block);
|
||||
Ok((cid, BlockPut::NewBlock))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn remove(&self, cid: &Cid) -> Result<(), Error> {
|
||||
@ -143,7 +150,7 @@ mod tests {
|
||||
assert_eq!(remove.await.unwrap(), ());
|
||||
|
||||
let put = store.put(block.clone());
|
||||
assert_eq!(put.await.unwrap(), cid.to_owned());
|
||||
assert_eq!(put.await.unwrap().0, cid.to_owned());
|
||||
let contains = store.contains(&cid);
|
||||
assert_eq!(contains.await.unwrap(), true);
|
||||
let get = store.get(&cid);
|
||||
|
@ -44,6 +44,16 @@ pub fn create_repo<TRepoTypes: RepoTypes>(
|
||||
(Arc::new(repo), ch)
|
||||
}
|
||||
|
||||
/// Describes the outcome of `BlockStore::put_block`
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum BlockPut {
|
||||
/// A new block was written
|
||||
NewBlock,
|
||||
/// The block existed already
|
||||
Existed
|
||||
}
|
||||
|
||||
/// This API is being discussed and evolved, which will likely lead to breakage.
|
||||
#[async_trait]
|
||||
pub trait BlockStore: Debug + Clone + Send + Sync + Unpin + 'static {
|
||||
fn new(path: PathBuf) -> Self;
|
||||
@ -51,7 +61,7 @@ pub trait BlockStore: Debug + Clone + Send + Sync + Unpin + 'static {
|
||||
async fn open(&self) -> Result<(), Error>;
|
||||
async fn contains(&self, cid: &Cid) -> Result<bool, Error>;
|
||||
async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error>;
|
||||
async fn put(&self, block: Block) -> Result<Cid, Error>;
|
||||
async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error>;
|
||||
async fn remove(&self, cid: &Cid) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
@ -136,8 +146,8 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
|
||||
}
|
||||
|
||||
/// Puts a block into the block store.
|
||||
pub async fn put_block(&self, block: Block) -> Result<Cid, Error> {
|
||||
let cid = self.block_store.put(block.clone()).await?;
|
||||
pub async fn put_block(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
|
||||
let (cid, res) = self.block_store.put(block.clone()).await?;
|
||||
self.subscriptions
|
||||
.lock()
|
||||
.await
|
||||
@ -149,7 +159,7 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
|
||||
.send(RepoEvent::ProvideBlock(cid.clone()))
|
||||
.await
|
||||
.ok();
|
||||
Ok(cid)
|
||||
Ok((cid, res))
|
||||
}
|
||||
|
||||
/// Retrives a block from the block store.
|
||||
@ -267,9 +277,13 @@ impl<T: RepoTypes> bitswap::BitswapStore for Repo<T> {
|
||||
self.block_store.get(cid).await
|
||||
}
|
||||
|
||||
async fn put_block(&self, block: Block) -> Result<(), anyhow::Error> {
|
||||
self.put_block(block).await?;
|
||||
Ok(())
|
||||
async fn put_block(&self, block: Block) -> Result<bitswap::BlockPut, anyhow::Error> {
|
||||
let (_, res) = self.put_block(block).await?;
|
||||
let res = match res {
|
||||
BlockPut::NewBlock => bitswap::BlockPut::Stored,
|
||||
BlockPut::Existed => bitswap::BlockPut::Duplicate,
|
||||
};
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user