Add some bitswap boilerplate.

This commit is contained in:
David Craven 2019-02-02 16:59:38 +01:00
parent 31ae0162fa
commit c5e8bfd92b
No known key found for this signature in database
GPG Key ID: DF438712EA50DBB1
3 changed files with 175 additions and 0 deletions

85
src/bitswap/behaviour.rs Normal file
View File

@ -0,0 +1,85 @@
//! Handles the `/ipfs/bitswap/1.0.0` and `/ipfs/bitswap/1.1.0` protocols. This
//! allows exchanging IPFS blocks.
//!
//! # Usage
//!
//! The `Bitswap` struct implements the `NetworkBehaviour` trait. When used, it
//! will allow providing and reciving IPFS blocks.
use futures::prelude::*;
use libp2p::core::swarm::{
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p::core::protocols_handler::ProtocolsHandler;
use libp2p::{Multiaddr, PeerId};
use std::marker::PhantomData;
use tokio::prelude::*;
/// Network behaviour that handles sending and receiving IPFS blocks.
pub struct Bitswap<TSubstream> {
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
/// Queue of events to report to the user.
events: Vec<BitswapEvent>,
}
/// Event generated by the `Bitswap` behaviour.
pub enum BitswapEvent {
}
impl<TSubstream> Bitswap<TSubstream> {
/// Creates a `Bitswap`.
pub fn new() -> Self {
Bitswap {
marker: PhantomData,
events: Vec::new(),
}
}
}
impl<TSubstream> Default for Bitswap<TSubstream> {
#[inline]
fn default() -> Self {
Bitswap::new()
}
}
impl<TSubstream> NetworkBehaviour for Bitswap<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type ProtocolsHandler = ();
type OutEvent = BitswapEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
()
}
fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
fn inject_node_event(
&mut self,
source: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
}
fn poll(
&mut self,
_: &mut PollParameters,
) -> Async<NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>> {
if !self.events.is_empty() {
let event = self.events.remove(0);
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Async::NotReady
}
}

View File

@ -6,7 +6,9 @@ use parity_multihash::Multihash;
use std::io::Error;
use std::sync::{Arc, Mutex};
mod behaviour;
mod protobuf_structs;
mod protocol;
#[derive(Clone)]
pub struct Bitswap {

88
src/bitswap/protocol.rs Normal file
View File

@ -0,0 +1,88 @@
/// Reperesents a prototype for an upgrade to handle the bitswap protocol.
///
/// The protocol works the following way:
///
/// - TODO
use libp2p::core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::{io, iter};
use tokio::prelude::*;
#[derive(Default, Debug, Copy, Clone)]
pub struct Bitswap;
impl UpgradeInfo for Bitswap {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/ipfs/bitswap/1.0.0", b"/ipfs/bitswap/1.1.0")
}
}
impl<TSocket> InboundUpgrade<TSocket> for Bitswap
where
TSocket: AsyncRead + AsyncWrite,
{
type Output = ();
type Error = io::Error;
type Future = ();
#[inline]
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
}
}
impl<TSocket> OutboundUpgrade<TSocket> for Bitswap
where
TSocket: AsyncRead + AsyncWrite,
{
type Output = ();
type Error = io::Error;
type Future = ();
#[inline]
fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
}
}
#[cfg(test)]
mod tests {
use tokio_tcp::{TcpListener, TcpStream};
use super::Bitswap;
use futures::{Future, Stream};
use libp2p::core::upgrade;
// TODO: rewrite tests with the MemoryTransport
#[test]
fn want_receive() {
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let listener_addr = listener.local_addr().unwrap();
let server = listener
.incoming()
.into_future()
.map_err(|(e, _)| e)
.and_then(|(c, _)| {
upgrade::apply_inbound(c.unwrap(), Bitswap::default())
.map_err(|_| panic!())
});
let client = TcpStream::connect(&listener_addr)
.and_then(|c| {
upgrade::apply_outbound(c, Bitswap::default())
.map_err(|_| panic!())
})
.map(|_| ());
let mut runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(server.select(client).map_err(|_| panic!())).unwrap();
}
#[test]
fn provide_want_send() {
}
}