From 5f4c6726b512b3c496cb78d5d87df2b1c320f7f4 Mon Sep 17 00:00:00 2001 From: David Craven Date: Mon, 4 Feb 2019 04:32:26 +0100 Subject: [PATCH] p2p: Implement IPFS behaviour. --- src/bitswap/behaviour.rs | 131 ++++++++++++++++++++---- src/bitswap/ledger.rs | 214 ++++++++------------------------------- src/bitswap/mod.rs | 68 +------------ src/bitswap/protocol.rs | 9 +- src/bitswap/strategy.rs | 11 +- src/lib.rs | 17 ++-- src/p2p/behaviour.rs | 121 ++++++++++++++++++---- src/p2p/mod.rs | 4 +- 8 files changed, 287 insertions(+), 288 deletions(-) diff --git a/src/bitswap/behaviour.rs b/src/bitswap/behaviour.rs index 9ad4d9ea..9b2abbc5 100644 --- a/src/bitswap/behaviour.rs +++ b/src/bitswap/behaviour.rs @@ -5,15 +5,16 @@ //! //! The `Bitswap` struct implements the `NetworkBehaviour` trait. When used, it //! will allow providing and reciving IPFS blocks. -use crate::bitswap::ledger::{BitswapEvent, Ledger, Message}; +use crate::bitswap::ledger::{BitswapEvent, Ledger, Message, Priority, I, O}; use crate::bitswap::protocol::BitswapConfig; +use crate::block::{Block, Cid}; use futures::prelude::*; use libp2p::core::swarm::{ ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, }; use libp2p::core::protocols_handler::{OneShotHandler, ProtocolsHandler}; use libp2p::{Multiaddr, PeerId}; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::marker::PhantomData; use tokio::prelude::*; @@ -22,9 +23,11 @@ pub struct Bitswap { /// Marker to pin the generics. marker: PhantomData, /// Queue of events to report to the user. - events: VecDeque>, + events: VecDeque, BitswapEvent>>, /// Ledger - ledger: Ledger, + peers: HashMap, + /// Wanted blocks + wanted_blocks: HashMap } impl Bitswap { @@ -33,9 +36,61 @@ impl Bitswap { Bitswap { marker: PhantomData, events: VecDeque::new(), - ledger: Ledger::new(), + peers: HashMap::new(), + wanted_blocks: HashMap::new(), } } + + /// Connect to peer. + /// + /// Called from Kademlia behaviour. + pub fn connect(&mut self, peer_id: PeerId) { + self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id }); + } + + /// Sends a block to the peer. + /// + /// Called from a Strategy. + pub fn send_block(&mut self, peer_id: PeerId, block: Block) { + let ledger = self.peers.get_mut(&peer_id).expect("Peer not in ledger?!"); + let message = ledger.send_block(block); + self.events.push_back(NetworkBehaviourAction::SendEvent { + peer_id, + event: message, + }); + } + + /// Queues the wanted block for all peers. + /// + /// A user request + pub fn want_block(&mut self, cid: Cid, priority: u8) { + for (peer_id, ledger) in self.peers.iter_mut() { + let message = ledger.want_block(&cid, priority); + self.events.push_back(NetworkBehaviourAction::SendEvent { + peer_id: peer_id.to_owned(), + event: message, + }); + } + self.wanted_blocks.insert(cid, priority); + } + + /// Removes the block from our want list and updates all peers. + /// + /// Can be either a user request or be called when the block + /// was received. + pub fn cancel_block(&mut self, cid: &Cid) { + for (peer_id, ledger) in self.peers.iter_mut() { + ledger.cancel_block(cid); + let message = ledger.cancel_block(cid); + if message.is_some() { + self.events.push_back(NetworkBehaviourAction::SendEvent { + peer_id: peer_id.to_owned(), + event: message.unwrap(), + }); + } + } + self.wanted_blocks.remove(cid); + } } impl Default for Bitswap { @@ -49,7 +104,7 @@ impl NetworkBehaviour for Bitswap where TSubstream: AsyncRead + AsyncWrite, { - type ProtocolsHandler = OneShotHandler; + type ProtocolsHandler = OneShotHandler, InnerMessage>; type OutEvent = BitswapEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { @@ -61,11 +116,20 @@ where } fn inject_connected(&mut self, peer_id: PeerId, _: ConnectedPoint) { - self.ledger.peer_connected(peer_id); + let ledger = Ledger::new(); + let mut message = Message::new(); + for (cid, priority) in &self.wanted_blocks { + message.want_block(cid, *priority); + } + self.events.push_back(NetworkBehaviourAction::SendEvent { + peer_id: peer_id.clone(), + event: message, + }); + self.peers.insert(peer_id, ledger); } fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) { - self.ledger.peer_disconnected(peer_id); + self.peers.remove(peer_id); } fn inject_node_event( @@ -73,15 +137,39 @@ where source: PeerId, event: InnerMessage, ) { - // We ignore successful send events. let message = match event { - InnerMessage::Rx(message) => message, - InnerMessage::Sent => return, + InnerMessage::Rx(message) => { + message + }, + InnerMessage::Tx => { + return; + }, }; - for event in self.ledger.receive_message(&source, message) { - self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + // Update the ledger. + let ledger = self.peers.get_mut(&source).expect("Peer not in ledger?!"); + ledger.update_incoming_stats(&message); + + // Process incoming messages. + for block in message.blocks() { + // Cancel the block. + self.cancel_block(&block.cid()); + // Add block to received blocks + self.events.push_back(NetworkBehaviourAction::GenerateEvent( + BitswapEvent::Block { + block: block.to_owned(), + })); } + for (cid, priority) in message.want() { + self.events.push_back(NetworkBehaviourAction::GenerateEvent( + BitswapEvent::Want { + peer_id: source.clone(), + cid: cid.to_owned(), + priority: *priority, + })); + } + // TODO: Remove cancelled `Want` events from the queue. + // TODO: Remove cancelled blocks from `SendEvent`. } fn poll( @@ -89,7 +177,13 @@ where _: &mut PollParameters, ) -> Async::InEvent, Self::OutEvent>> { + // TODO concat messages to same destination to reduce traffic. if let Some(event) = self.events.pop_front() { + if let NetworkBehaviourAction::SendEvent { peer_id, event } = &event { + let ledger = self.peers.get_mut(&peer_id) + .expect("Peer not in ledger?!"); + ledger.update_outgoing_stats(&event); + } return Async::Ready(event); } @@ -98,16 +192,17 @@ where } /// Transmission between the `OneShotHandler` and the `BitswapHandler`. +#[derive(Debug)] pub enum InnerMessage { /// We received a `Message` from a remote. - Rx(Message), + Rx(Message), /// We successfully sent a `Message`. - Sent, + Tx, } -impl From for InnerMessage { +impl From> for InnerMessage { #[inline] - fn from(message: Message) -> InnerMessage { + fn from(message: Message) -> InnerMessage { InnerMessage::Rx(message) } } @@ -115,6 +210,6 @@ impl From for InnerMessage { impl From<()> for InnerMessage { #[inline] fn from(_: ()) -> InnerMessage { - InnerMessage::Sent + InnerMessage::Tx } } diff --git a/src/bitswap/ledger.rs b/src/bitswap/ledger.rs index 76e3a51f..23a9aecb 100644 --- a/src/bitswap/ledger.rs +++ b/src/bitswap/ledger.rs @@ -3,125 +3,13 @@ use crate::bitswap::protobuf_structs::bitswap as proto; use libp2p::PeerId; use protobuf::{ProtobufError, Message as ProtobufMessage}; use std::collections::HashMap; +use std::marker::PhantomData; pub type Priority = u8; -/// The Ledger contains all the state of all transactions. +/// The Ledger contains the history of transactions with a peer. #[derive(Debug)] pub struct Ledger { - peers: HashMap, - wanted_blocks: HashMap, -} - -impl Ledger { - /// Creates a new `Ledger`. - pub fn new() -> Ledger { - Ledger { - peers: HashMap::new(), - wanted_blocks: HashMap::new(), - } - } - - /// Returns the `PeerLedger` for `PeerId`. - #[allow(unused)] - pub fn peer_ledger(&self, peer_id: &PeerId) -> &PeerLedger { - self.peers.get(peer_id).expect("Peer not in ledger?!") - } - - /// Creates a new ledger entry for the peer and sends our want list. - pub fn peer_connected(&mut self, peer_id: PeerId) { - // TODO: load stats from previous interactions - let mut ledger = PeerLedger::new(); - for (cid, priority) in &self.wanted_blocks { - ledger.want_block(cid, *priority); - } - self.peers.insert(peer_id, ledger); - } - - /// Removes the ledger of a disconnected peer. - pub fn peer_disconnected(&mut self, peer_id: &PeerId) { - // TODO: persist stats for future interactions - self.peers.remove(peer_id); - } - - /// Queues the block to be sent to the peer. - pub fn send_block(&mut self, peer_id: &PeerId, block: Block) { - let ledger = self.peers.get_mut(peer_id).expect("Peer not in ledger?!"); - ledger.add_block(block); - } - - /// Adds the block to our want list and updates all peers. - pub fn want_block(&mut self, cid: Cid, priority: u8) { - for (_peer_id, ledger) in self.peers.iter_mut() { - ledger.want_block(&cid, priority); - } - self.wanted_blocks.insert(cid, priority); - } - - /// Removes the block from our want list and updates all peers. - pub fn cancel_block(&mut self, cid: &Cid) { - for (_peer_id, ledger) in self.peers.iter_mut() { - ledger.cancel_block(cid); - } - self.wanted_blocks.remove(cid); - } - - /// Parses and processes the message. - /// - /// If a block was received it adds it to the received blocks queue and - /// cancels the request for the block. - /// If a want was received it adds it to the received wants queue for - /// processing through a `Strategy`. - /// If a want was cancelled it removes it from the received wants queue. - pub fn receive_message(&mut self, peer_id: &PeerId, message: Message) -> Vec { - let ledger = self.peers.get_mut(peer_id).expect("Peer not in ledger?!"); - ledger.receive_message(&message); - for cid in message.cancel() { - // If a previous block was queued but has not been sent yet, remove it - // from the queue. - ledger.remove_block(cid); - // TODO: If the bitswap strategy has not processed the request yet, remove - // it from the queue. - //self.received_wants.drain_filter(|(peer_id2, cid2, _)| { - // peer_id == peer_id2 && cid == cid2 - //}); - } - // Queue new requests - let mut events = Vec::new(); - for block in message.blocks() { - // Add block to received blocks - events.push(BitswapEvent::Block { - block: block.to_owned(), - }); - // Cancel the block. - self.cancel_block(&block.cid()); - } - for (cid, priority) in message.want() { - events.push(BitswapEvent::Want { - peer_id: peer_id.to_owned(), - cid: cid.to_owned(), - priority: *priority, - }); - } - events - } - - /// Sends all queued messages. - pub fn send_messages(&mut self) -> Vec<(PeerId, Vec)>{ - let mut messages = Vec::new(); - for (peer_id, ledger) in self.peers.iter_mut() { - let message = ledger.send_message(); - if message.is_some() { - messages.push((peer_id.to_owned(), message.unwrap())); - } - } - messages - } -} - -/// The LedgerEntry contains all the state of all transactions with a peer. -#[derive(Debug)] -pub struct PeerLedger { /// The number of blocks sent to the peer. sent_blocks: usize, /// The number of blocks received from the peer. @@ -130,67 +18,42 @@ pub struct PeerLedger { sent_want_list: HashMap, /// The list of wanted blocks received from the peer. received_want_list: HashMap, - /// The next message to send to the peer. - queued_message: Option, } -impl PeerLedger { +impl Ledger { /// Creates a new `PeerLedger`. - fn new() -> Self { - PeerLedger { + pub fn new() -> Self { + Ledger { sent_blocks: 0, received_blocks: 0, sent_want_list: HashMap::new(), received_want_list: HashMap::new(), - queued_message: None, } } - /// Adds a block to the queued message. - fn add_block(&mut self, block: Block) { - if self.queued_message.is_none() { - self.queued_message = Some(Message::new()); - } - self.queued_message.as_mut().unwrap().add_block(block); + pub fn send_block(&mut self, block: Block) -> Message { + let mut message = Message::new(); + message.add_block(block); + message } - /// Removes a block from the queued message. - fn remove_block(&mut self, cid: &Cid) { - if self.queued_message.is_none() { - self.queued_message = Some(Message::new()); - } - self.queued_message.as_mut().unwrap().remove_block(cid); + pub fn want_block(&mut self, cid: &Cid, priority: Priority) -> Message { + let mut message = Message::new(); + message.want_block(cid, priority); + message } - /// Adds a block to the want list. - fn want_block(&mut self, cid: &Cid, priority: Priority) { - if self.queued_message.is_none() { - self.queued_message = Some(Message::new()); - } - self.queued_message.as_mut().unwrap().want_block(cid, priority); - } - - /// Removes the block from the want list. - fn cancel_block(&mut self, cid: &Cid) { - if self.queued_message.is_some() { - self.queued_message.as_mut().unwrap().soft_cancel_block(cid); - } + pub fn cancel_block(&mut self, cid: &Cid) -> Option> { if self.sent_want_list.contains_key(cid) { - if self.queued_message.is_none() { - self.queued_message = Some(Message::new()); - } - self.queued_message.as_mut().unwrap().cancel_block(cid); + let mut message = Message::new(); + message.cancel_block(cid); + Some(message) + } else { + None } } - /// Finalizes the message and returns it's contents. - /// - /// Updates the number of sent blocks and the sent want list entries. - fn send_message(&mut self) -> Option> { - if self.queued_message.is_none() { - return None; - } - let message = self.queued_message.take().unwrap(); + pub fn update_outgoing_stats(&mut self, message: &Message) { self.sent_blocks += message.blocks.len(); for cid in message.cancel() { self.sent_want_list.remove(cid); @@ -198,13 +61,9 @@ impl PeerLedger { for (cid, priority) in message.want() { self.sent_want_list.insert(cid.to_owned(), *priority); } - Some(message.into_bytes()) } - /// Parses a message. - /// - /// Updates the number of received blocks and the received want list entries. - fn receive_message(&mut self, message: &Message) { + pub fn update_incoming_stats(&mut self, message: &Message) { self.received_blocks += message.blocks.len(); for cid in message.cancel() { self.received_want_list.remove(cid); @@ -227,9 +86,16 @@ impl PeerLedger { } } +#[derive(Debug, Clone, PartialEq)] +pub struct I; +#[derive(Debug, Clone, PartialEq)] +pub struct O; + /// A bitswap message. #[derive(Debug, Clone, PartialEq)] -pub struct Message { +pub struct Message { + /// Message tag + _phantom_data: PhantomData, /// List of wanted blocks. want: HashMap, /// List of blocks to cancel. @@ -240,10 +106,11 @@ pub struct Message { blocks: Vec, } -impl Message { +impl Message { /// Creates a new bitswap message. pub fn new() -> Self { Message { + _phantom_data: PhantomData, want: HashMap::new(), cancel: Vec::new(), full: false, @@ -272,6 +139,7 @@ impl Message { } /// Removes the block from the message. + #[allow(unused)] pub fn remove_block(&mut self, cid: &Cid) { self.blocks.drain_filter(|block| &block.cid() == cid); } @@ -287,28 +155,31 @@ impl Message { } /// Removes the block from the want list. - pub fn soft_cancel_block(&mut self, cid: &Cid) { + #[allow(unused)] + pub fn remove_want_block(&mut self, cid: &Cid) { self.want.remove(cid); } +} +impl Message { /// Turns this `Message` into a message that can be sent to a substream. - pub fn into_bytes(self) -> Vec { + pub fn into_bytes(&self) -> Vec { let mut proto = proto::Message::new(); let mut wantlist = proto::Message_Wantlist::new(); - for (cid, priority) in self.want { + for (cid, priority) in self.want() { let mut entry = proto::Message_Wantlist_Entry::new(); entry.set_block(cid.to_bytes()); - entry.set_priority(priority as _); + entry.set_priority(*priority as _); wantlist.mut_entries().push(entry); } - for cid in self.cancel { + for cid in self.cancel() { let mut entry = proto::Message_Wantlist_Entry::new(); entry.set_block(cid.to_bytes()); entry.set_cancel(true); wantlist.mut_entries().push(entry); } proto.set_wantlist(wantlist); - for block in self.blocks { + for block in self.blocks() { let mut payload = proto::Message_Block::new(); payload.set_prefix(block.cid().prefix().as_bytes()); payload.set_data(block.data().to_vec()); @@ -319,6 +190,9 @@ impl Message { .expect("there is no situation in which the protobuf message can be invalid") } +} + +impl Message { /// Creates a `Message` from bytes that were received from a substream. pub fn from_bytes(bytes: &Vec) -> Result { let proto: proto::Message = protobuf::parse_from_bytes(bytes)?; diff --git a/src/bitswap/mod.rs b/src/bitswap/mod.rs index 7fb20118..2b1300f6 100644 --- a/src/bitswap/mod.rs +++ b/src/bitswap/mod.rs @@ -1,70 +1,10 @@ //! Bitswap protocol implementation -use crate::block::Cid; -use crate::p2p::{create_swarm, SecioKeyPair, Swarm}; -use futures::prelude::*; -use parity_multihash::Multihash; -use std::io::Error; - -mod behaviour; -mod ledger; +pub mod behaviour; +pub mod ledger; mod protobuf_structs; pub mod strategy; mod protocol; -use self::ledger::Ledger; +pub use self::behaviour::Bitswap; +pub use self::ledger::{BitswapEvent, Priority}; pub use self::strategy::{AltruisticStrategy, Strategy}; - -pub struct Bitswap { - swarm: Swarm, - ledger: Ledger, - _strategy: S, -} - -impl Bitswap { - pub fn new(local_private_key: SecioKeyPair, strategy: S) -> Self { - Bitswap { - swarm: create_swarm(local_private_key), - ledger: Ledger::new(), - _strategy: strategy, - } - } - - pub fn want_block(&mut self, cid: Cid) { - let hash = Multihash::from_bytes(cid.hash.clone()).unwrap(); - self.swarm.get_providers(hash); - - self.ledger.want_block(cid, 1); - } - - pub fn provide_block(&mut self, cid: &Cid) { - let hash = Multihash::from_bytes(cid.hash.clone()).unwrap(); - self.swarm.add_providing(hash); - } - - pub fn stop_providing_block(&mut self, cid: &Cid) { - let hash = Multihash::from_bytes(cid.hash.clone()).unwrap(); - self.swarm.remove_providing(&hash); - } -} - -impl Stream for Bitswap { - type Item = (); - type Error = Error; - - // TODO: hookup ledger and strategy properly - fn poll(&mut self) -> Poll, Self::Error> { - println!("polling bitswap"); - self.ledger.send_messages(); - loop { - match self.swarm.poll().expect("Error while polling swarm") { - Async::Ready(Some(event)) => { - println!("Result: {:#?}", event); - return Ok(Async::Ready(Some(()))); - }, - Async::Ready(None) | Async::NotReady => break, - } - } - - Ok(Async::NotReady) - } -} diff --git a/src/bitswap/protocol.rs b/src/bitswap/protocol.rs index 6c93d351..64475b58 100644 --- a/src/bitswap/protocol.rs +++ b/src/bitswap/protocol.rs @@ -7,7 +7,7 @@ use libp2p::core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade}; use protobuf::ProtobufError; use std::{io, iter}; use tokio::prelude::*; -use crate::bitswap::ledger::Message; +use crate::bitswap::ledger::{Message, I, O}; #[derive(Clone, Debug, Default)] pub struct BitswapConfig {} @@ -34,7 +34,7 @@ impl InboundUpgrade for BitswapConfig where TSocket: AsyncRead + AsyncWrite, { - type Output = Message; + type Output = Message; type Error = BitswapError; type Future = upgrade::ReadOneThen) -> Result>; @@ -85,7 +85,8 @@ impl std::error::Error for BitswapError { } } } -impl UpgradeInfo for Message { + +impl UpgradeInfo for Message { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -95,7 +96,7 @@ impl UpgradeInfo for Message { } } -impl OutboundUpgrade for Message +impl OutboundUpgrade for Message where TSocket: AsyncRead + AsyncWrite, { diff --git a/src/bitswap/strategy.rs b/src/bitswap/strategy.rs index e6b6f207..3ce4f702 100644 --- a/src/bitswap/strategy.rs +++ b/src/bitswap/strategy.rs @@ -1,11 +1,12 @@ use crate::block::Cid; +use crate::bitswap::Priority; use crate::repo::Repo; use libp2p::PeerId; -use crate::bitswap::ledger::{Ledger, Priority}; +use crate::p2p::Swarm; pub trait Strategy { fn new(repo: Repo) -> Self; - fn receive_want(&mut self, ledger: &mut Ledger, peer_id: &PeerId, cid: Cid, priority: Priority); + fn receive_want(&mut self, swarm: &mut Swarm, source: PeerId, cid: Cid, priority: Priority); } pub struct AltruisticStrategy { @@ -21,14 +22,14 @@ impl Strategy for AltruisticStrategy { fn receive_want( &mut self, - ledger: &mut Ledger, - peer_id: &PeerId, + swarm: &mut Swarm, + source: PeerId, cid: Cid, _priority: Priority, ) { let block = self.repo.get(&cid); if block.is_some() { - ledger.send_block(peer_id, block.unwrap()); + swarm.send_block(source, block.unwrap()); } } } diff --git a/src/lib.rs b/src/lib.rs index bf4f571b..f6af2c66 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,16 +10,18 @@ mod future; mod p2p; mod repo; -use self::bitswap::{Bitswap, strategy::AltruisticStrategy, Strategy}; +use self::bitswap::{strategy::AltruisticStrategy, Strategy}; pub use self::block::{Block, Cid}; use self::future::BlockFuture; +use self::p2p::{create_swarm, Swarm}; use self::repo::Repo; /// Ipfs struct creates a new IPFS node and is the main entry point /// for interacting with IPFS. pub struct Ipfs { repo: Repo, - bitswap: Bitswap, + strategy: AltruisticStrategy, + swarm: Swarm, } impl Ipfs { @@ -28,25 +30,26 @@ impl Ipfs { let repo = Repo::new(); let local_key = SecioKeyPair::ed25519_generated().unwrap(); let strategy = AltruisticStrategy::new(repo.clone()); - let bitswap = Bitswap::new(local_key, strategy); + let swarm = create_swarm(local_key); Ipfs { repo, - bitswap, + strategy, + swarm, } } /// Puts a block into the ipfs repo. pub fn put_block(&mut self, block: Block) -> Cid { let cid = self.repo.put(block); - self.bitswap.provide_block(&cid); + self.swarm.provide_block(&cid); cid } /// Retrives a block from the ipfs repo. pub fn get_block(&mut self, cid: Cid) -> BlockFuture { if !self.repo.contains(&cid) { - self.bitswap.want_block(cid.clone()); + self.swarm.want_block(cid.clone()); } BlockFuture::new(self.repo.clone(), cid) } @@ -54,7 +57,7 @@ impl Ipfs { /// Remove block from the ipfs repo. pub fn remove_block(&mut self, cid: Cid) { self.repo.remove(&cid); - self.bitswap.stop_providing_block(&cid); + self.swarm.stop_providing_block(&cid); } } diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 5be589c8..11b1b9b8 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -1,7 +1,12 @@ -use libp2p::PeerId; +use crate::bitswap::{Bitswap, BitswapEvent}; +use crate::block::{Block, Cid}; +use libp2p::{NetworkBehaviour, PeerId}; +use libp2p::core::swarm::NetworkBehaviourEventProcess; use libp2p::core::muxing::{StreamMuxerBox, SubstreamRef}; -use libp2p::kad::Kademlia; +use libp2p::kad::{Kademlia, KademliaOut as KademliaEvent}; +use parity_multihash::Multihash; use std::sync::Arc; +use tokio::prelude::*; /// IPFS bootstrap nodes. const BOOTSTRAP_NODES: &[(&'static str, &'static str)] = &[ @@ -44,23 +49,105 @@ const BOOTSTRAP_NODES: &[(&'static str, &'static str)] = &[ ]; /// Behaviour type. -pub type TBehaviour = Kademlia>>; +#[derive(NetworkBehaviour)] +pub struct Behaviour { + kademlia: Kademlia, + bitswap: Bitswap, +} -/// Create a Kademlia behaviour with the IPFS bootstrap nodes. -pub fn build_behaviour(local_peer_id: PeerId) -> TBehaviour { - // Note that normally the Kademlia process starts by performing lots of - // request in order to insert our local node in the DHT. However here we use - // `without_init` because this example is very ephemeral and we don't want - // to pollute the DHT. In a real world application, you want to use `new` - // instead. - let mut behaviour = Kademlia::without_init(local_peer_id); +impl + NetworkBehaviourEventProcess for + Behaviour +{ + fn inject_event(&mut self, event: KademliaEvent) { + match event { + KademliaEvent::Discovered { .. } => { - for (identity, location) in BOOTSTRAP_NODES { - behaviour.add_address( - &identity.parse().unwrap(), - location.parse().unwrap(), - ); + } + KademliaEvent::FindNodeResult { .. } => { + + } + KademliaEvent::GetProvidersResult { + provider_peers, + .. + } => { + for peer in provider_peers { + self.bitswap.connect(peer); + } + } + } + } +} + +impl + NetworkBehaviourEventProcess for + Behaviour +{ + fn inject_event(&mut self, event: BitswapEvent) { + match event { + BitswapEvent::Block { block } => { + println!("Received block with contents: '{:?}'", + String::from_utf8_lossy(&block.data())); + } + BitswapEvent::Want { peer_id, cid, priority } => { + println!("Peer {:?} wants block {:?} with priority {}", + peer_id, cid.to_string(), priority); + } + } + } +} + +impl Behaviour +{ + /// Create a Kademlia behaviour with the IPFS bootstrap nodes. + pub fn new(local_peer_id: PeerId) -> Self { + // Note that normally the Kademlia process starts by performing lots of + // request in order to insert our local node in the DHT. However here we use + // `without_init` because this example is very ephemeral and we don't want + // to pollute the DHT. In a real world application, you want to use `new` + // instead. + let mut kademlia = Kademlia::without_init(local_peer_id); + + for (identity, location) in BOOTSTRAP_NODES { + kademlia.add_address( + &identity.parse().unwrap(), + location.parse().unwrap(), + ); + } + + let bitswap = Bitswap::new(); + + Behaviour { + kademlia, + bitswap, + } } - behaviour + pub fn send_block(&mut self, peer_id: PeerId, block: Block) { + self.bitswap.send_block(peer_id, block); + } + + pub fn want_block(&mut self, cid: Cid) { + let hash = Multihash::from_bytes(cid.hash.clone()).unwrap(); + self.kademlia.get_providers(hash); + self.bitswap.want_block(cid, 1); + } + + pub fn provide_block(&mut self, cid: &Cid) { + let hash = Multihash::from_bytes(cid.hash.clone()).unwrap(); + self.kademlia.add_providing(hash); + } + + pub fn stop_providing_block(&mut self, cid: &Cid) { + let hash = Multihash::from_bytes(cid.hash.clone()).unwrap(); + self.kademlia.remove_providing(&hash); + } +} + +/// Behaviour type. +pub type TBehaviour = Behaviour>>; + +/// Create a IPFS behaviour with the IPFS bootstrap nodes. +pub fn build_behaviour(local_peer_id: PeerId) -> TBehaviour { + Behaviour::new(local_peer_id) } diff --git a/src/p2p/mod.rs b/src/p2p/mod.rs index 2e6adcbd..f2532c78 100644 --- a/src/p2p/mod.rs +++ b/src/p2p/mod.rs @@ -7,9 +7,7 @@ mod transport; pub type Swarm = libp2p::core::Swarm; /// Creates a new IPFS swarm. -pub fn create_swarm( - local_private_key: SecioKeyPair, -) -> Swarm { +pub fn create_swarm(local_private_key: SecioKeyPair) -> Swarm { let local_peer_id = local_private_key.to_peer_id(); // Set up an encrypted TCP transport over the Mplex protocol.