diff --git a/src/bitswap/behaviour.rs b/src/bitswap/behaviour.rs index a14c3e3b..9ad4d9ea 100644 --- a/src/bitswap/behaviour.rs +++ b/src/bitswap/behaviour.rs @@ -5,12 +5,15 @@ //! //! The `Bitswap` struct implements the `NetworkBehaviour` trait. When used, it //! will allow providing and reciving IPFS blocks. +use crate::bitswap::ledger::{BitswapEvent, Ledger, Message}; +use crate::bitswap::protocol::BitswapConfig; use futures::prelude::*; use libp2p::core::swarm::{ ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; -use libp2p::core::protocols_handler::ProtocolsHandler; +use libp2p::core::protocols_handler::{OneShotHandler, ProtocolsHandler}; use libp2p::{Multiaddr, PeerId}; +use std::collections::VecDeque; use std::marker::PhantomData; use tokio::prelude::*; @@ -19,12 +22,9 @@ pub struct Bitswap { /// Marker to pin the generics. marker: PhantomData, /// Queue of events to report to the user. - events: Vec, -} - -/// Event generated by the `Bitswap` behaviour. -pub enum BitswapEvent { - + events: VecDeque>, + /// Ledger + ledger: Ledger, } impl Bitswap { @@ -32,7 +32,8 @@ impl Bitswap { pub fn new() -> Self { Bitswap { marker: PhantomData, - events: Vec::new(), + events: VecDeque::new(), + ledger: Ledger::new(), } } } @@ -48,26 +49,39 @@ impl NetworkBehaviour for Bitswap where TSubstream: AsyncRead + AsyncWrite, { - type ProtocolsHandler = (); + type ProtocolsHandler = OneShotHandler; type OutEvent = BitswapEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { - () + Default::default() } fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec { Vec::new() } - fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} + fn inject_connected(&mut self, peer_id: PeerId, _: ConnectedPoint) { + self.ledger.peer_connected(peer_id); + } - fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) { + self.ledger.peer_disconnected(peer_id); + } fn inject_node_event( &mut self, source: PeerId, - event: ::OutEvent, + event: InnerMessage, ) { + // We ignore successful send events. + let message = match event { + InnerMessage::Rx(message) => message, + InnerMessage::Sent => return, + }; + + for event in self.ledger.receive_message(&source, message) { + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + } } fn poll( @@ -75,11 +89,32 @@ where _: &mut PollParameters, ) -> Async::InEvent, Self::OutEvent>> { - if !self.events.is_empty() { - let event = self.events.remove(0); - return Async::Ready(NetworkBehaviourAction::GenerateEvent(event)); + if let Some(event) = self.events.pop_front() { + return Async::Ready(event); } Async::NotReady } } + +/// Transmission between the `OneShotHandler` and the `BitswapHandler`. +pub enum InnerMessage { + /// We received a `Message` from a remote. + Rx(Message), + /// We successfully sent a `Message`. + Sent, +} + +impl From for InnerMessage { + #[inline] + fn from(message: Message) -> InnerMessage { + InnerMessage::Rx(message) + } +} + +impl From<()> for InnerMessage { + #[inline] + fn from(_: ()) -> InnerMessage { + InnerMessage::Sent + } +} diff --git a/src/bitswap/ledger.rs b/src/bitswap/ledger.rs index 2f735308..76e3a51f 100644 --- a/src/bitswap/ledger.rs +++ b/src/bitswap/ledger.rs @@ -10,8 +10,6 @@ pub type Priority = u8; #[derive(Debug)] pub struct Ledger { peers: HashMap, - received_blocks: Vec, - received_wants: Vec<(PeerId, Cid, Priority)>, wanted_blocks: HashMap, } @@ -21,8 +19,6 @@ impl Ledger { Ledger { peers: HashMap::new(), wanted_blocks: HashMap::new(), - received_blocks: Vec::new(), - received_wants: Vec::new(), } } @@ -77,35 +73,37 @@ impl Ledger { /// If a want was received it adds it to the received wants queue for /// processing through a `Strategy`. /// If a want was cancelled it removes it from the received wants queue. - pub fn receive_message(&mut self, peer_id: &PeerId, bytes: Vec) { + pub fn receive_message(&mut self, peer_id: &PeerId, message: Message) -> Vec { let ledger = self.peers.get_mut(peer_id).expect("Peer not in ledger?!"); - let message = match ledger.receive_message(bytes) { - Ok(message) => message, - Err(err) => { - println!("{}", err); - return; - } - }; + ledger.receive_message(&message); for cid in message.cancel() { // If a previous block was queued but has not been sent yet, remove it // from the queue. ledger.remove_block(cid); - // If the bitswap strategy has not processed the request yet, remove + // TODO: If the bitswap strategy has not processed the request yet, remove // it from the queue. - self.received_wants.drain_filter(|(peer_id2, cid2, _)| { - peer_id == peer_id2 && cid == cid2 - }); + //self.received_wants.drain_filter(|(peer_id2, cid2, _)| { + // peer_id == peer_id2 && cid == cid2 + //}); } // Queue new requests - for (cid, priority) in message.want() { - self.received_wants.push((peer_id.to_owned(), cid.to_owned(), *priority)); - } + let mut events = Vec::new(); for block in message.blocks() { // Add block to received blocks - self.received_blocks.push(block.to_owned()); + events.push(BitswapEvent::Block { + block: block.to_owned(), + }); // Cancel the block. self.cancel_block(&block.cid()); } + for (cid, priority) in message.want() { + events.push(BitswapEvent::Want { + peer_id: peer_id.to_owned(), + cid: cid.to_owned(), + priority: *priority, + }); + } + events } /// Sends all queued messages. @@ -206,8 +204,7 @@ impl PeerLedger { /// Parses a message. /// /// Updates the number of received blocks and the received want list entries. - fn receive_message(&mut self, bytes: Vec) -> Result { - let message = Message::from_bytes(&bytes)?; + fn receive_message(&mut self, message: &Message) { self.received_blocks += message.blocks.len(); for cid in message.cancel() { self.received_want_list.remove(cid); @@ -215,7 +212,6 @@ impl PeerLedger { for (cid, priority) in message.want() { self.received_want_list.insert(cid.to_owned(), *priority); } - Ok(message) } /// Gets the number of sent blocks. @@ -345,6 +341,20 @@ impl Message { } } +/// Event generated by the `Bitswap` behaviour. +pub enum BitswapEvent { + /// A block was received. + Block { + block: Block, + }, + /// An action needs to be taken by the bitswap strategy. + Want { + peer_id: PeerId, + cid: Cid, + priority: Priority, + } +} + #[cfg(test)] mod tests { use super::*; @@ -446,10 +456,9 @@ mod tests { let mut message = Message::new(); message.add_block(block_1); message.want_block(&block_2.cid(), 1); - let bytes = message.into_bytes(); let mut ledger = PeerLedger::new(); - ledger.receive_message(bytes).unwrap(); + ledger.receive_message(&message); assert_eq!(ledger.received_blocks, 1); let mut want_list = HashMap::new(); @@ -458,8 +467,7 @@ mod tests { let mut message = Message::new(); message.cancel_block(&block_2.cid()); - let bytes = message.into_bytes(); - ledger.receive_message(bytes).unwrap(); + ledger.receive_message(&message); assert_eq!(ledger.received_want_list, HashMap::new()); } } diff --git a/src/bitswap/mod.rs b/src/bitswap/mod.rs index 563e44b2..7fb20118 100644 --- a/src/bitswap/mod.rs +++ b/src/bitswap/mod.rs @@ -5,7 +5,7 @@ use futures::prelude::*; use parity_multihash::Multihash; use std::io::Error; -//mod behaviour; +mod behaviour; mod ledger; mod protobuf_structs; pub mod strategy; @@ -54,11 +54,7 @@ impl Stream for Bitswap { // TODO: hookup ledger and strategy properly fn poll(&mut self) -> Poll, Self::Error> { println!("polling bitswap"); - let peer_id = Swarm::local_peer_id(&self.swarm); - self.ledger.peer_connected(peer_id.clone()); - self.ledger.peer_disconnected(&peer_id); self.ledger.send_messages(); - self.ledger.receive_message(peer_id, Vec::new()); loop { match self.swarm.poll().expect("Error while polling swarm") { Async::Ready(Some(event)) => {