Initial ledger, strategy and dht handling.

This commit is contained in:
David Craven 2019-02-03 02:10:34 +01:00
parent 701dcb1d1c
commit ee2ffac612
No known key found for this signature in database
GPG Key ID: DF438712EA50DBB1
3 changed files with 380 additions and 17 deletions

306
src/bitswap/ledger.rs Normal file
View File

@ -0,0 +1,306 @@
use crate::block::{Block, Cid};
use libp2p::PeerId;
use std::collections::HashMap;
pub type Priority = u8;
/// The Ledger contains all the state of all transactions.
#[derive(Debug)]
pub struct Ledger {
peers: HashMap<PeerId, PeerLedger>,
received_blocks: Vec<Block>,
received_wants: Vec<(PeerId, Cid, Priority)>,
wanted_blocks: HashMap<Cid, Priority>,
}
impl Ledger {
/// Creates a new `Ledger`.
pub fn new() -> Ledger {
Ledger {
peers: HashMap::new(),
wanted_blocks: HashMap::new(),
received_blocks: Vec::new(),
received_wants: Vec::new(),
}
}
/// Creates a new ledger entry for the peer and sends our want list.
pub fn peer_connected(&mut self, peer_id: PeerId) {
// TODO: load stats from previous interactions
let ledger = PeerLedger::from_wanted_blocks(&self.wanted_blocks);
self.peers.insert(peer_id, ledger);
}
/// Removes the ledger of a disconnected peer.
pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
// TODO: persist stats for future interactions
self.peers.remove(peer_id);
}
/// Queues the block to be sent to the peer.
pub fn send_block(&mut self, peer_id: &PeerId, block: Block) {
let ledger = self.peers.get_mut(peer_id).expect("Peer not in ledger?!");
ledger.add_block(block);
}
/// Adds the block to our want list and updates all peers.
pub fn want_block(&mut self, cid: Cid, priority: u8) {
for (_peer_id, ledger) in self.peers.iter_mut() {
ledger.want_block(&cid, priority);
}
self.wanted_blocks.insert(cid, priority);
}
/// Removes the block from our want list and updates all peers.
pub fn cancel_block(&mut self, cid: &Cid) {
for (_peer_id, ledger) in self.peers.iter_mut() {
ledger.cancel_block(cid);
}
self.wanted_blocks.remove(cid);
}
/// Parses and processes the message.
///
/// If a block was received it adds it to the received blocks queue and
/// cancels the request for the block.
/// If a want was received it adds it to the received wants queue for
/// processing through a `Strategy`.
/// If a want was cancelled it removes it from the received wants queue.
pub fn receive_message(&mut self, peer_id: &PeerId, bytes: Vec<u8>) {
let ledger = self.peers.get_mut(peer_id).expect("Peer not in ledger?!");
let message = match ledger.receive_message(bytes) {
Ok(message) => message,
Err(err) => {
println!("{}", err);
return;
}
};
for cid in message.cancel() {
// If a previous block was queued but has not been sent yet, remove it
// from the queue.
ledger.remove_block(cid);
// If the bitswap strategy has not processed the request yet, remove
// it from the queue.
self.received_wants.drain_filter(|(peer_id2, cid2, _)| {
peer_id == peer_id2 && cid == cid2
});
}
// Queue new requests
for (cid, priority) in message.want() {
self.received_wants.push((peer_id.to_owned(), cid.to_owned(), *priority));
}
for block in message.blocks() {
// Add block to received blocks
self.received_blocks.push(block.to_owned());
// Cancel the block.
self.cancel_block(&block.cid());
}
}
/// Sends all queued messages.
pub fn send_messages(&mut self) {
for (_peer_id, ledger) in self.peers.iter_mut() {
// TODO
let _bytes_to_send = ledger.send_message();
}
}
}
/// The LedgerEntry contains all the state of all transactions with a peer.
#[derive(Debug)]
pub struct PeerLedger {
/// The number of blocks sent to the peer.
sent_blocks: usize,
/// The number of blocks received from the peer.
received_blocks: usize,
/// The list of wanted blocks sent to the peer.
sent_want_list: HashMap<Cid, Priority>,
/// The list of wanted blocks received from the peer.
received_want_list: HashMap<Cid, Priority>,
/// The next message to send to the peer.
queued_message: Option<Message>,
}
impl PeerLedger {
/// Creates a new `PeerLedger`.
fn new() -> Self {
PeerLedger {
sent_blocks: 0,
received_blocks: 0,
sent_want_list: HashMap::new(),
received_want_list: HashMap::new(),
queued_message: None,
}
}
/// Creates a new `PeerLedger` and queues a message with the wanted blocks.
fn from_wanted_blocks(wanted_blocks: &HashMap<Cid, Priority>) -> Self {
let message = if wanted_blocks.is_empty() {
None
} else {
Some(Message::from_wanted(wanted_blocks.to_owned()))
};
let mut ledger = PeerLedger::new();
ledger.queued_message = message;
ledger
}
/// Adds a block to the queued message.
fn add_block(&mut self, block: Block) {
if self.queued_message.is_none() {
self.queued_message = Some(Message::new());
}
self.queued_message.as_mut().unwrap().add_block(block);
}
/// Removes a block from the queued message.
fn remove_block(&mut self, cid: &Cid) {
if self.queued_message.is_none() {
self.queued_message = Some(Message::new());
}
self.queued_message.as_mut().unwrap().remove_block(cid);
}
/// Adds a block to the want list.
fn want_block(&mut self, cid: &Cid, priority: Priority) {
if self.queued_message.is_none() {
self.queued_message = Some(Message::new());
}
self.queued_message.as_mut().unwrap().want_block(cid, priority);
}
/// Removes the block from the want list.
fn cancel_block(&mut self, cid: &Cid) {
if self.sent_want_list.contains_key(cid) {
self.queued_message.as_mut().unwrap().cancel_block(cid);
} else {
self.queued_message.as_mut().unwrap().soft_cancel_block(cid);
}
}
/// Finalizes the message and returns it's contents.
///
/// Updates the number of sent blocks and the sent want list entries.
fn send_message(&mut self) -> Option<Vec<u8>> {
if self.queued_message.is_none() {
return None;
}
let message = self.queued_message.take().unwrap();
self.sent_blocks += message.blocks.len();
for (cid, priority) in message.want() {
self.sent_want_list.insert(cid.to_owned(), *priority);
}
Some(message.into_bytes())
}
/// Parses a message.
///
/// Updates the number of received blocks and the received want list entries.
fn receive_message(&mut self, bytes: Vec<u8>) -> Result<Message, std::io::Error> {
let message = Message::from_bytes(&bytes)?;
self.received_blocks += message.blocks.len();
for cid in message.cancel() {
self.received_want_list.remove(cid);
}
for (cid, priority) in message.want() {
self.received_want_list.insert(cid.to_owned(), *priority);
}
Ok(message)
}
}
/// A bitswap message.
#[derive(Debug, Clone)]
pub struct Message {
/// List of wanted blocks.
want: HashMap<Cid, Priority>,
/// List of blocks to cancel.
cancel: Vec<Cid>,
/// Wheather it is the full list of wanted blocks.
full: bool,
/// List of blocks to send.
blocks: Vec<Block>,
}
impl Message {
/// Creates a new bitswap message.
pub fn new() -> Self {
Message {
want: HashMap::new(),
cancel: Vec::new(),
full: false,
blocks: Vec::new(),
}
}
/// Creates a new bitswap message from a want list.
pub fn from_wanted(wanted: HashMap<Cid, Priority>) -> Self {
Message {
want: wanted,
cancel: Vec::new(),
full: false,
blocks: Vec::new(),
}
}
/// Returns the list of blocks.
pub fn blocks(&self) -> &Vec<Block> {
&self.blocks
}
/// Returns the list of wanted blocks.
pub fn want(&self) -> &HashMap<Cid, Priority> {
&self.want
}
/// Returns the list of cancelled blocks.
pub fn cancel(&self) -> &Vec<Cid> {
&self.cancel
}
/// Adds a `Block` to the message.
pub fn add_block(&mut self, block: Block) {
self.blocks.push(block);
}
/// Removes the block from the message.
pub fn remove_block(&mut self, cid: &Cid) {
self.blocks.drain_filter(|block| &block.cid() == cid);
}
/// Adds a block to the want list.
pub fn want_block(&mut self, cid: &Cid, priority: Priority) {
self.want.insert(cid.to_owned(), priority);
}
/// Adds a block to the cancel list.
pub fn cancel_block(&mut self, cid: &Cid) {
self.cancel.push(cid.to_owned());
}
/// Removes the block from the want list.
pub fn soft_cancel_block(&mut self, cid: &Cid) {
self.want.remove(cid);
}
/// Turns this `Message` into a message that can be sent to a substream.
pub fn into_bytes(self) -> Vec<u8> {
// TODO
Vec::new()
}
/// Creates a `Message` from bytes that were received from a substream.
pub fn from_bytes(_bytes: &Vec<u8>) -> Result<Self, std::io::Error> {
// TODO
Err(std::io::Error::new(std::io::ErrorKind::Other, "not implemented yet"))
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_ledger_send() {
}
}

View File

@ -1,43 +1,66 @@
//! Bitswap protocol implementation
use cid::Cid;
use crate::p2p::Service;
use crate::block::Cid;
use crate::p2p::{create_swarm, SecioKeyPair, Swarm};
use futures::prelude::*;
use parity_multihash::Multihash;
use std::io::Error;
use std::sync::{Arc, Mutex};
mod behaviour;
//mod behaviour;
mod ledger;
mod protobuf_structs;
mod protocol;
pub mod strategy;
//mod protocol;
#[derive(Clone)]
pub struct Bitswap {
p2p: Arc<Mutex<Service>>,
use self::ledger::Ledger;
pub use self::strategy::{AltruisticStrategy, Strategy};
pub struct Bitswap<S: Strategy> {
swarm: Swarm,
ledger: Ledger,
_strategy: S,
}
impl Bitswap {
pub fn new() -> Self {
impl<S: Strategy> Bitswap<S> {
pub fn new(local_private_key: SecioKeyPair, strategy: S) -> Self {
Bitswap {
p2p: Arc::new(Mutex::new(Service::new())),
swarm: create_swarm(local_private_key),
ledger: Ledger::new(),
_strategy: strategy,
}
}
pub fn get_block(&mut self, cid: Arc<Cid>) {
println!("retriving block");
pub fn want_block(&mut self, cid: Cid) {
let hash = Multihash::from_bytes(cid.hash.clone()).unwrap();
self.p2p.lock().unwrap().swarm.get_providers(hash);
self.swarm.get_providers(hash);
self.ledger.want_block(cid, 1);
}
pub fn provide_block(&mut self, cid: &Cid) {
let hash = Multihash::from_bytes(cid.hash.clone()).unwrap();
self.swarm.add_providing(hash);
}
pub fn stop_providing_block(&mut self, cid: &Cid) {
let hash = Multihash::from_bytes(cid.hash.clone()).unwrap();
self.swarm.remove_providing(&hash);
}
}
impl Stream for Bitswap {
impl<S: Strategy> Stream for Bitswap<S> {
type Item = ();
type Error = Error;
// TODO: hookup ledger and strategy properly
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
println!("polling bitswap");
let peer_id = Swarm::local_peer_id(&self.swarm);
self.ledger.peer_connected(peer_id.clone());
self.ledger.peer_disconnected(&peer_id);
self.ledger.send_messages();
self.ledger.receive_message(peer_id, Vec::new());
loop {
match self.p2p.lock().unwrap()
.swarm.poll().expect("Error while polling swarm") {
match self.swarm.poll().expect("Error while polling swarm") {
Async::Ready(Some(event)) => {
println!("Result: {:#?}", event);
return Ok(Async::Ready(Some(())));

34
src/bitswap/strategy.rs Normal file
View File

@ -0,0 +1,34 @@
use crate::block::Cid;
use crate::repo::Repo;
use libp2p::PeerId;
use crate::bitswap::ledger::{Ledger, Priority};
pub trait Strategy {
fn new(repo: Repo) -> Self;
fn receive_want(&mut self, ledger: &mut Ledger, peer_id: &PeerId, cid: Cid, priority: Priority);
}
pub struct AltruisticStrategy {
repo: Repo,
}
impl Strategy for AltruisticStrategy {
fn new(repo: Repo) -> Self {
AltruisticStrategy {
repo,
}
}
fn receive_want(
&mut self,
ledger: &mut Ledger,
peer_id: &PeerId,
cid: Cid,
_priority: Priority,
) {
let block = self.repo.get(&cid);
if block.is_some() {
ledger.send_block(peer_id, block.unwrap());
}
}
}