bitswap: Implement network behaviour.

This commit is contained in:
David Craven 2019-02-04 00:13:29 +01:00
parent 58764dffc9
commit 65e7318a71
No known key found for this signature in database
GPG Key ID: DF438712EA50DBB1
3 changed files with 87 additions and 48 deletions

View File

@ -5,12 +5,15 @@
//!
//! The `Bitswap` struct implements the `NetworkBehaviour` trait. When used, it
//! will allow providing and reciving IPFS blocks.
use crate::bitswap::ledger::{BitswapEvent, Ledger, Message};
use crate::bitswap::protocol::BitswapConfig;
use futures::prelude::*;
use libp2p::core::swarm::{
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p::core::protocols_handler::ProtocolsHandler;
use libp2p::core::protocols_handler::{OneShotHandler, ProtocolsHandler};
use libp2p::{Multiaddr, PeerId};
use std::collections::VecDeque;
use std::marker::PhantomData;
use tokio::prelude::*;
@ -19,12 +22,9 @@ pub struct Bitswap<TSubstream> {
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
/// Queue of events to report to the user.
events: Vec<BitswapEvent>,
}
/// Event generated by the `Bitswap` behaviour.
pub enum BitswapEvent {
events: VecDeque<NetworkBehaviourAction<Message, BitswapEvent>>,
/// Ledger
ledger: Ledger,
}
impl<TSubstream> Bitswap<TSubstream> {
@ -32,7 +32,8 @@ impl<TSubstream> Bitswap<TSubstream> {
pub fn new() -> Self {
Bitswap {
marker: PhantomData,
events: Vec::new(),
events: VecDeque::new(),
ledger: Ledger::new(),
}
}
}
@ -48,26 +49,39 @@ impl<TSubstream> NetworkBehaviour for Bitswap<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type ProtocolsHandler = ();
type ProtocolsHandler = OneShotHandler<TSubstream, BitswapConfig, Message, InnerMessage>;
type OutEvent = BitswapEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
()
Default::default()
}
fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
fn inject_connected(&mut self, peer_id: PeerId, _: ConnectedPoint) {
self.ledger.peer_connected(peer_id);
}
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) {
self.ledger.peer_disconnected(peer_id);
}
fn inject_node_event(
&mut self,
source: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
event: InnerMessage,
) {
// We ignore successful send events.
let message = match event {
InnerMessage::Rx(message) => message,
InnerMessage::Sent => return,
};
for event in self.ledger.receive_message(&source, message) {
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
}
fn poll(
@ -75,11 +89,32 @@ where
_: &mut PollParameters,
) -> Async<NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>> {
if !self.events.is_empty() {
let event = self.events.remove(0);
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
if let Some(event) = self.events.pop_front() {
return Async::Ready(event);
}
Async::NotReady
}
}
/// Transmission between the `OneShotHandler` and the `BitswapHandler`.
pub enum InnerMessage {
/// We received a `Message` from a remote.
Rx(Message),
/// We successfully sent a `Message`.
Sent,
}
impl From<Message> for InnerMessage {
#[inline]
fn from(message: Message) -> InnerMessage {
InnerMessage::Rx(message)
}
}
impl From<()> for InnerMessage {
#[inline]
fn from(_: ()) -> InnerMessage {
InnerMessage::Sent
}
}

View File

@ -10,8 +10,6 @@ pub type Priority = u8;
#[derive(Debug)]
pub struct Ledger {
peers: HashMap<PeerId, PeerLedger>,
received_blocks: Vec<Block>,
received_wants: Vec<(PeerId, Cid, Priority)>,
wanted_blocks: HashMap<Cid, Priority>,
}
@ -21,8 +19,6 @@ impl Ledger {
Ledger {
peers: HashMap::new(),
wanted_blocks: HashMap::new(),
received_blocks: Vec::new(),
received_wants: Vec::new(),
}
}
@ -77,35 +73,37 @@ impl Ledger {
/// If a want was received it adds it to the received wants queue for
/// processing through a `Strategy`.
/// If a want was cancelled it removes it from the received wants queue.
pub fn receive_message(&mut self, peer_id: &PeerId, bytes: Vec<u8>) {
pub fn receive_message(&mut self, peer_id: &PeerId, message: Message) -> Vec<BitswapEvent> {
let ledger = self.peers.get_mut(peer_id).expect("Peer not in ledger?!");
let message = match ledger.receive_message(bytes) {
Ok(message) => message,
Err(err) => {
println!("{}", err);
return;
}
};
ledger.receive_message(&message);
for cid in message.cancel() {
// If a previous block was queued but has not been sent yet, remove it
// from the queue.
ledger.remove_block(cid);
// If the bitswap strategy has not processed the request yet, remove
// TODO: If the bitswap strategy has not processed the request yet, remove
// it from the queue.
self.received_wants.drain_filter(|(peer_id2, cid2, _)| {
peer_id == peer_id2 && cid == cid2
});
//self.received_wants.drain_filter(|(peer_id2, cid2, _)| {
// peer_id == peer_id2 && cid == cid2
//});
}
// Queue new requests
for (cid, priority) in message.want() {
self.received_wants.push((peer_id.to_owned(), cid.to_owned(), *priority));
}
let mut events = Vec::new();
for block in message.blocks() {
// Add block to received blocks
self.received_blocks.push(block.to_owned());
events.push(BitswapEvent::Block {
block: block.to_owned(),
});
// Cancel the block.
self.cancel_block(&block.cid());
}
for (cid, priority) in message.want() {
events.push(BitswapEvent::Want {
peer_id: peer_id.to_owned(),
cid: cid.to_owned(),
priority: *priority,
});
}
events
}
/// Sends all queued messages.
@ -206,8 +204,7 @@ impl PeerLedger {
/// Parses a message.
///
/// Updates the number of received blocks and the received want list entries.
fn receive_message(&mut self, bytes: Vec<u8>) -> Result<Message, ProtobufError> {
let message = Message::from_bytes(&bytes)?;
fn receive_message(&mut self, message: &Message) {
self.received_blocks += message.blocks.len();
for cid in message.cancel() {
self.received_want_list.remove(cid);
@ -215,7 +212,6 @@ impl PeerLedger {
for (cid, priority) in message.want() {
self.received_want_list.insert(cid.to_owned(), *priority);
}
Ok(message)
}
/// Gets the number of sent blocks.
@ -345,6 +341,20 @@ impl Message {
}
}
/// Event generated by the `Bitswap` behaviour.
pub enum BitswapEvent {
/// A block was received.
Block {
block: Block,
},
/// An action needs to be taken by the bitswap strategy.
Want {
peer_id: PeerId,
cid: Cid,
priority: Priority,
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -446,10 +456,9 @@ mod tests {
let mut message = Message::new();
message.add_block(block_1);
message.want_block(&block_2.cid(), 1);
let bytes = message.into_bytes();
let mut ledger = PeerLedger::new();
ledger.receive_message(bytes).unwrap();
ledger.receive_message(&message);
assert_eq!(ledger.received_blocks, 1);
let mut want_list = HashMap::new();
@ -458,8 +467,7 @@ mod tests {
let mut message = Message::new();
message.cancel_block(&block_2.cid());
let bytes = message.into_bytes();
ledger.receive_message(bytes).unwrap();
ledger.receive_message(&message);
assert_eq!(ledger.received_want_list, HashMap::new());
}
}

View File

@ -5,7 +5,7 @@ use futures::prelude::*;
use parity_multihash::Multihash;
use std::io::Error;
//mod behaviour;
mod behaviour;
mod ledger;
mod protobuf_structs;
pub mod strategy;
@ -54,11 +54,7 @@ impl<S: Strategy> Stream for Bitswap<S> {
// TODO: hookup ledger and strategy properly
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
println!("polling bitswap");
let peer_id = Swarm::local_peer_id(&self.swarm);
self.ledger.peer_connected(peer_id.clone());
self.ledger.peer_disconnected(&peer_id);
self.ledger.send_messages();
self.ledger.receive_message(peer_id, Vec::new());
loop {
match self.swarm.poll().expect("Error while polling swarm") {
Async::Ready(Some(event)) => {