p2p: Implement IPFS behaviour.
This commit is contained in:
parent
65e7318a71
commit
5f4c6726b5
@ -5,15 +5,16 @@
|
||||
//!
|
||||
//! The `Bitswap` struct implements the `NetworkBehaviour` trait. When used, it
|
||||
//! will allow providing and reciving IPFS blocks.
|
||||
use crate::bitswap::ledger::{BitswapEvent, Ledger, Message};
|
||||
use crate::bitswap::ledger::{BitswapEvent, Ledger, Message, Priority, I, O};
|
||||
use crate::bitswap::protocol::BitswapConfig;
|
||||
use crate::block::{Block, Cid};
|
||||
use futures::prelude::*;
|
||||
use libp2p::core::swarm::{
|
||||
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
|
||||
};
|
||||
use libp2p::core::protocols_handler::{OneShotHandler, ProtocolsHandler};
|
||||
use libp2p::{Multiaddr, PeerId};
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::marker::PhantomData;
|
||||
use tokio::prelude::*;
|
||||
|
||||
@ -22,9 +23,11 @@ pub struct Bitswap<TSubstream> {
|
||||
/// Marker to pin the generics.
|
||||
marker: PhantomData<TSubstream>,
|
||||
/// Queue of events to report to the user.
|
||||
events: VecDeque<NetworkBehaviourAction<Message, BitswapEvent>>,
|
||||
events: VecDeque<NetworkBehaviourAction<Message<O>, BitswapEvent>>,
|
||||
/// Ledger
|
||||
ledger: Ledger,
|
||||
peers: HashMap<PeerId, Ledger>,
|
||||
/// Wanted blocks
|
||||
wanted_blocks: HashMap<Cid, Priority>
|
||||
}
|
||||
|
||||
impl<TSubstream> Bitswap<TSubstream> {
|
||||
@ -33,9 +36,61 @@ impl<TSubstream> Bitswap<TSubstream> {
|
||||
Bitswap {
|
||||
marker: PhantomData,
|
||||
events: VecDeque::new(),
|
||||
ledger: Ledger::new(),
|
||||
peers: HashMap::new(),
|
||||
wanted_blocks: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Connect to peer.
|
||||
///
|
||||
/// Called from Kademlia behaviour.
|
||||
pub fn connect(&mut self, peer_id: PeerId) {
|
||||
self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id });
|
||||
}
|
||||
|
||||
/// Sends a block to the peer.
|
||||
///
|
||||
/// Called from a Strategy.
|
||||
pub fn send_block(&mut self, peer_id: PeerId, block: Block) {
|
||||
let ledger = self.peers.get_mut(&peer_id).expect("Peer not in ledger?!");
|
||||
let message = ledger.send_block(block);
|
||||
self.events.push_back(NetworkBehaviourAction::SendEvent {
|
||||
peer_id,
|
||||
event: message,
|
||||
});
|
||||
}
|
||||
|
||||
/// Queues the wanted block for all peers.
|
||||
///
|
||||
/// A user request
|
||||
pub fn want_block(&mut self, cid: Cid, priority: u8) {
|
||||
for (peer_id, ledger) in self.peers.iter_mut() {
|
||||
let message = ledger.want_block(&cid, priority);
|
||||
self.events.push_back(NetworkBehaviourAction::SendEvent {
|
||||
peer_id: peer_id.to_owned(),
|
||||
event: message,
|
||||
});
|
||||
}
|
||||
self.wanted_blocks.insert(cid, priority);
|
||||
}
|
||||
|
||||
/// Removes the block from our want list and updates all peers.
|
||||
///
|
||||
/// Can be either a user request or be called when the block
|
||||
/// was received.
|
||||
pub fn cancel_block(&mut self, cid: &Cid) {
|
||||
for (peer_id, ledger) in self.peers.iter_mut() {
|
||||
ledger.cancel_block(cid);
|
||||
let message = ledger.cancel_block(cid);
|
||||
if message.is_some() {
|
||||
self.events.push_back(NetworkBehaviourAction::SendEvent {
|
||||
peer_id: peer_id.to_owned(),
|
||||
event: message.unwrap(),
|
||||
});
|
||||
}
|
||||
}
|
||||
self.wanted_blocks.remove(cid);
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream> Default for Bitswap<TSubstream> {
|
||||
@ -49,7 +104,7 @@ impl<TSubstream> NetworkBehaviour for Bitswap<TSubstream>
|
||||
where
|
||||
TSubstream: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type ProtocolsHandler = OneShotHandler<TSubstream, BitswapConfig, Message, InnerMessage>;
|
||||
type ProtocolsHandler = OneShotHandler<TSubstream, BitswapConfig, Message<O>, InnerMessage>;
|
||||
type OutEvent = BitswapEvent;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
@ -61,11 +116,20 @@ where
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, peer_id: PeerId, _: ConnectedPoint) {
|
||||
self.ledger.peer_connected(peer_id);
|
||||
let ledger = Ledger::new();
|
||||
let mut message = Message::new();
|
||||
for (cid, priority) in &self.wanted_blocks {
|
||||
message.want_block(cid, *priority);
|
||||
}
|
||||
self.events.push_back(NetworkBehaviourAction::SendEvent {
|
||||
peer_id: peer_id.clone(),
|
||||
event: message,
|
||||
});
|
||||
self.peers.insert(peer_id, ledger);
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) {
|
||||
self.ledger.peer_disconnected(peer_id);
|
||||
self.peers.remove(peer_id);
|
||||
}
|
||||
|
||||
fn inject_node_event(
|
||||
@ -73,15 +137,39 @@ where
|
||||
source: PeerId,
|
||||
event: InnerMessage,
|
||||
) {
|
||||
// We ignore successful send events.
|
||||
let message = match event {
|
||||
InnerMessage::Rx(message) => message,
|
||||
InnerMessage::Sent => return,
|
||||
InnerMessage::Rx(message) => {
|
||||
message
|
||||
},
|
||||
InnerMessage::Tx => {
|
||||
return;
|
||||
},
|
||||
};
|
||||
|
||||
for event in self.ledger.receive_message(&source, message) {
|
||||
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
|
||||
// Update the ledger.
|
||||
let ledger = self.peers.get_mut(&source).expect("Peer not in ledger?!");
|
||||
ledger.update_incoming_stats(&message);
|
||||
|
||||
// Process incoming messages.
|
||||
for block in message.blocks() {
|
||||
// Cancel the block.
|
||||
self.cancel_block(&block.cid());
|
||||
// Add block to received blocks
|
||||
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||
BitswapEvent::Block {
|
||||
block: block.to_owned(),
|
||||
}));
|
||||
}
|
||||
for (cid, priority) in message.want() {
|
||||
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||
BitswapEvent::Want {
|
||||
peer_id: source.clone(),
|
||||
cid: cid.to_owned(),
|
||||
priority: *priority,
|
||||
}));
|
||||
}
|
||||
// TODO: Remove cancelled `Want` events from the queue.
|
||||
// TODO: Remove cancelled blocks from `SendEvent`.
|
||||
}
|
||||
|
||||
fn poll(
|
||||
@ -89,7 +177,13 @@ where
|
||||
_: &mut PollParameters,
|
||||
) -> Async<NetworkBehaviourAction<
|
||||
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>> {
|
||||
// TODO concat messages to same destination to reduce traffic.
|
||||
if let Some(event) = self.events.pop_front() {
|
||||
if let NetworkBehaviourAction::SendEvent { peer_id, event } = &event {
|
||||
let ledger = self.peers.get_mut(&peer_id)
|
||||
.expect("Peer not in ledger?!");
|
||||
ledger.update_outgoing_stats(&event);
|
||||
}
|
||||
return Async::Ready(event);
|
||||
}
|
||||
|
||||
@ -98,16 +192,17 @@ where
|
||||
}
|
||||
|
||||
/// Transmission between the `OneShotHandler` and the `BitswapHandler`.
|
||||
#[derive(Debug)]
|
||||
pub enum InnerMessage {
|
||||
/// We received a `Message` from a remote.
|
||||
Rx(Message),
|
||||
Rx(Message<I>),
|
||||
/// We successfully sent a `Message`.
|
||||
Sent,
|
||||
Tx,
|
||||
}
|
||||
|
||||
impl From<Message> for InnerMessage {
|
||||
impl From<Message<I>> for InnerMessage {
|
||||
#[inline]
|
||||
fn from(message: Message) -> InnerMessage {
|
||||
fn from(message: Message<I>) -> InnerMessage {
|
||||
InnerMessage::Rx(message)
|
||||
}
|
||||
}
|
||||
@ -115,6 +210,6 @@ impl From<Message> for InnerMessage {
|
||||
impl From<()> for InnerMessage {
|
||||
#[inline]
|
||||
fn from(_: ()) -> InnerMessage {
|
||||
InnerMessage::Sent
|
||||
InnerMessage::Tx
|
||||
}
|
||||
}
|
||||
|
@ -3,125 +3,13 @@ use crate::bitswap::protobuf_structs::bitswap as proto;
|
||||
use libp2p::PeerId;
|
||||
use protobuf::{ProtobufError, Message as ProtobufMessage};
|
||||
use std::collections::HashMap;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
pub type Priority = u8;
|
||||
|
||||
/// The Ledger contains all the state of all transactions.
|
||||
/// The Ledger contains the history of transactions with a peer.
|
||||
#[derive(Debug)]
|
||||
pub struct Ledger {
|
||||
peers: HashMap<PeerId, PeerLedger>,
|
||||
wanted_blocks: HashMap<Cid, Priority>,
|
||||
}
|
||||
|
||||
impl Ledger {
|
||||
/// Creates a new `Ledger`.
|
||||
pub fn new() -> Ledger {
|
||||
Ledger {
|
||||
peers: HashMap::new(),
|
||||
wanted_blocks: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the `PeerLedger` for `PeerId`.
|
||||
#[allow(unused)]
|
||||
pub fn peer_ledger(&self, peer_id: &PeerId) -> &PeerLedger {
|
||||
self.peers.get(peer_id).expect("Peer not in ledger?!")
|
||||
}
|
||||
|
||||
/// Creates a new ledger entry for the peer and sends our want list.
|
||||
pub fn peer_connected(&mut self, peer_id: PeerId) {
|
||||
// TODO: load stats from previous interactions
|
||||
let mut ledger = PeerLedger::new();
|
||||
for (cid, priority) in &self.wanted_blocks {
|
||||
ledger.want_block(cid, *priority);
|
||||
}
|
||||
self.peers.insert(peer_id, ledger);
|
||||
}
|
||||
|
||||
/// Removes the ledger of a disconnected peer.
|
||||
pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
|
||||
// TODO: persist stats for future interactions
|
||||
self.peers.remove(peer_id);
|
||||
}
|
||||
|
||||
/// Queues the block to be sent to the peer.
|
||||
pub fn send_block(&mut self, peer_id: &PeerId, block: Block) {
|
||||
let ledger = self.peers.get_mut(peer_id).expect("Peer not in ledger?!");
|
||||
ledger.add_block(block);
|
||||
}
|
||||
|
||||
/// Adds the block to our want list and updates all peers.
|
||||
pub fn want_block(&mut self, cid: Cid, priority: u8) {
|
||||
for (_peer_id, ledger) in self.peers.iter_mut() {
|
||||
ledger.want_block(&cid, priority);
|
||||
}
|
||||
self.wanted_blocks.insert(cid, priority);
|
||||
}
|
||||
|
||||
/// Removes the block from our want list and updates all peers.
|
||||
pub fn cancel_block(&mut self, cid: &Cid) {
|
||||
for (_peer_id, ledger) in self.peers.iter_mut() {
|
||||
ledger.cancel_block(cid);
|
||||
}
|
||||
self.wanted_blocks.remove(cid);
|
||||
}
|
||||
|
||||
/// Parses and processes the message.
|
||||
///
|
||||
/// If a block was received it adds it to the received blocks queue and
|
||||
/// cancels the request for the block.
|
||||
/// If a want was received it adds it to the received wants queue for
|
||||
/// processing through a `Strategy`.
|
||||
/// If a want was cancelled it removes it from the received wants queue.
|
||||
pub fn receive_message(&mut self, peer_id: &PeerId, message: Message) -> Vec<BitswapEvent> {
|
||||
let ledger = self.peers.get_mut(peer_id).expect("Peer not in ledger?!");
|
||||
ledger.receive_message(&message);
|
||||
for cid in message.cancel() {
|
||||
// If a previous block was queued but has not been sent yet, remove it
|
||||
// from the queue.
|
||||
ledger.remove_block(cid);
|
||||
// TODO: If the bitswap strategy has not processed the request yet, remove
|
||||
// it from the queue.
|
||||
//self.received_wants.drain_filter(|(peer_id2, cid2, _)| {
|
||||
// peer_id == peer_id2 && cid == cid2
|
||||
//});
|
||||
}
|
||||
// Queue new requests
|
||||
let mut events = Vec::new();
|
||||
for block in message.blocks() {
|
||||
// Add block to received blocks
|
||||
events.push(BitswapEvent::Block {
|
||||
block: block.to_owned(),
|
||||
});
|
||||
// Cancel the block.
|
||||
self.cancel_block(&block.cid());
|
||||
}
|
||||
for (cid, priority) in message.want() {
|
||||
events.push(BitswapEvent::Want {
|
||||
peer_id: peer_id.to_owned(),
|
||||
cid: cid.to_owned(),
|
||||
priority: *priority,
|
||||
});
|
||||
}
|
||||
events
|
||||
}
|
||||
|
||||
/// Sends all queued messages.
|
||||
pub fn send_messages(&mut self) -> Vec<(PeerId, Vec<u8>)>{
|
||||
let mut messages = Vec::new();
|
||||
for (peer_id, ledger) in self.peers.iter_mut() {
|
||||
let message = ledger.send_message();
|
||||
if message.is_some() {
|
||||
messages.push((peer_id.to_owned(), message.unwrap()));
|
||||
}
|
||||
}
|
||||
messages
|
||||
}
|
||||
}
|
||||
|
||||
/// The LedgerEntry contains all the state of all transactions with a peer.
|
||||
#[derive(Debug)]
|
||||
pub struct PeerLedger {
|
||||
/// The number of blocks sent to the peer.
|
||||
sent_blocks: usize,
|
||||
/// The number of blocks received from the peer.
|
||||
@ -130,67 +18,42 @@ pub struct PeerLedger {
|
||||
sent_want_list: HashMap<Cid, Priority>,
|
||||
/// The list of wanted blocks received from the peer.
|
||||
received_want_list: HashMap<Cid, Priority>,
|
||||
/// The next message to send to the peer.
|
||||
queued_message: Option<Message>,
|
||||
}
|
||||
|
||||
impl PeerLedger {
|
||||
impl Ledger {
|
||||
/// Creates a new `PeerLedger`.
|
||||
fn new() -> Self {
|
||||
PeerLedger {
|
||||
pub fn new() -> Self {
|
||||
Ledger {
|
||||
sent_blocks: 0,
|
||||
received_blocks: 0,
|
||||
sent_want_list: HashMap::new(),
|
||||
received_want_list: HashMap::new(),
|
||||
queued_message: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds a block to the queued message.
|
||||
fn add_block(&mut self, block: Block) {
|
||||
if self.queued_message.is_none() {
|
||||
self.queued_message = Some(Message::new());
|
||||
}
|
||||
self.queued_message.as_mut().unwrap().add_block(block);
|
||||
pub fn send_block(&mut self, block: Block) -> Message<O> {
|
||||
let mut message = Message::new();
|
||||
message.add_block(block);
|
||||
message
|
||||
}
|
||||
|
||||
/// Removes a block from the queued message.
|
||||
fn remove_block(&mut self, cid: &Cid) {
|
||||
if self.queued_message.is_none() {
|
||||
self.queued_message = Some(Message::new());
|
||||
}
|
||||
self.queued_message.as_mut().unwrap().remove_block(cid);
|
||||
pub fn want_block(&mut self, cid: &Cid, priority: Priority) -> Message<O> {
|
||||
let mut message = Message::new();
|
||||
message.want_block(cid, priority);
|
||||
message
|
||||
}
|
||||
|
||||
/// Adds a block to the want list.
|
||||
fn want_block(&mut self, cid: &Cid, priority: Priority) {
|
||||
if self.queued_message.is_none() {
|
||||
self.queued_message = Some(Message::new());
|
||||
}
|
||||
self.queued_message.as_mut().unwrap().want_block(cid, priority);
|
||||
}
|
||||
|
||||
/// Removes the block from the want list.
|
||||
fn cancel_block(&mut self, cid: &Cid) {
|
||||
if self.queued_message.is_some() {
|
||||
self.queued_message.as_mut().unwrap().soft_cancel_block(cid);
|
||||
}
|
||||
pub fn cancel_block(&mut self, cid: &Cid) -> Option<Message<O>> {
|
||||
if self.sent_want_list.contains_key(cid) {
|
||||
if self.queued_message.is_none() {
|
||||
self.queued_message = Some(Message::new());
|
||||
}
|
||||
self.queued_message.as_mut().unwrap().cancel_block(cid);
|
||||
let mut message = Message::new();
|
||||
message.cancel_block(cid);
|
||||
Some(message)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Finalizes the message and returns it's contents.
|
||||
///
|
||||
/// Updates the number of sent blocks and the sent want list entries.
|
||||
fn send_message(&mut self) -> Option<Vec<u8>> {
|
||||
if self.queued_message.is_none() {
|
||||
return None;
|
||||
}
|
||||
let message = self.queued_message.take().unwrap();
|
||||
pub fn update_outgoing_stats(&mut self, message: &Message<O>) {
|
||||
self.sent_blocks += message.blocks.len();
|
||||
for cid in message.cancel() {
|
||||
self.sent_want_list.remove(cid);
|
||||
@ -198,13 +61,9 @@ impl PeerLedger {
|
||||
for (cid, priority) in message.want() {
|
||||
self.sent_want_list.insert(cid.to_owned(), *priority);
|
||||
}
|
||||
Some(message.into_bytes())
|
||||
}
|
||||
|
||||
/// Parses a message.
|
||||
///
|
||||
/// Updates the number of received blocks and the received want list entries.
|
||||
fn receive_message(&mut self, message: &Message) {
|
||||
pub fn update_incoming_stats(&mut self, message: &Message<I>) {
|
||||
self.received_blocks += message.blocks.len();
|
||||
for cid in message.cancel() {
|
||||
self.received_want_list.remove(cid);
|
||||
@ -227,9 +86,16 @@ impl PeerLedger {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct I;
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct O;
|
||||
|
||||
/// A bitswap message.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct Message {
|
||||
pub struct Message<T> {
|
||||
/// Message tag
|
||||
_phantom_data: PhantomData<T>,
|
||||
/// List of wanted blocks.
|
||||
want: HashMap<Cid, Priority>,
|
||||
/// List of blocks to cancel.
|
||||
@ -240,10 +106,11 @@ pub struct Message {
|
||||
blocks: Vec<Block>,
|
||||
}
|
||||
|
||||
impl Message {
|
||||
impl<T> Message<T> {
|
||||
/// Creates a new bitswap message.
|
||||
pub fn new() -> Self {
|
||||
Message {
|
||||
_phantom_data: PhantomData,
|
||||
want: HashMap::new(),
|
||||
cancel: Vec::new(),
|
||||
full: false,
|
||||
@ -272,6 +139,7 @@ impl Message {
|
||||
}
|
||||
|
||||
/// Removes the block from the message.
|
||||
#[allow(unused)]
|
||||
pub fn remove_block(&mut self, cid: &Cid) {
|
||||
self.blocks.drain_filter(|block| &block.cid() == cid);
|
||||
}
|
||||
@ -287,28 +155,31 @@ impl Message {
|
||||
}
|
||||
|
||||
/// Removes the block from the want list.
|
||||
pub fn soft_cancel_block(&mut self, cid: &Cid) {
|
||||
#[allow(unused)]
|
||||
pub fn remove_want_block(&mut self, cid: &Cid) {
|
||||
self.want.remove(cid);
|
||||
}
|
||||
}
|
||||
|
||||
impl Message<O> {
|
||||
/// Turns this `Message` into a message that can be sent to a substream.
|
||||
pub fn into_bytes(self) -> Vec<u8> {
|
||||
pub fn into_bytes(&self) -> Vec<u8> {
|
||||
let mut proto = proto::Message::new();
|
||||
let mut wantlist = proto::Message_Wantlist::new();
|
||||
for (cid, priority) in self.want {
|
||||
for (cid, priority) in self.want() {
|
||||
let mut entry = proto::Message_Wantlist_Entry::new();
|
||||
entry.set_block(cid.to_bytes());
|
||||
entry.set_priority(priority as _);
|
||||
entry.set_priority(*priority as _);
|
||||
wantlist.mut_entries().push(entry);
|
||||
}
|
||||
for cid in self.cancel {
|
||||
for cid in self.cancel() {
|
||||
let mut entry = proto::Message_Wantlist_Entry::new();
|
||||
entry.set_block(cid.to_bytes());
|
||||
entry.set_cancel(true);
|
||||
wantlist.mut_entries().push(entry);
|
||||
}
|
||||
proto.set_wantlist(wantlist);
|
||||
for block in self.blocks {
|
||||
for block in self.blocks() {
|
||||
let mut payload = proto::Message_Block::new();
|
||||
payload.set_prefix(block.cid().prefix().as_bytes());
|
||||
payload.set_data(block.data().to_vec());
|
||||
@ -319,6 +190,9 @@ impl Message {
|
||||
.expect("there is no situation in which the protobuf message can be invalid")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl Message<I> {
|
||||
/// Creates a `Message` from bytes that were received from a substream.
|
||||
pub fn from_bytes(bytes: &Vec<u8>) -> Result<Self, ProtobufError> {
|
||||
let proto: proto::Message = protobuf::parse_from_bytes(bytes)?;
|
||||
|
@ -1,70 +1,10 @@
|
||||
//! Bitswap protocol implementation
|
||||
use crate::block::Cid;
|
||||
use crate::p2p::{create_swarm, SecioKeyPair, Swarm};
|
||||
use futures::prelude::*;
|
||||
use parity_multihash::Multihash;
|
||||
use std::io::Error;
|
||||
|
||||
mod behaviour;
|
||||
mod ledger;
|
||||
pub mod behaviour;
|
||||
pub mod ledger;
|
||||
mod protobuf_structs;
|
||||
pub mod strategy;
|
||||
mod protocol;
|
||||
|
||||
use self::ledger::Ledger;
|
||||
pub use self::behaviour::Bitswap;
|
||||
pub use self::ledger::{BitswapEvent, Priority};
|
||||
pub use self::strategy::{AltruisticStrategy, Strategy};
|
||||
|
||||
pub struct Bitswap<S: Strategy> {
|
||||
swarm: Swarm,
|
||||
ledger: Ledger,
|
||||
_strategy: S,
|
||||
}
|
||||
|
||||
impl<S: Strategy> Bitswap<S> {
|
||||
pub fn new(local_private_key: SecioKeyPair, strategy: S) -> Self {
|
||||
Bitswap {
|
||||
swarm: create_swarm(local_private_key),
|
||||
ledger: Ledger::new(),
|
||||
_strategy: strategy,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn want_block(&mut self, cid: Cid) {
|
||||
let hash = Multihash::from_bytes(cid.hash.clone()).unwrap();
|
||||
self.swarm.get_providers(hash);
|
||||
|
||||
self.ledger.want_block(cid, 1);
|
||||
}
|
||||
|
||||
pub fn provide_block(&mut self, cid: &Cid) {
|
||||
let hash = Multihash::from_bytes(cid.hash.clone()).unwrap();
|
||||
self.swarm.add_providing(hash);
|
||||
}
|
||||
|
||||
pub fn stop_providing_block(&mut self, cid: &Cid) {
|
||||
let hash = Multihash::from_bytes(cid.hash.clone()).unwrap();
|
||||
self.swarm.remove_providing(&hash);
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Strategy> Stream for Bitswap<S> {
|
||||
type Item = ();
|
||||
type Error = Error;
|
||||
|
||||
// TODO: hookup ledger and strategy properly
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
println!("polling bitswap");
|
||||
self.ledger.send_messages();
|
||||
loop {
|
||||
match self.swarm.poll().expect("Error while polling swarm") {
|
||||
Async::Ready(Some(event)) => {
|
||||
println!("Result: {:#?}", event);
|
||||
return Ok(Async::Ready(Some(())));
|
||||
},
|
||||
Async::Ready(None) | Async::NotReady => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ use libp2p::core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade};
|
||||
use protobuf::ProtobufError;
|
||||
use std::{io, iter};
|
||||
use tokio::prelude::*;
|
||||
use crate::bitswap::ledger::Message;
|
||||
use crate::bitswap::ledger::{Message, I, O};
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct BitswapConfig {}
|
||||
@ -34,7 +34,7 @@ impl<TSocket> InboundUpgrade<TSocket> for BitswapConfig
|
||||
where
|
||||
TSocket: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Output = Message;
|
||||
type Output = Message<I>;
|
||||
type Error = BitswapError;
|
||||
type Future = upgrade::ReadOneThen<TSocket, fn(Vec<u8>) -> Result<Self::Output, Self::Error>>;
|
||||
|
||||
@ -85,7 +85,8 @@ impl std::error::Error for BitswapError {
|
||||
}
|
||||
}
|
||||
}
|
||||
impl UpgradeInfo for Message {
|
||||
|
||||
impl UpgradeInfo for Message<O> {
|
||||
type Info = &'static [u8];
|
||||
type InfoIter = iter::Once<Self::Info>;
|
||||
|
||||
@ -95,7 +96,7 @@ impl UpgradeInfo for Message {
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSocket> OutboundUpgrade<TSocket> for Message
|
||||
impl<TSocket> OutboundUpgrade<TSocket> for Message<O>
|
||||
where
|
||||
TSocket: AsyncRead + AsyncWrite,
|
||||
{
|
||||
|
@ -1,11 +1,12 @@
|
||||
use crate::block::Cid;
|
||||
use crate::bitswap::Priority;
|
||||
use crate::repo::Repo;
|
||||
use libp2p::PeerId;
|
||||
use crate::bitswap::ledger::{Ledger, Priority};
|
||||
use crate::p2p::Swarm;
|
||||
|
||||
pub trait Strategy {
|
||||
fn new(repo: Repo) -> Self;
|
||||
fn receive_want(&mut self, ledger: &mut Ledger, peer_id: &PeerId, cid: Cid, priority: Priority);
|
||||
fn receive_want(&mut self, swarm: &mut Swarm, source: PeerId, cid: Cid, priority: Priority);
|
||||
}
|
||||
|
||||
pub struct AltruisticStrategy {
|
||||
@ -21,14 +22,14 @@ impl Strategy for AltruisticStrategy {
|
||||
|
||||
fn receive_want(
|
||||
&mut self,
|
||||
ledger: &mut Ledger,
|
||||
peer_id: &PeerId,
|
||||
swarm: &mut Swarm,
|
||||
source: PeerId,
|
||||
cid: Cid,
|
||||
_priority: Priority,
|
||||
) {
|
||||
let block = self.repo.get(&cid);
|
||||
if block.is_some() {
|
||||
ledger.send_block(peer_id, block.unwrap());
|
||||
swarm.send_block(source, block.unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
17
src/lib.rs
17
src/lib.rs
@ -10,16 +10,18 @@ mod future;
|
||||
mod p2p;
|
||||
mod repo;
|
||||
|
||||
use self::bitswap::{Bitswap, strategy::AltruisticStrategy, Strategy};
|
||||
use self::bitswap::{strategy::AltruisticStrategy, Strategy};
|
||||
pub use self::block::{Block, Cid};
|
||||
use self::future::BlockFuture;
|
||||
use self::p2p::{create_swarm, Swarm};
|
||||
use self::repo::Repo;
|
||||
|
||||
/// Ipfs struct creates a new IPFS node and is the main entry point
|
||||
/// for interacting with IPFS.
|
||||
pub struct Ipfs {
|
||||
repo: Repo,
|
||||
bitswap: Bitswap<AltruisticStrategy>,
|
||||
strategy: AltruisticStrategy,
|
||||
swarm: Swarm,
|
||||
}
|
||||
|
||||
impl Ipfs {
|
||||
@ -28,25 +30,26 @@ impl Ipfs {
|
||||
let repo = Repo::new();
|
||||
let local_key = SecioKeyPair::ed25519_generated().unwrap();
|
||||
let strategy = AltruisticStrategy::new(repo.clone());
|
||||
let bitswap = Bitswap::new(local_key, strategy);
|
||||
let swarm = create_swarm(local_key);
|
||||
|
||||
Ipfs {
|
||||
repo,
|
||||
bitswap,
|
||||
strategy,
|
||||
swarm,
|
||||
}
|
||||
}
|
||||
|
||||
/// Puts a block into the ipfs repo.
|
||||
pub fn put_block(&mut self, block: Block) -> Cid {
|
||||
let cid = self.repo.put(block);
|
||||
self.bitswap.provide_block(&cid);
|
||||
self.swarm.provide_block(&cid);
|
||||
cid
|
||||
}
|
||||
|
||||
/// Retrives a block from the ipfs repo.
|
||||
pub fn get_block(&mut self, cid: Cid) -> BlockFuture {
|
||||
if !self.repo.contains(&cid) {
|
||||
self.bitswap.want_block(cid.clone());
|
||||
self.swarm.want_block(cid.clone());
|
||||
}
|
||||
BlockFuture::new(self.repo.clone(), cid)
|
||||
}
|
||||
@ -54,7 +57,7 @@ impl Ipfs {
|
||||
/// Remove block from the ipfs repo.
|
||||
pub fn remove_block(&mut self, cid: Cid) {
|
||||
self.repo.remove(&cid);
|
||||
self.bitswap.stop_providing_block(&cid);
|
||||
self.swarm.stop_providing_block(&cid);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,7 +1,12 @@
|
||||
use libp2p::PeerId;
|
||||
use crate::bitswap::{Bitswap, BitswapEvent};
|
||||
use crate::block::{Block, Cid};
|
||||
use libp2p::{NetworkBehaviour, PeerId};
|
||||
use libp2p::core::swarm::NetworkBehaviourEventProcess;
|
||||
use libp2p::core::muxing::{StreamMuxerBox, SubstreamRef};
|
||||
use libp2p::kad::Kademlia;
|
||||
use libp2p::kad::{Kademlia, KademliaOut as KademliaEvent};
|
||||
use parity_multihash::Multihash;
|
||||
use std::sync::Arc;
|
||||
use tokio::prelude::*;
|
||||
|
||||
/// IPFS bootstrap nodes.
|
||||
const BOOTSTRAP_NODES: &[(&'static str, &'static str)] = &[
|
||||
@ -44,23 +49,105 @@ const BOOTSTRAP_NODES: &[(&'static str, &'static str)] = &[
|
||||
];
|
||||
|
||||
/// Behaviour type.
|
||||
pub type TBehaviour = Kademlia<SubstreamRef<Arc<StreamMuxerBox>>>;
|
||||
#[derive(NetworkBehaviour)]
|
||||
pub struct Behaviour<TSubstream: AsyncRead + AsyncWrite> {
|
||||
kademlia: Kademlia<TSubstream>,
|
||||
bitswap: Bitswap<TSubstream>,
|
||||
}
|
||||
|
||||
/// Create a Kademlia behaviour with the IPFS bootstrap nodes.
|
||||
pub fn build_behaviour(local_peer_id: PeerId) -> TBehaviour {
|
||||
// Note that normally the Kademlia process starts by performing lots of
|
||||
// request in order to insert our local node in the DHT. However here we use
|
||||
// `without_init` because this example is very ephemeral and we don't want
|
||||
// to pollute the DHT. In a real world application, you want to use `new`
|
||||
// instead.
|
||||
let mut behaviour = Kademlia::without_init(local_peer_id);
|
||||
impl<TSubstream: AsyncRead + AsyncWrite>
|
||||
NetworkBehaviourEventProcess<KademliaEvent> for
|
||||
Behaviour<TSubstream>
|
||||
{
|
||||
fn inject_event(&mut self, event: KademliaEvent) {
|
||||
match event {
|
||||
KademliaEvent::Discovered { .. } => {
|
||||
|
||||
for (identity, location) in BOOTSTRAP_NODES {
|
||||
behaviour.add_address(
|
||||
&identity.parse().unwrap(),
|
||||
location.parse().unwrap(),
|
||||
);
|
||||
}
|
||||
KademliaEvent::FindNodeResult { .. } => {
|
||||
|
||||
}
|
||||
KademliaEvent::GetProvidersResult {
|
||||
provider_peers,
|
||||
..
|
||||
} => {
|
||||
for peer in provider_peers {
|
||||
self.bitswap.connect(peer);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream: AsyncRead + AsyncWrite>
|
||||
NetworkBehaviourEventProcess<BitswapEvent> for
|
||||
Behaviour<TSubstream>
|
||||
{
|
||||
fn inject_event(&mut self, event: BitswapEvent) {
|
||||
match event {
|
||||
BitswapEvent::Block { block } => {
|
||||
println!("Received block with contents: '{:?}'",
|
||||
String::from_utf8_lossy(&block.data()));
|
||||
}
|
||||
BitswapEvent::Want { peer_id, cid, priority } => {
|
||||
println!("Peer {:?} wants block {:?} with priority {}",
|
||||
peer_id, cid.to_string(), priority);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TSubstream: AsyncRead + AsyncWrite> Behaviour<TSubstream>
|
||||
{
|
||||
/// Create a Kademlia behaviour with the IPFS bootstrap nodes.
|
||||
pub fn new(local_peer_id: PeerId) -> Self {
|
||||
// Note that normally the Kademlia process starts by performing lots of
|
||||
// request in order to insert our local node in the DHT. However here we use
|
||||
// `without_init` because this example is very ephemeral and we don't want
|
||||
// to pollute the DHT. In a real world application, you want to use `new`
|
||||
// instead.
|
||||
let mut kademlia = Kademlia::without_init(local_peer_id);
|
||||
|
||||
for (identity, location) in BOOTSTRAP_NODES {
|
||||
kademlia.add_address(
|
||||
&identity.parse().unwrap(),
|
||||
location.parse().unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
let bitswap = Bitswap::new();
|
||||
|
||||
Behaviour {
|
||||
kademlia,
|
||||
bitswap,
|
||||
}
|
||||
}
|
||||
|
||||
behaviour
|
||||
pub fn send_block(&mut self, peer_id: PeerId, block: Block) {
|
||||
self.bitswap.send_block(peer_id, block);
|
||||
}
|
||||
|
||||
pub fn want_block(&mut self, cid: Cid) {
|
||||
let hash = Multihash::from_bytes(cid.hash.clone()).unwrap();
|
||||
self.kademlia.get_providers(hash);
|
||||
self.bitswap.want_block(cid, 1);
|
||||
}
|
||||
|
||||
pub fn provide_block(&mut self, cid: &Cid) {
|
||||
let hash = Multihash::from_bytes(cid.hash.clone()).unwrap();
|
||||
self.kademlia.add_providing(hash);
|
||||
}
|
||||
|
||||
pub fn stop_providing_block(&mut self, cid: &Cid) {
|
||||
let hash = Multihash::from_bytes(cid.hash.clone()).unwrap();
|
||||
self.kademlia.remove_providing(&hash);
|
||||
}
|
||||
}
|
||||
|
||||
/// Behaviour type.
|
||||
pub type TBehaviour = Behaviour<SubstreamRef<Arc<StreamMuxerBox>>>;
|
||||
|
||||
/// Create a IPFS behaviour with the IPFS bootstrap nodes.
|
||||
pub fn build_behaviour(local_peer_id: PeerId) -> TBehaviour {
|
||||
Behaviour::new(local_peer_id)
|
||||
}
|
||||
|
@ -7,9 +7,7 @@ mod transport;
|
||||
pub type Swarm = libp2p::core::Swarm<transport::TTransport, behaviour::TBehaviour>;
|
||||
|
||||
/// Creates a new IPFS swarm.
|
||||
pub fn create_swarm(
|
||||
local_private_key: SecioKeyPair,
|
||||
) -> Swarm {
|
||||
pub fn create_swarm(local_private_key: SecioKeyPair) -> Swarm {
|
||||
let local_peer_id = local_private_key.to_peer_id();
|
||||
|
||||
// Set up an encrypted TCP transport over the Mplex protocol.
|
||||
|
Loading…
x
Reference in New Issue
Block a user