Improve logging output.

This commit is contained in:
David Craven 2019-02-13 22:31:58 +01:00
parent 61862e9568
commit 3b9a9ed846
No known key found for this signature in database
GPG Key ID: DF438712EA50DBB1
3 changed files with 70 additions and 27 deletions

View File

@ -9,6 +9,7 @@ use crate::bitswap::ledger::{Ledger, Message, Priority, I, O};
use crate::bitswap::protocol::BitswapConfig;
use crate::bitswap::strategy::{Strategy, StrategyEvent};
use crate::block::{Block, Cid};
use fnv::FnvHashSet;
use futures::prelude::*;
use libp2p::core::swarm::{
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
@ -25,8 +26,10 @@ pub struct Bitswap<TSubstream, TStrategy: Strategy> {
marker: PhantomData<TSubstream>,
/// Queue of events to report to the user.
events: VecDeque<NetworkBehaviourAction<Message<O>, ()>>,
/// List of peers to send messages to.
target_peers: FnvHashSet<PeerId>,
/// Ledger
peers: HashMap<PeerId, Ledger>,
connected_peers: HashMap<PeerId, Ledger>,
/// Wanted blocks
wanted_blocks: HashMap<Cid, Priority>,
/// Strategy
@ -36,10 +39,12 @@ pub struct Bitswap<TSubstream, TStrategy: Strategy> {
impl<TSubstream, TStrategy: Strategy> Bitswap<TSubstream, TStrategy> {
/// Creates a `Bitswap`.
pub fn new(strategy: TStrategy) -> Self {
println!("bitswap: new");
Bitswap {
marker: PhantomData,
events: VecDeque::new(),
peers: HashMap::new(),
target_peers: FnvHashSet::default(),
connected_peers: HashMap::new(),
wanted_blocks: HashMap::new(),
strategy,
}
@ -49,33 +54,47 @@ impl<TSubstream, TStrategy: Strategy> Bitswap<TSubstream, TStrategy> {
///
/// Called from Kademlia behaviour.
pub fn connect(&mut self, peer_id: PeerId) {
self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id });
println!("bitswap: connect");
if self.wanted_blocks.len() > 0 {
if self.target_peers.insert(peer_id.clone()) {
println!(" queuing dial_peer to {}", peer_id.to_base58());
self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id });
}
}
println!("");
}
/// Sends a block to the peer.
///
/// Called from a Strategy.
pub fn send_block(&mut self, peer_id: PeerId, block: Block) {
let ledger = self.peers.get_mut(&peer_id).expect("Peer not in ledger?!");
println!("bitswap: send_block");
let ledger = self.connected_peers.get_mut(&peer_id)
.expect("Peer not in ledger?!");
let message = ledger.send_block(block);
println!(" queuing block for {}", peer_id.to_base58());
self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id,
event: message,
});
println!("");
}
/// Queues the wanted block for all peers.
///
/// A user request
pub fn want_block(&mut self, cid: Cid, priority: Priority) {
for (peer_id, ledger) in self.peers.iter_mut() {
println!("bitswap: want_block");
for (peer_id, ledger) in self.connected_peers.iter_mut() {
let message = ledger.want_block(&cid, priority);
println!(" queuing want for {}", peer_id.to_base58());
self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.to_owned(),
event: message,
});
}
self.wanted_blocks.insert(cid, priority);
println!("");
}
/// Removes the block from our want list and updates all peers.
@ -83,10 +102,11 @@ impl<TSubstream, TStrategy: Strategy> Bitswap<TSubstream, TStrategy> {
/// Can be either a user request or be called when the block
/// was received.
pub fn cancel_block(&mut self, cid: &Cid) {
for (peer_id, ledger) in self.peers.iter_mut() {
ledger.cancel_block(cid);
println!("bitswap: cancel_block");
for (peer_id, ledger) in self.connected_peers.iter_mut() {
let message = ledger.cancel_block(cid);
if message.is_some() {
println!(" queuing cancel for {}", peer_id.to_base58());
self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.to_owned(),
event: message.unwrap(),
@ -94,6 +114,7 @@ impl<TSubstream, TStrategy: Strategy> Bitswap<TSubstream, TStrategy> {
}
}
self.wanted_blocks.remove(cid);
println!("");
}
}
@ -105,30 +126,41 @@ where
type OutEvent = ();
fn new_handler(&mut self) -> Self::ProtocolsHandler {
println!("bitswap: new_handler");
Default::default()
}
fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
println!("bitswap: addresses_of_peer");
Vec::new()
}
fn inject_connected(&mut self, peer_id: PeerId, _: ConnectedPoint) {
fn inject_connected(&mut self, peer_id: PeerId, cp: ConnectedPoint) {
println!("bitswap: inject_connected");
println!(" peer_id: {}", peer_id.to_base58());
println!(" connected_point: {:?}", cp);
let ledger = Ledger::new();
if !self.wanted_blocks.is_empty() {
let mut message = Message::new();
for (cid, priority) in &self.wanted_blocks {
message.want_block(cid, *priority);
}
println!(" queuing wanted blocks");
self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: message,
});
}
self.peers.insert(peer_id, ledger);
self.connected_peers.insert(peer_id, ledger);
println!("");
}
fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) {
self.peers.remove(peer_id);
fn inject_disconnected(&mut self, peer_id: &PeerId, cp: ConnectedPoint) {
println!("bitswap: inject_disconnected {:?}", cp);
println!(" peer_id: {}", peer_id.to_base58());
println!(" connected_point: {:?}", cp);
println!("");
//self.connected_peers.remove(peer_id);
}
fn inject_node_event(
@ -136,6 +168,8 @@ where
source: PeerId,
event: InnerMessage,
) {
println!("bitswap: inject_node_event");
println!("{:?}", event);
let message = match event {
InnerMessage::Rx(message) => {
message
@ -144,9 +178,11 @@ where
return;
},
};
println!(" received message");
// Update the ledger.
let ledger = self.peers.get_mut(&source).expect("Peer not in ledger?!");
let ledger = self.connected_peers.get_mut(&source)
.expect("Peer not in ledger?!");
ledger.update_incoming_stats(&message);
// Process incoming messages.
@ -160,6 +196,7 @@ where
}
// TODO: Remove cancelled `Want` events from the queue.
// TODO: Remove cancelled blocks from `SendEvent`.
println!("");
}
fn poll(
@ -167,25 +204,29 @@ where
_: &mut PollParameters,
) -> Async<NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>> {
println!("bitswap: poll");
// TODO concat messages to same destination to reduce traffic.
if let Some(event) = self.events.pop_front() {
if let NetworkBehaviourAction::SendEvent { peer_id, event } = &event {
let ledger = self.connected_peers.get_mut(&peer_id)
.expect("Peer not in ledger?!");
ledger.update_outgoing_stats(&event);
println!(" send_message to {}", peer_id.to_base58());
}
println!("{:?}", event);
println!("");
return Async::Ready(event);
}
match self.strategy.poll() {
Some(StrategyEvent::Send { peer_id, block }) => {
self.send_block(peer_id, block);
task::current().notify();
}
None => {}
}
// TODO concat messages to same destination to reduce traffic.
if let Some(event) = self.events.pop_front() {
if let NetworkBehaviourAction::SendEvent { peer_id, event } = &event {
let ledger = self.peers.get_mut(&peer_id)
.expect("Peer not in ledger?!");
ledger.update_outgoing_stats(&event);
println!("Sending bitswap message to {}", peer_id.to_base58());
println!("{:?}", event);
}
return Async::Ready(event);
}
println!("");
Async::NotReady
}
}

View File

@ -211,7 +211,7 @@ impl<T> std::fmt::Debug for Message<T> {
writeln!(fmt, "cancel: {}", cid.to_string())?;
}
for block in self.blocks() {
writeln!(fmt, "block: {:?}", block)?;
writeln!(fmt, "block: {}", block.cid().to_string())?;
}
Ok(())
}

View File

@ -31,7 +31,8 @@ where
type Future = upgrade::ReadOneThen<TSocket, fn(Vec<u8>) -> Result<Self::Output, Self::Error>>;
#[inline]
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
fn upgrade_inbound(self, socket: TSocket, info: Self::Info) -> Self::Future {
println!("upgrade_inbound: {}", std::str::from_utf8(info).unwrap());
upgrade::read_one_then(socket, 2048, |packet| {
Ok(Message::from_bytes(&packet)?)
})
@ -97,7 +98,8 @@ where
type Future = upgrade::WriteOne<TSocket>;
#[inline]
fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
fn upgrade_outbound(self, socket: TSocket, info: Self::Info) -> Self::Future {
println!("upgrade_outbound: {}", std::str::from_utf8(info).unwrap());
let bytes = self.into_bytes();
upgrade::write_one(socket, bytes)
}