263: move bitswap Stats directly under Bitswap r=koivunej a=ljedrz

This makes the bitswap `Stats` persistent between peer disconnects.

In addition, remove the unused and no longer compatible `Ledger` tests.

Co-authored-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
bors[bot] 2020-07-30 14:04:16 +00:00 committed by GitHub
commit 02beb5f46b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 68 additions and 185 deletions

View File

@ -6,7 +6,7 @@
//! The `Bitswap` struct implements the `NetworkBehaviour` trait. When used, it
//! will allow providing and reciving IPFS blocks.
use crate::block::Block;
use crate::ledger::{Ledger, Message, Priority, Stats};
use crate::ledger::{Ledger, Message, Priority};
use crate::protocol::BitswapConfig;
use cid::Cid;
use fnv::FnvHashSet;
@ -20,6 +20,10 @@ use std::task::{Context, Poll};
use std::{
collections::{HashMap, VecDeque},
mem,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
#[derive(Clone, Debug, Eq, PartialEq)]
@ -29,6 +33,55 @@ pub enum BitswapEvent {
ReceivedCancel(PeerId, Cid),
}
#[derive(Debug, Default)]
pub struct Stats {
pub sent_blocks: AtomicU64,
pub sent_data: AtomicU64,
pub received_blocks: AtomicU64,
pub received_data: AtomicU64,
pub duplicate_blocks: AtomicU64,
pub duplicate_data: AtomicU64,
}
impl Stats {
pub fn update_outgoing(&self, num_blocks: u64) {
self.sent_blocks.fetch_add(num_blocks, Ordering::Relaxed);
}
pub fn update_incoming_unique(&self, bytes: u64) {
self.received_blocks.fetch_add(1, Ordering::Relaxed);
self.received_data.fetch_add(bytes, Ordering::Relaxed);
}
pub fn update_incoming_duplicate(&self, bytes: u64) {
self.duplicate_blocks.fetch_add(1, Ordering::Relaxed);
self.duplicate_data.fetch_add(bytes, Ordering::Relaxed);
}
pub fn add_assign(&self, other: &Stats) {
self.sent_blocks
.fetch_add(other.sent_blocks.load(Ordering::Relaxed), Ordering::Relaxed);
self.sent_data
.fetch_add(other.sent_data.load(Ordering::Relaxed), Ordering::Relaxed);
self.received_blocks.fetch_add(
other.received_blocks.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.received_data.fetch_add(
other.received_data.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.duplicate_blocks.fetch_add(
other.duplicate_blocks.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.duplicate_data.fetch_add(
other.duplicate_data.load(Ordering::Relaxed),
Ordering::Relaxed,
);
}
}
/// Network behaviour that handles sending and receiving IPFS blocks.
pub struct Bitswap {
/// Queue of events to report to the user.
@ -42,6 +95,8 @@ pub struct Bitswap {
/// Blocks queued to be sent
pub queued_blocks: UnboundedSender<(PeerId, Block)>,
ready_blocks: UnboundedReceiver<(PeerId, Block)>,
/// Statistics related to peers.
pub stats: HashMap<PeerId, Arc<Stats>>,
}
impl Default for Bitswap {
@ -55,6 +110,7 @@ impl Default for Bitswap {
wanted_blocks: Default::default(),
queued_blocks: tx,
ready_blocks: rx,
stats: Default::default(),
}
}
}
@ -74,11 +130,10 @@ impl Bitswap {
}
pub fn stats(&self) -> Stats {
// we currently do not remove ledgers so this is ... good enough
self.connected_peers
self.stats
.values()
.fold(Stats::default(), |acc, peer_ledger| {
acc.add_assign(&peer_ledger.stats);
.fold(Stats::default(), |acc, peer_stats| {
acc.add_assign(&peer_stats);
acc
})
}
@ -169,6 +224,7 @@ impl NetworkBehaviour for Bitswap {
fn inject_connected(&mut self, peer_id: &PeerId) {
debug!("bitswap: inject_connected {}", peer_id);
let ledger = Ledger::new();
self.stats.entry(peer_id.clone()).or_default();
self.connected_peers.insert(peer_id.clone(), ledger);
self.send_want_list(peer_id.clone());
}
@ -236,6 +292,10 @@ impl NetworkBehaviour for Bitswap {
for (peer_id, ledger) in &mut self.connected_peers {
if let Some(message) = ledger.send() {
if let Some(peer_stats) = self.stats.get_mut(peer_id) {
peer_stats.update_outgoing(message.blocks.len() as u64);
}
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::Any,

View File

@ -8,63 +8,10 @@ use prost::Message as ProstMessage;
use std::{
collections::{HashMap, HashSet},
mem,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
pub type Priority = i32;
#[derive(Debug, Default)]
pub struct Stats {
pub sent_blocks: AtomicU64,
pub sent_data: AtomicU64,
pub received_blocks: AtomicU64,
pub received_data: AtomicU64,
pub duplicate_blocks: AtomicU64,
pub duplicate_data: AtomicU64,
}
impl Stats {
pub fn update_outgoing(&self, num_blocks: u64) {
self.sent_blocks.fetch_add(num_blocks, Ordering::Relaxed);
}
pub fn update_incoming_unique(&self, bytes: u64) {
self.received_blocks.fetch_add(1, Ordering::Relaxed);
self.received_data.fetch_add(bytes, Ordering::Relaxed);
}
pub fn update_incoming_duplicate(&self, bytes: u64) {
self.duplicate_blocks.fetch_add(1, Ordering::Relaxed);
self.duplicate_data.fetch_add(bytes, Ordering::Relaxed);
}
pub fn add_assign(&self, other: &Stats) {
self.sent_blocks
.fetch_add(other.sent_blocks.load(Ordering::Relaxed), Ordering::Relaxed);
self.sent_data
.fetch_add(other.sent_data.load(Ordering::Relaxed), Ordering::Relaxed);
self.received_blocks.fetch_add(
other.received_blocks.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.received_data.fetch_add(
other.received_data.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.duplicate_blocks.fetch_add(
other.duplicate_blocks.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.duplicate_data.fetch_add(
other.duplicate_data.load(Ordering::Relaxed),
Ordering::Relaxed,
);
}
}
/// The Ledger contains the history of transactions with a peer.
#[derive(Debug, Default)]
pub struct Ledger {
@ -74,8 +21,6 @@ pub struct Ledger {
pub(crate) received_want_list: HashMap<Cid, Priority>,
/// Queued message.
message: Message,
/// Statistics related to a given peer.
pub stats: Arc<Stats>,
}
impl Ledger {
@ -116,8 +61,6 @@ impl Ledger {
self.sent_want_list.insert(cid.clone(), *priority);
}
self.stats.update_outgoing(self.message.blocks.len() as u64);
Some(mem::take(&mut self.message))
}
}
@ -295,122 +238,3 @@ impl std::fmt::Debug for Message {
Ok(())
}
}
#[cfg(test)]
mod tests {
/*
use super::*;
#[test]
fn test_empty_message_to_from_bytes() {
let message = Message::new();
let bytes = message.clone().into_bytes();
let new_message = Message::from_bytes(&bytes).unwrap();
assert_eq!(message, new_message);
}
#[test]
fn test_want_message_to_from_bytes() {
let mut message = Message::new();
let block = Block::from("hello world");
message.want_block(&block.cid(), 1);
let bytes = message.clone().into_bytes();
let new_message = Message::from_bytes(&bytes).unwrap();
assert_eq!(message, new_message);
}
#[test]
fn test_cancel_message_to_from_bytes() {
let mut message = Message::new();
let block = Block::from("hello world");
message.cancel_block(&block.cid());
let bytes = message.clone().into_bytes();
let new_message = Message::from_bytes(&bytes).unwrap();
assert_eq!(message, new_message);
}
#[test]
fn test_payload_message_to_from_bytes() {
let mut message = Message::new();
let block = Block::from("hello world");
message.add_block(block);
let bytes = message.clone().into_bytes();
let new_message = Message::from_bytes(&bytes).unwrap();
assert_eq!(message, new_message);
}
#[test]
fn test_ledger_send_block() {
let block_1 = Block::from("1");
let block_2 = Block::from("2");
let mut ledger = Ledger::new();
ledger.add_block(block_1);
ledger.add_block(block_2);
ledger.send_message().unwrap();
assert_eq!(ledger.sent_blocks, 2);
}
#[test]
fn test_ledger_remove_block() {
let block_1 = Block::from("1");
let block_2 = Block::from("2");
let mut ledger = Ledger::new();
ledger.add_block(block_1.clone());
ledger.add_block(block_2);
ledger.remove_block(&block_1.cid());
ledger.send_message().unwrap();
assert_eq!(ledger.sent_blocks, 1);
}
#[test]
fn test_ledger_send_want() {
let block_1 = Block::from("1");
let block_2 = Block::from("2");
let mut ledger = Ledger::new();
ledger.want_block(&block_1.cid(), 1);
ledger.want_block(&block_2.cid(), 1);
ledger.cancel_block(&block_1.cid());
ledger.send_message().unwrap();
let mut want_list = HashMap::new();
want_list.insert(block_2.cid(), 1);
assert_eq!(ledger.sent_want_list, want_list);
}
#[test]
fn test_ledger_send_cancel() {
let block_1 = Block::from("1");
let block_2 = Block::from("2");
let mut ledger = Ledger::new();
ledger.want_block(&block_1.cid(), 1);
ledger.want_block(&block_2.cid(), 1);
ledger.send_message().unwrap();
ledger.cancel_block(&block_1.cid());
ledger.send_message().unwrap();
let mut want_list = HashMap::new();
want_list.insert(block_2.cid(), 1);
assert_eq!(ledger.sent_want_list, want_list);
}
#[test]
fn test_ledger_receive() {
let block_1 = Block::from("1");
let block_2 = Block::from("2");
let mut message = Message::new();
message.add_block(block_1);
message.want_block(&block_2.cid(), 1);
let mut ledger = Ledger::new();
ledger.receive_message(&message);
assert_eq!(ledger.received_blocks, 1);
let mut want_list = HashMap::new();
want_list.insert(block_2.cid(), 1);
assert_eq!(ledger.received_want_list, want_list);
let mut message = Message::new();
message.cancel_block(&block_2.cid());
ledger.receive_message(&message);
assert_eq!(ledger.received_want_list, HashMap::new());
}
*/
}

View File

@ -9,10 +9,10 @@ mod ledger;
mod prefix;
mod protocol;
pub use self::behaviour::{Bitswap, BitswapEvent};
pub use self::behaviour::{Bitswap, BitswapEvent, Stats};
pub use self::block::Block;
pub use self::error::BitswapError;
pub use self::ledger::{Priority, Stats};
pub use self::ledger::Priority;
mod bitswap_pb {
include!(concat!(env!("OUT_DIR"), "/bitswap_pb.rs"));

View File

@ -236,8 +236,7 @@ impl<Types: IpfsTypes> NetworkBehaviourEventProcess<BitswapEvent> for Behaviour<
match event {
BitswapEvent::ReceivedBlock(peer_id, block) => {
let ipfs = self.ipfs.clone();
let peer_stats =
Arc::clone(&self.bitswap.connected_peers.get(&peer_id).unwrap().stats);
let peer_stats = Arc::clone(&self.bitswap.stats.get(&peer_id).unwrap());
task::spawn(async move {
let bytes = block.data().len() as u64;
let res = ipfs.repo.put_block(block.clone()).await;