Run rustfmt.

David Craven 2020-02-23 14:10:51 +01:00
parent edd22040d0
commit 34fab7c774
30 changed files with 465 additions and 432 deletions
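The hunks below contain no functional changes; they are the mechanical output of running rustfmt over the crate, which mainly sorts `use` statements alphabetically and wraps expressions that exceed the default line width. A minimal sketch of that wrapping rule, using an invented function rather than code from this repository:

```rust
// Illustrative sketch only: `summarize` and its names are invented for this
// example rather than taken from the repository; it simply mirrors the shape
// rustfmt produces in the hunks below.

use std::collections::HashMap;

fn summarize(entries: &HashMap<String, Vec<u8>>, key: &str) -> String {
    // Written on one line, this chain exceeds rustfmt's default 100-column
    // width, so rustfmt wraps it with one call per line, the pattern that
    // accounts for most of the churn in this commit.
    entries
        .get(key)
        .map(|bytes| format!("{} holds {} bytes", key, bytes.len()))
        .unwrap_or_else(|| format!("{} is missing", key))
}

fn main() {
    let mut entries = HashMap::new();
    entries.insert("block".to_string(), b"hello block\n".to_vec());
    println!("{}", summarize(&entries, "block"));
}
```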

View File

@ -1,10 +1,12 @@
use ipfs::{UninitializedIpfs, IpfsOptions, Ipld, Types};
use async_std::task;
use futures::join;
use ipfs::{IpfsOptions, Ipld, Types, UninitializedIpfs};
fn main() {
let options = IpfsOptions::<Types>::default();
env_logger::Builder::new().parse_filters(&options.ipfs_log).init();
env_logger::Builder::new()
.parse_filters(&options.ipfs_log)
.init();
task::block_on(async move {
let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();

View File

@ -1,12 +1,15 @@
use std::str::FromStr;
use ipfs::{UninitializedIpfs, IpfsOptions, IpfsPath, TestTypes};
use futures::join;
use async_std::task;
use futures::join;
use ipfs::{IpfsOptions, IpfsPath, TestTypes, UninitializedIpfs};
use std::str::FromStr;
fn main() {
let options = IpfsOptions::<TestTypes>::default();
env_logger::Builder::new().parse_filters(&options.ipfs_log).init();
let path = IpfsPath::from_str("/ipfs/zdpuB1caPcm4QNXeegatVfLQ839Lmprd5zosXGwRUBJHwj66X").unwrap();
env_logger::Builder::new()
.parse_filters(&options.ipfs_log)
.init();
let path =
IpfsPath::from_str("/ipfs/zdpuB1caPcm4QNXeegatVfLQ839Lmprd5zosXGwRUBJHwj66X").unwrap();
task::block_on(async move {
let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();

View File

@ -1,10 +1,12 @@
use ipfs::{Block, UninitializedIpfs, IpfsOptions, TestTypes};
use std::convert::TryInto;
use async_std::task;
use ipfs::{Block, IpfsOptions, TestTypes, UninitializedIpfs};
use std::convert::TryInto;
fn main() {
let options = IpfsOptions::<TestTypes>::default();
env_logger::Builder::new().parse_filters(&options.ipfs_log).init();
env_logger::Builder::new()
.parse_filters(&options.ipfs_log)
.init();
task::block_on(async move {
// Start daemon and initialize repo
@ -15,7 +17,10 @@ fn main() {
ipfs.put_block(Block::from("block-provide")).await.unwrap();
// Retrieve a Block
let block = ipfs.get_block(Block::from("block-want\n").cid()).await.unwrap();
let block = ipfs
.get_block(Block::from("block-want\n").cid())
.await
.unwrap();
let contents: String = block.into();
println!("block contents: {:?}", contents);
@ -23,7 +28,9 @@ fn main() {
ipfs.add("./examples/block.data".into()).await.unwrap();
// Get a file
let path = "/QmSy5pnHk1EnvE5dmJSyFKG5unXLGjPpBuJJCBQkBTvBaW".try_into().unwrap();
let path = "/QmSy5pnHk1EnvE5dmJSyFKG5unXLGjPpBuJJCBQkBTvBaW"
.try_into()
.unwrap();
let file = ipfs.get(path).await.unwrap();
let contents: String = file.into();
println!("file contents: {:?}", contents);

View File

@ -1,10 +1,12 @@
use std::str::FromStr;
use ipfs::{UninitializedIpfs, IpfsOptions, IpfsPath, PeerId, TestTypes};
use async_std::task;
use ipfs::{IpfsOptions, IpfsPath, PeerId, TestTypes, UninitializedIpfs};
use std::str::FromStr;
fn main() {
let options = IpfsOptions::<TestTypes>::default();
env_logger::Builder::new().parse_filters(&options.ipfs_log).init();
env_logger::Builder::new()
.parse_filters(&options.ipfs_log)
.init();
task::block_on(async move {
// Start daemon and initialize repo
@ -14,7 +16,10 @@ fn main() {
// Create a Block
let ipfs_path = ipfs.put_dag("block v0".into()).await.unwrap();
// Publish a Block
let ipns_path = ipfs.publish_ipns(&PeerId::random(), &ipfs_path).await.unwrap();
let ipns_path = ipfs
.publish_ipns(&PeerId::random(), &ipfs_path)
.await
.unwrap();
// Resolve a Block
let new_ipfs_path = ipfs.resolve_ipns(&ipns_path).await.unwrap();

View File

@ -1,10 +1,12 @@
use ipfs::{UninitializedIpfs, IpfsOptions, Ipld, Types};
use futures::join;
use async_std::task;
use futures::join;
use ipfs::{IpfsOptions, Ipld, Types, UninitializedIpfs};
fn main() {
let options = IpfsOptions::<Types>::default();
env_logger::Builder::new().parse_filters(&options.ipfs_log).init();
env_logger::Builder::new()
.parse_filters(&options.ipfs_log)
.init();
task::block_on(async move {
// Start daemon and initialize repo

View File

@ -5,19 +5,19 @@
//!
//! The `Bitswap` struct implements the `NetworkBehaviour` trait. When used, it
//! will allow providing and receiving IPFS blocks.
use futures::task::Context;
use crate::bitswap::ledger::{Ledger, Message, Priority, I, O};
use crate::bitswap::protocol::BitswapConfig;
use crate::bitswap::strategy::{Strategy, StrategyEvent};
use crate::block::{Block, Cid};
use crate::p2p::SwarmTypes;
use fnv::FnvHashSet;
use futures::task::Context;
use futures::task::Poll;
use libp2p::core::ConnectedPoint;
use libp2p::swarm::protocols_handler::{IntoProtocolsHandler, OneShotHandler, ProtocolsHandler};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::swarm::protocols_handler::{OneShotHandler, ProtocolsHandler, IntoProtocolsHandler};
use libp2p::{Multiaddr, PeerId};
use std::collections::{HashMap, VecDeque};
use futures::task::Poll;
/// Network behaviour that handles sending and receiving IPFS blocks.
pub struct Bitswap<TSwarmTypes: SwarmTypes> {
@ -53,9 +53,8 @@ impl<TSwarmTypes: SwarmTypes> Bitswap<TSwarmTypes> {
debug!("bitswap: connect");
if self.target_peers.insert(peer_id.clone()) {
debug!(" queuing dial_peer to {}", peer_id.to_base58());
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
});
self.events
.push_back(NetworkBehaviourAction::DialPeer { peer_id });
}
debug!("");
}
@ -65,7 +64,9 @@ impl<TSwarmTypes: SwarmTypes> Bitswap<TSwarmTypes> {
/// Called from a Strategy.
pub fn send_block(&mut self, peer_id: PeerId, block: Block) {
debug!("bitswap: send_block");
let ledger = self.connected_peers.get_mut(&peer_id)
let ledger = self
.connected_peers
.get_mut(&peer_id)
.expect("Peer not in ledger?!");
let message = ledger.send_block(block);
debug!(" queuing block for {}", peer_id.to_base58());
@ -120,10 +121,8 @@ impl<TSwarmTypes: SwarmTypes> Bitswap<TSwarmTypes> {
if let Some(event) = message {
let peer_id = peer_id.to_owned();
debug!(" queuing cancel for {}", peer_id.to_base58());
self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id,
event,
});
self.events
.push_back(NetworkBehaviourAction::SendEvent { peer_id, event });
}
}
self.wanted_blocks.remove(cid);
@ -132,8 +131,8 @@ impl<TSwarmTypes: SwarmTypes> Bitswap<TSwarmTypes> {
}
impl<TSwarmTypes> NetworkBehaviour for Bitswap<TSwarmTypes>
where
TSwarmTypes: SwarmTypes,
where
TSwarmTypes: SwarmTypes,
{
type ProtocolsHandler = OneShotHandler<BitswapConfig, Message<O>, InnerMessage>;
type OutEvent = ();
@ -166,25 +165,21 @@ impl<TSwarmTypes> NetworkBehaviour for Bitswap<TSwarmTypes>
//self.connected_peers.remove(peer_id);
}
fn inject_node_event(
&mut self,
source: PeerId,
event: InnerMessage,
) {
fn inject_node_event(&mut self, source: PeerId, event: InnerMessage) {
debug!("bitswap: inject_node_event");
debug!("{:?}", event);
let message = match event {
InnerMessage::Rx(message) => {
message
},
InnerMessage::Rx(message) => message,
InnerMessage::Tx => {
return;
},
}
};
debug!(" received message");
// Update the ledger.
let ledger = self.connected_peers.get_mut(&source)
let ledger = self
.connected_peers
.get_mut(&source)
.expect("Peer not in ledger?!");
ledger.update_incoming_stats(&message);
@ -192,10 +187,12 @@ impl<TSwarmTypes> NetworkBehaviour for Bitswap<TSwarmTypes>
for block in message.blocks() {
// Cancel the block.
self.cancel_block(&block.cid());
self.strategy.process_block(source.clone(), block.to_owned());
self.strategy
.process_block(source.clone(), block.to_owned());
}
for (cid, priority) in message.want() {
self.strategy.process_want(source.clone(), cid.to_owned(), *priority);
self.strategy
.process_want(source.clone(), cid.to_owned(), *priority);
}
// TODO: Remove cancelled `Want` events from the queue.
// TODO: Remove cancelled blocks from `SendEvent`.
@ -210,18 +207,13 @@ impl<TSwarmTypes> NetworkBehaviour for Bitswap<TSwarmTypes>
match self.connected_peers.get_mut(&peer_id) {
None => {
debug!(" requeueing send event to {}", peer_id.to_base58());
self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id,
event,
})
},
self.events
.push_back(NetworkBehaviourAction::SendEvent { peer_id, event })
}
Some(ref mut ledger) => {
ledger.update_outgoing_stats(&event);
debug!(" send_message to {}", peer_id.to_base58());
return Poll::Ready(NetworkBehaviourAction::SendEvent {
peer_id,
event,
});
return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event });
}
}
} else {

View File

@ -1,10 +1,10 @@
use crate::block::{Block, Cid};
use crate::bitswap::bitswap_pb;
use crate::block::{Block, Cid};
use crate::error::Error;
use prost::Message as ProstMessage;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::convert::TryFrom;
use prost::{Message as ProstMessage};
use std::marker::PhantomData;
pub type Priority = i32;

View File

@ -1,12 +1,12 @@
//! Bitswap protocol implementation
pub mod behaviour;
pub mod ledger;
pub mod strategy;
pub mod protocol;
pub mod strategy;
pub use self::behaviour::Bitswap;
pub use self::protocol::BitswapError;
pub use self::ledger::Priority;
pub use self::protocol::BitswapError;
pub use self::strategy::{AltruisticStrategy, Strategy};
mod bitswap_pb {

View File

@ -3,18 +3,17 @@
/// The protocol works the following way:
///
/// - TODO
use crate::bitswap::ledger::{Message, I, O};
use crate::error::Error;
use libp2p::core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade};
use std::{io, iter};
use futures::future::Future;
use futures::io::{AsyncRead, AsyncWrite};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::pin::Pin;
use std::{io, iter};
// Undocumented, but according to JS our messages have a max size of 512*1024
// https://github.com/ipfs/js-ipfs-bitswap/blob/d8f80408aadab94c962f6b88f343eb9f39fa0fcc/src/decision-engine/index.js#L16
const MAX_BUF_SIZE : usize = 524_288;
const MAX_BUF_SIZE: usize = 524_288;
#[derive(Clone, Debug, Default)]
pub struct BitswapConfig {}
@ -30,7 +29,8 @@ impl UpgradeInfo for BitswapConfig {
}
impl<TSocket> InboundUpgrade<TSocket> for BitswapConfig
where TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
where
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type Output = Message<I>;
type Error = Error;
@ -72,10 +72,12 @@ impl From<prost::DecodeError> for BitswapError {
impl std::fmt::Display for BitswapError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match *self {
BitswapError::ReadError(ref err) =>
write!(f, "Error while reading from socket: {}", err),
BitswapError::ProtobufError(ref err) =>
write!(f, "Error while decoding protobuf: {}", err),
BitswapError::ReadError(ref err) => {
write!(f, "Error while reading from socket: {}", err)
}
BitswapError::ProtobufError(ref err) => {
write!(f, "Error while decoding protobuf: {}", err)
}
}
}
}
@ -100,7 +102,8 @@ impl UpgradeInfo for Message<O> {
}
impl<TSocket> OutboundUpgrade<TSocket> for Message<O>
where TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static
where
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type Output = ();
type Error = io::Error;

View File

@ -1,9 +1,9 @@
use crate::block::{Block, Cid};
use crate::bitswap::Priority;
use crate::block::{Block, Cid};
use crate::repo::{Repo, RepoTypes};
use libp2p::PeerId;
use std::sync::mpsc::{channel, Sender, Receiver};
use async_std::task;
use libp2p::PeerId;
use std::sync::mpsc::{channel, Receiver, Sender};
pub trait Strategy<TRepoTypes: RepoTypes>: Send + Unpin {
fn new(repo: Repo<TRepoTypes>) -> Self;
@ -14,10 +14,7 @@ pub trait Strategy<TRepoTypes: RepoTypes>: Send + Unpin {
#[derive(Debug)]
pub enum StrategyEvent {
Send {
peer_id: PeerId,
block: Block,
}
Send { peer_id: PeerId, block: Block },
}
pub struct AltruisticStrategy<TRepoTypes: RepoTypes> {
@ -34,7 +31,12 @@ impl<TRepoTypes: RepoTypes> Strategy<TRepoTypes> for AltruisticStrategy<TRepoTyp
}
fn process_want(&self, source: PeerId, cid: Cid, priority: Priority) {
info!("Peer {} wants block {} with priority {}", source.to_base58(), cid.to_string(), priority);
info!(
"Peer {} wants block {} with priority {}",
source.to_base58(),
cid.to_string(),
priority
);
let events = self.events.0.clone();
let mut repo = self.repo.clone();
@ -42,7 +44,12 @@ impl<TRepoTypes: RepoTypes> Strategy<TRepoTypes> for AltruisticStrategy<TRepoTyp
let res = repo.get_block(&cid).await;
let block = if let Err(e) = res {
warn!("Peer {} wanted block {} but we failed: {}", source.to_base58(), cid, e);
warn!(
"Peer {} wanted block {} but we failed: {}",
source.to_base58(),
cid,
e
);
return;
} else {
res.unwrap()
@ -54,7 +61,12 @@ impl<TRepoTypes: RepoTypes> Strategy<TRepoTypes> for AltruisticStrategy<TRepoTyp
};
if let Err(e) = events.send(req) {
warn!("Peer {} wanted block {} we failed start sending it: {}", source.to_base58(), cid, e);
warn!(
"Peer {} wanted block {} we failed start sending it: {}",
source.to_base58(),
cid,
e
);
}
});
}
@ -69,7 +81,12 @@ impl<TRepoTypes: RepoTypes> Strategy<TRepoTypes> for AltruisticStrategy<TRepoTyp
task::spawn(async move {
let future = repo.put_block(block).boxed();
if let Err(e) = future.await {
debug!("Got block {} from peer {} but failed to store it: {}", cid, source.to_base58(), e);
debug!(
"Got block {} from peer {} but failed to store it: {}",
cid,
source.to_base58(),
e
);
}
});
}

View File

@ -1,7 +1,7 @@
//! Block
pub use cid::Cid;
pub use crate::error::Error;
pub use crate::path::{IpfsPath, PathRoot};
pub use cid::Cid;
#[derive(Clone, Debug, PartialEq)]
/// An immutable ipfs block.
@ -13,10 +13,7 @@ pub struct Block {
impl Block {
/// Creates a new immutable ipfs block.
pub fn new(data: Vec<u8>, cid: Cid) -> Self {
Block {
data,
cid,
}
Block { data, cid }
}
/// Returns the size of the block in bytes.
@ -74,10 +71,7 @@ mod tests {
mh_type: multihash::Hash::SHA2256,
mh_len: 32,
};
let computed_cid = cid::Cid::new_from_prefix(
&prefix,
&content,
).to_string();
let computed_cid = cid::Cid::new_from_prefix(&prefix, &content).to_string();
assert_eq!(cid, computed_cid);
}
@ -91,18 +85,17 @@ mod tests {
mh_type: multihash::Hash::SHA2256,
mh_len: 32,
};
let computed_cid = cid::Cid::new_from_prefix(
&prefix,
&content,
).to_string();
let computed_cid = cid::Cid::new_from_prefix(&prefix, &content).to_string();
assert_eq!(cid, computed_cid);
}
#[test]
fn test_block() {
let block = Block::from("hello block\n");
assert_eq!(block.cid().to_string(),
"QmVNrZhKw9JwYa4YPEZVccQxfgQJq993yP78QEN28927vq");
assert_eq!(
block.cid().to_string(),
"QmVNrZhKw9JwYa4YPEZVccQxfgQJq993yP78QEN28927vq"
);
assert_eq!(block.size(), 12);
}
}

View File

@ -1,8 +1,8 @@
use libp2p::{Multiaddr, PeerId};
use libp2p::multiaddr::Protocol;
use libp2p::identity::{Keypair, PublicKey};
use rand::{Rng, rngs::OsRng};
use serde_derive::{Serialize, Deserialize};
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use rand::{rngs::OsRng, Rng};
use serde_derive::{Deserialize, Serialize};
use std::fs;
use std::path::Path;
@ -49,18 +49,23 @@ use std::fmt;
impl fmt::Debug for KeyMaterial {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
KeyMaterial::Ed25519 { ref keypair, .. } =>
KeyMaterial::Ed25519 { ref keypair, .. } => {
if let Some(kp) = keypair.as_ref() {
write!(fmt, "{:?}", kp)
} else {
write!(fmt, "Ed25519(not loaded)")
},
KeyMaterial::RsaPkcs8File { ref keypair, ref filename } =>
}
}
KeyMaterial::RsaPkcs8File {
ref keypair,
ref filename,
} => {
if let Some(kp) = keypair.as_ref() {
write!(fmt, "{:?}", kp.public())
} else {
write!(fmt, "Rsa(not loaded: {:?})", filename)
},
}
}
}
}
}
@ -72,12 +77,16 @@ enum KeyMaterialLoadingFailure {
}
impl KeyMaterial {
fn clone_keypair(&self) -> Keypair {
match *self {
KeyMaterial::Ed25519 { ref keypair, .. } => keypair.as_ref().map(|kp| Keypair::Ed25519(kp.as_ref().clone())),
KeyMaterial::RsaPkcs8File { ref keypair, .. } => keypair.as_ref().map(|kp| Keypair::Rsa(kp.as_ref().clone())),
}.expect("KeyMaterial needs to be loaded before accessing the keypair")
KeyMaterial::Ed25519 { ref keypair, .. } => keypair
.as_ref()
.map(|kp| Keypair::Ed25519(kp.as_ref().clone())),
KeyMaterial::RsaPkcs8File { ref keypair, .. } => {
keypair.as_ref().map(|kp| Keypair::Rsa(kp.as_ref().clone()))
}
}
.expect("KeyMaterial needs to be loaded before accessing the keypair")
}
fn public(&self) -> PublicKey {
@ -86,7 +95,10 @@ impl KeyMaterial {
fn load(&mut self) -> Result<(), KeyMaterialLoadingFailure> {
match *self {
KeyMaterial::Ed25519 { ref private_key, ref mut keypair } if keypair.is_none() => {
KeyMaterial::Ed25519 {
ref private_key,
ref mut keypair,
} if keypair.is_none() => {
let mut cloned = *private_key;
let sk = libp2p::identity::ed25519::SecretKey::from_bytes(&mut cloned)
.expect("Failed to extract ed25519::SecretKey");
@ -94,10 +106,12 @@ impl KeyMaterial {
let kp = libp2p::identity::ed25519::Keypair::from(sk);
*keypair = Some(Box::new(kp));
},
KeyMaterial::RsaPkcs8File { ref filename, ref mut keypair } if keypair.is_none() => {
let mut bytes = std::fs::read(filename)
.map_err(KeyMaterialLoadingFailure::Io)?;
}
KeyMaterial::RsaPkcs8File {
ref filename,
ref mut keypair,
} if keypair.is_none() => {
let mut bytes = std::fs::read(filename).map_err(KeyMaterialLoadingFailure::Io)?;
let kp = libp2p::identity::rsa::Keypair::from_pkcs8(&mut bytes)
.map_err(KeyMaterialLoadingFailure::RsaDecoding)?;
*keypair = Some(Box::new(kp));
@ -177,11 +191,17 @@ impl Default for ConfigFile {
// https://github.com/libp2p/rust-libp2p/blob/eb7b7bd919b93e6acf00847c19d1a76c09016120/core/src/peer_id.rs#L62-L74
let private_key: [u8; 32] = OsRng.gen();
let bootstrap = BOOTSTRAP_NODES.iter().map(|node| {
node.parse().unwrap()
}).collect();
let bootstrap = BOOTSTRAP_NODES
.iter()
.map(|node| node.parse().unwrap())
.collect();
ConfigFile {
key: KeyMaterial::Ed25519 { private_key, keypair: None }.into_loaded().unwrap(),
key: KeyMaterial::Ed25519 {
private_key,
keypair: None,
}
.into_loaded()
.unwrap(),
bootstrap,
}
}

View File

@ -11,13 +11,10 @@ pub struct IpldDag<Types: RepoTypes> {
impl<Types: RepoTypes> IpldDag<Types> {
pub fn new(repo: Repo<Types>) -> Self {
IpldDag {
repo,
}
IpldDag { repo }
}
pub async fn put(&self, data: Ipld, codec: Codec) -> Result<IpfsPath, Error>
{
pub async fn put(&self, data: Ipld, codec: Codec) -> Result<IpfsPath, Error> {
let mut repo = self.repo.clone();
let block = data.to_block(codec)?;
let cid = repo.put_block(block).await?;
@ -38,12 +35,10 @@ impl<Types: RepoTypes> IpldDag<Types> {
}
ipld = resolve(ipld, sub_path);
ipld = match ipld {
Ipld::Link(root) => {
match root.cid() {
Some(cid) => Ipld::from(&repo.get_block(cid).await?)?,
None => bail!("expected cid"),
}
}
Ipld::Link(root) => match root.cid() {
Some(cid) => Ipld::from(&repo.get_block(cid).await?)?,
None => bail!("expected cid"),
},
ipld => ipld,
};
}
@ -75,16 +70,19 @@ fn resolve(ipld: Ipld, sub_path: &SubPath) -> Ipld {
match sub_path {
SubPath::Key(key) => {
if let Ipld::Object(mut map) = ipld {
return map.remove(key).unwrap()
return map.remove(key).unwrap();
}
}
SubPath::Index(index) => {
if let Ipld::Array(mut vec) = ipld {
return vec.swap_remove(*index)
return vec.swap_remove(*index);
}
}
}
panic!("Failed to resolved ipld: {:?} sub_path: {:?}", ipld, sub_path);
panic!(
"Failed to resolved ipld: {:?} sub_path: {:?}",
ipld, sub_path
);
}
#[cfg(test)]
@ -124,7 +122,11 @@ mod tests {
async_test(async move {
let repo = create_mock_repo();
let dag = IpldDag::new(repo);
let data = Ipld::Array(vec![Ipld::U64(1), Ipld::Array(vec![Ipld::U64(2)]), Ipld::U64(3)]);
let data = Ipld::Array(vec![
Ipld::U64(1),
Ipld::Array(vec![Ipld::U64(2)]),
Ipld::U64(3),
]);
let path = dag.put(data.clone(), Codec::DagCBOR).await.unwrap();
let res = dag.get(path.sub_path("1/0").unwrap()).await.unwrap();
assert_eq!(res, Ipld::U64(2));

View File

@ -16,9 +16,7 @@ impl std::error::Error for IpldError {
impl std::fmt::Display for IpldError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match *self {
IpldError::UnsupportedCodec(ref codec) => {
write!(f, "Unsupported codec {:?}", codec)
}
IpldError::UnsupportedCodec(ref codec) => write!(f, "Unsupported codec {:?}", codec),
}
}
}

View File

@ -1,9 +1,9 @@
use cbor::{Cbor, Decoder, Encoder};
pub use cbor::{CborBytes, CborTagEncode, CborError, ReadError};
use cid::Prefix;
use crate::block::Cid;
use crate::error::Error;
use crate::ipld::Ipld;
use cbor::{Cbor, Decoder, Encoder};
pub use cbor::{CborBytes, CborError, CborTagEncode, ReadError};
use cid::Prefix;
use rustc_serialize::{Encodable, Encoder as RustcEncoder};
pub(crate) const PREFIX: Prefix = Prefix {
@ -29,7 +29,7 @@ fn cbor_to_ipld(cbor: Cbor) -> Result<Ipld, Error> {
let ipld = match cbor {
Cbor::Break => {
let err = ReadError::Other("Break.".into());
return Err(CborError::Decode(err).into())
return Err(CborError::Decode(err).into());
}
Cbor::Undefined => Ipld::Null,
Cbor::Null => Ipld::Null,
@ -40,16 +40,16 @@ fn cbor_to_ipld(cbor: Cbor) -> Result<Ipld, Error> {
Cbor::Bytes(bytes) => Ipld::Bytes(bytes.0),
Cbor::Unicode(string) => Ipld::String(string),
Cbor::Array(vec) => {
let ipld_vec = vec.into_iter()
let ipld_vec = vec
.into_iter()
.map(cbor_to_ipld)
.collect::<Result<_, _>>()?;
Ipld::Array(ipld_vec)
}
Cbor::Map(map) => {
let ipld_map = map.into_iter()
.map(|(k, v)| {
Ok((k, cbor_to_ipld(v)?))
})
let ipld_map = map
.into_iter()
.map(|(k, v)| Ok((k, cbor_to_ipld(v)?)))
.collect::<Result<_, Error>>()?;
Ipld::Object(ipld_map)
}
@ -60,11 +60,11 @@ fn cbor_to_ipld(cbor: Cbor) -> Result<Ipld, Error> {
} else {
println!("{:?}", *tag.data);
let err = ReadError::Other("Invalid CID.".into());
return Err(CborError::Decode(err).into())
return Err(CborError::Decode(err).into());
}
} else {
let err = ReadError::Other("Unknown tag {}.".into());
return Err(CborError::Decode(err).into())
return Err(CborError::Decode(err).into());
}
}
};
@ -74,33 +74,15 @@ fn cbor_to_ipld(cbor: Cbor) -> Result<Ipld, Error> {
impl Encodable for Ipld {
fn encode<E: RustcEncoder>(&self, e: &mut E) -> Result<(), E::Error> {
match *self {
Ipld::U64(ref u) => {
u.encode(e)
}
Ipld::I64(ref i) => {
i.encode(e)
}
Ipld::Bytes(ref bytes) => {
cbor::CborBytes(bytes.to_owned()).encode(e)
}
Ipld::String(ref string) => {
string.encode(e)
}
Ipld::Array(ref vec) => {
vec.encode(e)
}
Ipld::Object(ref map) => {
map.encode(e)
}
Ipld::F64(f) => {
f.encode(e)
},
Ipld::Bool(b) => {
b.encode(e)
},
Ipld::Null => {
e.emit_nil()
},
Ipld::U64(ref u) => u.encode(e),
Ipld::I64(ref i) => i.encode(e),
Ipld::Bytes(ref bytes) => cbor::CborBytes(bytes.to_owned()).encode(e),
Ipld::String(ref string) => string.encode(e),
Ipld::Array(ref vec) => vec.encode(e),
Ipld::Object(ref map) => map.encode(e),
Ipld::F64(f) => f.encode(e),
Ipld::Bool(b) => b.encode(e),
Ipld::Null => e.emit_nil(),
Ipld::Link(ref root) => {
let bytes = cbor::CborBytes(root.to_bytes());
cbor::CborTagEncode::new(42, &bytes).encode(e)

View File

@ -51,33 +51,30 @@ impl PbNode {
let cid = Cid::from(link.hash)?.into();
let name = link.name;
let size = link.tsize;
links.push(PbLink {
cid,
name,
size,
});
links.push(PbLink { cid, name, size });
}
Ok(PbNode {
links,
data,
})
Ok(PbNode { links, data })
}
fn into_bytes(self) -> Vec<u8> {
let links = self.links.into_iter().map(|link| {
dag_pb::PbLink {
let links = self
.links
.into_iter()
.map(|link| dag_pb::PbLink {
hash: link.cid.to_bytes(),
name: link.name,
tsize: link.size,
}
}).collect::<Vec<_>>();
})
.collect::<Vec<_>>();
let proto = dag_pb::PbNode {
data: self.data,
links,
};
let mut res = Vec::with_capacity(proto.encoded_len());
proto.encode(&mut res).expect("there is no situation in which the protobuf message can be invalid");
proto
.encode(&mut res)
.expect("there is no situation in which the protobuf message can be invalid");
res
}
}
@ -108,15 +105,14 @@ impl TryFrom<Ipld> for PbNode {
match ipld {
Ipld::Object(mut map) => {
let links: Vec<Ipld> = map.remove("Links").ok_or(TryError)?.try_into()?;
let links: Vec<PbLink> = links.into_iter()
.map(|link| link.try_into()).collect::<Result<_, Self::Error>>()?;
let links: Vec<PbLink> = links
.into_iter()
.map(|link| link.try_into())
.collect::<Result<_, Self::Error>>()?;
let data: Vec<u8> = map.remove("Data").ok_or(TryError)?.try_into()?;
Ok(PbNode {
links,
data,
})
Ok(PbNode { links, data })
}
_ => Err(TryError)
_ => Err(TryError),
}
}
}
@ -130,13 +126,9 @@ impl TryFrom<Ipld> for PbLink {
let cid: PathRoot = map.remove("Hash").ok_or(TryError)?.try_into()?;
let name: String = map.remove("Name").ok_or(TryError)?.try_into()?;
let size: u64 = map.remove("Tsize").ok_or(TryError)?.try_into()?;
Ok(PbLink {
cid,
name,
size,
})
Ok(PbLink { cid, name, size })
}
_ => Err(TryError)
_ => Err(TryError),
}
}
}

View File

@ -34,18 +34,8 @@ pub enum Ipld {
impl Ipld {
pub fn to_block(&self, codec: Codec) -> Result<Block, Error> {
let (prefix, bytes) = match codec {
Codec::DagCBOR => {
(
formats::cbor::PREFIX,
formats::cbor::encode(&self)?,
)
}
Codec::DagProtobuf => {
(
formats::pb::PREFIX,
formats::pb::encode(self.to_owned())?,
)
}
Codec::DagCBOR => (formats::cbor::PREFIX, formats::cbor::encode(&self)?),
Codec::DagProtobuf => (formats::pb::PREFIX, formats::pb::encode(self.to_owned())?),
codec => return Err(IpldError::UnsupportedCodec(codec).into()),
};
let cid = cid::Cid::new_from_prefix(&prefix, &bytes);
@ -62,12 +52,8 @@ impl Ipld {
pub fn from(block: &Block) -> Result<Self, Error> {
let data = match block.cid().prefix().codec {
Codec::DagCBOR => {
formats::cbor::decode(block.data())?
}
Codec::DagProtobuf => {
formats::pb::decode(block.data())?
}
Codec::DagCBOR => formats::cbor::decode(block.data())?,
Codec::DagProtobuf => formats::pb::decode(block.data())?,
codec => return Err(IpldError::UnsupportedCodec(codec).into()),
};
Ok(data)
@ -130,7 +116,11 @@ impl<T: Into<Ipld>> From<HashMap<String, T>> for Ipld {
impl<T: Into<Ipld>> From<HashMap<&str, T>> for Ipld {
fn from(map: HashMap<&str, T>) -> Self {
Ipld::Object(map.into_iter().map(|(k, v)| (k.to_string(), v.into())).collect())
Ipld::Object(
map.into_iter()
.map(|(k, v)| (k.to_string(), v.into()))
.collect(),
)
}
}
@ -170,7 +160,7 @@ impl TryInto<u64> for Ipld {
fn try_into(self) -> Result<u64, Self::Error> {
match self {
Ipld::U64(u) => Ok(u),
_ => Err(TryError)
_ => Err(TryError),
}
}
}
@ -181,7 +171,7 @@ impl TryInto<i64> for Ipld {
fn try_into(self) -> Result<i64, Self::Error> {
match self {
Ipld::I64(i) => Ok(i),
_ => Err(TryError)
_ => Err(TryError),
}
}
}
@ -192,7 +182,7 @@ impl TryInto<Vec<u8>> for Ipld {
fn try_into(self) -> Result<Vec<u8>, Self::Error> {
match self {
Ipld::Bytes(bytes) => Ok(bytes),
_ => Err(TryError)
_ => Err(TryError),
}
}
}
@ -203,7 +193,7 @@ impl TryInto<String> for Ipld {
fn try_into(self) -> Result<String, Self::Error> {
match self {
Ipld::String(string) => Ok(string),
_ => Err(TryError)
_ => Err(TryError),
}
}
}
@ -214,7 +204,7 @@ impl TryInto<Vec<Ipld>> for Ipld {
fn try_into(self) -> Result<Vec<Ipld>, Self::Error> {
match self {
Ipld::Array(vec) => Ok(vec),
_ => Err(TryError)
_ => Err(TryError),
}
}
}
@ -225,7 +215,7 @@ impl TryInto<HashMap<String, Ipld>> for Ipld {
fn try_into(self) -> Result<HashMap<String, Ipld>, Self::Error> {
match self {
Ipld::Object(map) => Ok(map),
_ => Err(TryError)
_ => Err(TryError),
}
}
}
@ -236,7 +226,7 @@ impl TryInto<f64> for Ipld {
fn try_into(self) -> Result<f64, Self::Error> {
match self {
Ipld::F64(f) => Ok(f),
_ => Err(TryError)
_ => Err(TryError),
}
}
}
@ -247,7 +237,7 @@ impl TryInto<bool> for Ipld {
fn try_into(self) -> Result<bool, Self::Error> {
match self {
Ipld::Bool(b) => Ok(b),
_ => Err(TryError)
_ => Err(TryError),
}
}
}
@ -258,7 +248,7 @@ impl TryInto<PathRoot> for Ipld {
fn try_into(self) -> Result<PathRoot, Self::Error> {
match self {
Ipld::Link(root) => Ok(root),
_ => Err(TryError)
_ => Err(TryError),
}
}
}
@ -269,7 +259,7 @@ impl TryInto<Cid> for Ipld {
fn try_into(self) -> Result<Cid, Self::Error> {
match self {
Ipld::Link(root) => root.try_into(),
_ => Err(TryError)
_ => Err(TryError),
}
}
}

View File

@ -3,15 +3,15 @@ use crate::path::IpfsPath;
use domain::core::bits::{Dname, Question};
use domain::core::iana::Rtype;
use domain::core::rdata::Txt;
use domain::resolv::{Resolver, StubResolver};
use domain::resolv::stub::resolver::Query;
use domain::resolv::{Resolver, StubResolver};
use futures::compat::{Compat01As03, Future01CompatExt};
use futures::future::{select_ok, SelectOk};
use futures::pin_mut;
use std::future::Future;
use std::pin::Pin;
use std::task::{Poll, Context};
use std::str::FromStr;
use futures::future::{select_ok, SelectOk};
use futures::compat::{Compat01As03, Future01CompatExt};
use futures::pin_mut;
use std::task::{Context, Poll};
#[derive(Debug, Fail)]
#[fail(display = "no dnslink entry")]
@ -43,7 +43,7 @@ impl Future for DnsLinkFuture {
if !rest.is_empty() {
_self.query = select_ok(rest);
} else {
return Poll::Ready(Err(DnsLinkError.into()))
return Poll::Ready(Err(DnsLinkError.into()));
}
}
Poll::Pending => return Poll::Pending,
@ -66,7 +66,8 @@ pub async fn resolve(domain: &str) -> Result<IpfsPath, Error> {
Ok(DnsLinkFuture {
query: select_ok(vec![query1, query2]),
}.await?)
}
.await?)
}
#[cfg(test)]

View File

@ -4,9 +4,9 @@ use crate::path::IpfsPath;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use libp2p::core::PublicKey;
use libp2p::identity::Keypair;
use std::time::{Duration, SystemTime};
use std::str::FromStr;
use std::convert::TryFrom;
use std::str::FromStr;
use std::time::{Duration, SystemTime};
#[derive(Clone, Debug, PartialEq)]
pub struct IpnsEntry {
@ -65,8 +65,8 @@ impl IpnsEntry {
}
}
use std::io::Cursor;
use prost::Message;
use std::io::Cursor;
impl TryFrom<&[u8]> for IpnsEntry {
type Error = Error;
@ -93,7 +93,8 @@ impl Into<Vec<u8>> for &IpnsEntry {
let mut proto = proto::IpnsEntry::default();
proto.value = self.value.as_bytes().to_vec();
proto.sequence = self.seq;
let nanos = self.validity
let nanos = self
.validity
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos();
@ -104,7 +105,9 @@ impl Into<Vec<u8>> for &IpnsEntry {
proto.signature = self.signature.clone();
proto.pub_key = self.public_key.clone().into_protobuf_encoding();
let mut res = Vec::with_capacity(proto.encoded_len());
proto.encode(&mut res).expect("there is no situation in which the protobuf message can be invalid");
proto
.encode(&mut res)
.expect("there is no situation in which the protobuf message can be invalid");
res
}
}
@ -140,7 +143,8 @@ mod tests {
#[test]
fn test_from_path() {
let key = generate_key();
let path = IpfsPath::from_str("/ipfs/QmUJPTFZnR2CPGAzmfdYPghgrFtYFB6pf1BqMvqfiPDam8").unwrap();
let path =
IpfsPath::from_str("/ipfs/QmUJPTFZnR2CPGAzmfdYPghgrFtYFB6pf1BqMvqfiPDam8").unwrap();
let ipns = IpnsEntry::from_path(&path, 0, &key);
assert_eq!(path, ipns.resolve().unwrap());
}

View File

@ -16,33 +16,25 @@ pub struct Ipns<Types: RepoTypes> {
impl<Types: RepoTypes> Ipns<Types> {
pub fn new(repo: Repo<Types>) -> Self {
Ipns {
repo
}
Ipns { repo }
}
/// Resolves a ipns path to an ipld path.
pub async fn resolve(&self, path: &IpfsPath) -> Result<IpfsPath, Error>
{
pub async fn resolve(&self, path: &IpfsPath) -> Result<IpfsPath, Error> {
let mut repo = self.repo.clone();
let path = path.to_owned();
match path.root() {
PathRoot::Ipld(_) => Ok(path),
PathRoot::Ipns(peer_id) => {
match repo.get_ipns(peer_id).await? {
Some(path) => Ok(path),
None => bail!("unimplemented"),
}
},
PathRoot::Dns(domain) => {
Ok(dns::resolve(domain).await?)
PathRoot::Ipns(peer_id) => match repo.get_ipns(peer_id).await? {
Some(path) => Ok(path),
None => bail!("unimplemented"),
},
PathRoot::Dns(domain) => Ok(dns::resolve(domain).await?),
}
}
/// Publishes an ipld path.
pub async fn publish(&self, key: &PeerId, path: &IpfsPath) -> Result<IpfsPath, Error>
{
pub async fn publish(&self, key: &PeerId, path: &IpfsPath) -> Result<IpfsPath, Error> {
let future = self.repo.put_ipns(key, path);
let key = key.to_owned();
let mut path = path.to_owned();
@ -52,8 +44,7 @@ impl<Types: RepoTypes> Ipns<Types> {
}
/// Cancel an ipns path.
pub async fn cancel(&self, key: &PeerId) -> Result<(), Error>
{
pub async fn cancel(&self, key: &PeerId) -> Result<(), Error> {
self.repo.remove_ipns(key).await?;
Ok(())
}

View File

@ -1,13 +1,15 @@
//! IPFS node implementation
//#![deny(missing_docs)]
#[macro_use] extern crate failure;
#[macro_use] extern crate log;
pub use libp2p::PeerId;
use std::marker::PhantomData;
#[macro_use]
extern crate failure;
#[macro_use]
extern crate log;
use async_std::path::PathBuf;
use futures::channel::mpsc::{channel, Sender, Receiver};
use futures::channel::mpsc::{channel, Receiver, Sender};
pub use libp2p::PeerId;
use std::future::Future;
use std::marker::PhantomData;
pub mod bitswap;
pub mod block;
@ -23,14 +25,14 @@ pub mod unixfs;
pub use self::block::{Block, Cid};
use self::config::ConfigFile;
pub use self::error::Error;
use self::ipld::IpldDag;
pub use self::ipld::Ipld;
use self::ipld::IpldDag;
use self::ipns::Ipns;
pub use self::p2p::SwarmTypes;
use self::p2p::{create_swarm, SwarmOptions, TSwarm};
pub use self::path::IpfsPath;
pub use self::repo::RepoTypes;
use self::repo::{create_repo, RepoOptions, Repo, RepoEvent};
use self::repo::{create_repo, Repo, RepoEvent, RepoOptions};
use self::unixfs::File;
static IPFS_LOG: &str = "info";
@ -81,12 +83,14 @@ impl Default for IpfsOptions<Types> {
/// Create `IpfsOptions` from environment.
fn default() -> Self {
let ipfs_log = std::env::var("IPFS_LOG").unwrap_or_else(|_| IPFS_LOG.into());
let ipfs_path = std::env::var("IPFS_PATH").unwrap_or_else(|_| {
let mut ipfs_path = std::env::var("HOME").unwrap_or_else(|_| "".into());
ipfs_path.push_str("/");
ipfs_path.push_str(IPFS_PATH);
ipfs_path
}).into();
let ipfs_path = std::env::var("IPFS_PATH")
.unwrap_or_else(|_| {
let mut ipfs_path = std::env::var("HOME").unwrap_or_else(|_| "".into());
ipfs_path.push_str("/");
ipfs_path.push_str(IPFS_PATH);
ipfs_path
})
.into();
let xdg_dirs = xdg::BaseDirectories::with_prefix(XDG_APP_NAME).unwrap();
let path = xdg_dirs.place_config_file(CONFIG_FILE).unwrap();
let config = ConfigFile::new(path);
@ -95,7 +99,7 @@ impl Default for IpfsOptions<Types> {
_marker: PhantomData,
ipfs_log,
ipfs_path,
config
config,
}
}
}
@ -105,8 +109,12 @@ impl Default for IpfsOptions<TestTypes> {
/// file system.
fn default() -> Self {
let ipfs_log = std::env::var("IPFS_LOG").unwrap_or_else(|_| IPFS_LOG.into());
let ipfs_path = std::env::var("IPFS_PATH").unwrap_or_else(|_| IPFS_PATH.into()).into();
let config = std::env::var("IPFS_TEST_CONFIG").map(ConfigFile::new).unwrap_or_default();
let ipfs_path = std::env::var("IPFS_PATH")
.unwrap_or_else(|_| IPFS_PATH.into())
.into();
let config = std::env::var("IPFS_TEST_CONFIG")
.map(ConfigFile::new)
.unwrap_or_default();
IpfsOptions {
_marker: PhantomData,
ipfs_log,
@ -158,9 +166,11 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
}
/// Initialize the ipfs node.
pub async fn start(mut self) -> Result<(Ipfs<Types>, impl std::future::Future<Output = ()>), Error> {
let (repo_events, swarm) = self.moved_on_init
pub async fn start(
mut self,
) -> Result<(Ipfs<Types>, impl std::future::Future<Output = ()>), Error> {
let (repo_events, swarm) = self
.moved_on_init
.take()
.expect("Cant see how this should happen");
@ -176,14 +186,23 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
swarm,
};
let UninitializedIpfs { repo, dag, ipns, exit_events, .. } = self;
Ok((Ipfs {
let UninitializedIpfs {
repo,
dag,
ipns,
exit_events
}, fut))
exit_events,
..
} = self;
Ok((
Ipfs {
repo,
dag,
ipns,
exit_events,
},
fut,
))
}
}
@ -257,7 +276,7 @@ pub struct IpfsFuture<Types: SwarmTypes> {
}
use std::pin::Pin;
use std::task::{Poll, Context};
use std::task::{Context, Poll};
impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
type Output = ();
@ -278,29 +297,32 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
}
{
loop {
let pin = Pin::new(&mut self.repo_events);
match pin.poll_next(ctx) {
Poll::Ready(Some(RepoEvent::WantBlock(cid))) =>
self.swarm.want_block(cid),
Poll::Ready(Some(RepoEvent::ProvideBlock(cid))) =>
self.swarm.provide_block(cid),
Poll::Ready(Some(RepoEvent::UnprovideBlock(cid))) =>
self.swarm.stop_providing_block(&cid),
Poll::Ready(Some(RepoEvent::WantBlock(cid))) => self.swarm.want_block(cid),
Poll::Ready(Some(RepoEvent::ProvideBlock(cid))) => {
self.swarm.provide_block(cid)
}
Poll::Ready(Some(RepoEvent::UnprovideBlock(cid))) => {
self.swarm.stop_providing_block(&cid)
}
Poll::Ready(None) => panic!("other side closed the repo_events?"),
Poll::Pending => break,
}
}
}
{
let poll = Pin::new(&mut self.swarm).poll_next(ctx);
match poll {
Poll::Ready(Some(_)) => {},
Poll::Ready(None) => { return Poll::Ready(()); },
Poll::Pending => { return Poll::Pending; }
Poll::Ready(Some(_)) => {}
Poll::Ready(None) => {
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
}
}
}
@ -314,8 +336,9 @@ mod tests {
/// Testing helper for std::future::Futures until we can upgrade tokio
pub(crate) fn async_test<O, F>(future: F) -> O
where O: 'static + Send,
F: std::future::Future<Output = O> + 'static + Send
where
O: 'static + Send,
F: std::future::Future<Output = O> + 'static + Send,
{
let (tx, rx) = std::sync::mpsc::channel();
task::block_on(async move {
@ -348,7 +371,6 @@ mod tests {
let options = IpfsOptions::<TestTypes>::default();
async_test(async move {
let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();
task::spawn(fut);

View File

@ -2,15 +2,15 @@ use crate::bitswap::{Bitswap, Strategy};
use crate::block::Cid;
use crate::p2p::{SwarmOptions, SwarmTypes};
use crate::repo::Repo;
use libp2p::{NetworkBehaviour};
use libp2p::swarm::NetworkBehaviourEventProcess;
use libp2p::identify::{Identify, IdentifyEvent};
use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::ping::{Ping, PingEvent};
use libp2p::PeerId;
use libp2p::floodsub::{Floodsub, FloodsubEvent};
use libp2p::identify::{Identify, IdentifyEvent};
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{Kademlia, KademliaEvent};
use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::ping::{Ping, PingEvent};
use libp2p::swarm::NetworkBehaviourEventProcess;
use libp2p::NetworkBehaviour;
use libp2p::PeerId;
//use parity_multihash::Multihash;
/// Behaviour type.
@ -24,10 +24,7 @@ pub struct Behaviour<TSwarmTypes: SwarmTypes> {
floodsub: Floodsub,
}
impl<TSwarmTypes: SwarmTypes>
NetworkBehaviourEventProcess<MdnsEvent> for
Behaviour<TSwarmTypes>
{
impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<MdnsEvent> for Behaviour<TSwarmTypes> {
fn inject_event(&mut self, event: MdnsEvent) {
match event {
MdnsEvent::Discovered(list) => {
@ -49,12 +46,11 @@ impl<TSwarmTypes: SwarmTypes>
}
}
impl<TSwarmTypes: SwarmTypes>
NetworkBehaviourEventProcess<KademliaEvent> for
Behaviour<TSwarmTypes>
impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<KademliaEvent>
for Behaviour<TSwarmTypes>
{
fn inject_event(&mut self, event: KademliaEvent) {
use libp2p::kad::{GetProvidersOk, GetProvidersError};
use libp2p::kad::{GetProvidersError, GetProvidersOk};
match event {
KademliaEvent::Discovered { peer_id, ty, .. } => {
@ -70,7 +66,11 @@ impl<TSwarmTypes: SwarmTypes>
info!("kad: Found closer peer {} to {}", peer.to_base58(), key.to_base58());
}
}*/
KademliaEvent::GetProvidersResult(Ok(GetProvidersOk { key, providers, closest_peers })) => {
KademliaEvent::GetProvidersResult(Ok(GetProvidersOk {
key,
providers,
closest_peers,
})) => {
// FIXME: really wasteful to run this through Vec
let cid = PeerId::from_bytes(key.to_vec()).unwrap().to_base58();
if providers.is_empty() {
@ -82,67 +82,76 @@ impl<TSwarmTypes: SwarmTypes>
self.bitswap.connect(peer);
}
}
},
}
KademliaEvent::GetProvidersResult(Err(GetProvidersError::Timeout { key, .. })) => {
// FIXME: really wasteful to run this through Vec
let cid = PeerId::from_bytes(key.to_vec()).unwrap().to_base58();
warn!("kad: timed out get providers query for {}", cid);
},
x => { debug!("kad ignored event {:?}", x); },
}
x => {
debug!("kad ignored event {:?}", x);
}
}
}
}
impl<TSwarmTypes: SwarmTypes>
NetworkBehaviourEventProcess<()> for
Behaviour<TSwarmTypes>
{
impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<()> for Behaviour<TSwarmTypes> {
fn inject_event(&mut self, _event: ()) {}
}
impl<TSwarmTypes: SwarmTypes>
NetworkBehaviourEventProcess<PingEvent> for
Behaviour<TSwarmTypes>
{
impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TSwarmTypes> {
fn inject_event(&mut self, event: PingEvent) {
use libp2p::ping::handler::{PingSuccess, PingFailure};
use libp2p::ping::handler::{PingFailure, PingSuccess};
match event {
PingEvent { peer, result: Result::Ok(PingSuccess::Ping { rtt }) } => {
debug!("ping: rtt to {} is {} ms", peer.to_base58(), rtt.as_millis());
},
PingEvent { peer, result: Result::Ok(PingSuccess::Pong) } => {
PingEvent {
peer,
result: Result::Ok(PingSuccess::Ping { rtt }),
} => {
debug!(
"ping: rtt to {} is {} ms",
peer.to_base58(),
rtt.as_millis()
);
}
PingEvent {
peer,
result: Result::Ok(PingSuccess::Pong),
} => {
debug!("ping: pong from {}", peer.to_base58());
},
PingEvent { peer, result: Result::Err(PingFailure::Timeout) } => {
}
PingEvent {
peer,
result: Result::Err(PingFailure::Timeout),
} => {
warn!("ping: timeout to {}", peer.to_base58());
},
PingEvent { peer, result: Result::Err(PingFailure::Other { error }) } => {
}
PingEvent {
peer,
result: Result::Err(PingFailure::Other { error }),
} => {
error!("ping: failure with {}: {}", peer.to_base58(), error);
}
}
}
}
impl<TSwarmTypes: SwarmTypes>
NetworkBehaviourEventProcess<IdentifyEvent> for
Behaviour<TSwarmTypes>
impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<IdentifyEvent>
for Behaviour<TSwarmTypes>
{
fn inject_event(&mut self, event: IdentifyEvent) {
debug!("identify: {:?}", event);
}
}
impl<TSwarmTypes: SwarmTypes>
NetworkBehaviourEventProcess<FloodsubEvent> for
Behaviour<TSwarmTypes>
impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<FloodsubEvent>
for Behaviour<TSwarmTypes>
{
fn inject_event(&mut self, event: FloodsubEvent) {
debug!("floodsub: {:?}", event);
}
}
impl<TSwarmTypes: SwarmTypes> Behaviour<TSwarmTypes>
{
impl<TSwarmTypes: SwarmTypes> Behaviour<TSwarmTypes> {
/// Create a Kademlia behaviour with the IPFS bootstrap nodes.
pub async fn new(options: SwarmOptions<TSwarmTypes>, repo: Repo<TSwarmTypes>) -> Self {
info!("Local peer id: {}", options.peer_id.to_base58());
@ -197,6 +206,9 @@ impl<TSwarmTypes: SwarmTypes> Behaviour<TSwarmTypes>
}
/// Create a IPFS behaviour with the IPFS bootstrap nodes.
pub async fn build_behaviour<TSwarmTypes: SwarmTypes>(options: SwarmOptions<TSwarmTypes>, repo: Repo<TSwarmTypes>) -> Behaviour<TSwarmTypes> {
pub async fn build_behaviour<TSwarmTypes: SwarmTypes>(
options: SwarmOptions<TSwarmTypes>,
repo: Repo<TSwarmTypes>,
) -> Behaviour<TSwarmTypes> {
Behaviour::new(options, repo).await
}

View File

@ -1,10 +1,10 @@
//! P2P handling for IPFS nodes.
use crate::bitswap::Strategy;
use crate::IpfsOptions;
use crate::repo::{Repo, RepoTypes};
use libp2p::{Multiaddr, PeerId};
use libp2p::Swarm;
use crate::IpfsOptions;
use libp2p::identity::Keypair;
use libp2p::Swarm;
use libp2p::{Multiaddr, PeerId};
use std::marker::PhantomData;
mod behaviour;
@ -38,7 +38,10 @@ impl<TSwarmTypes: SwarmTypes> From<&IpfsOptions<TSwarmTypes>> for SwarmOptions<T
}
/// Creates a new IPFS swarm.
pub async fn create_swarm<TSwarmTypes: SwarmTypes>(options: SwarmOptions<TSwarmTypes>, repo: Repo<TSwarmTypes>) -> TSwarm<TSwarmTypes> {
pub async fn create_swarm<TSwarmTypes: SwarmTypes>(
options: SwarmOptions<TSwarmTypes>,
repo: Repo<TSwarmTypes>,
) -> TSwarm<TSwarmTypes> {
let peer_id = options.peer_id.clone();
// Set up an encrypted TCP transport over the Mplex protocol.

View File

@ -1,5 +1,4 @@
use crate::p2p::{SwarmOptions, SwarmTypes};
use libp2p::{PeerId, Transport};
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::boxed::Boxed;
use libp2p::core::transport::upgrade::Version;
@ -7,6 +6,7 @@ use libp2p::mplex::MplexConfig;
use libp2p::secio::SecioConfig;
use libp2p::tcp::TcpConfig;
use libp2p::yamux::Config as YamuxConfig;
use libp2p::{PeerId, Transport};
use std::io::{Error, ErrorKind};
use std::time::Duration;
@ -25,7 +25,10 @@ pub fn build_transport<TSwarmTypes: SwarmTypes>(options: &SwarmOptions<TSwarmTyp
.nodelay(true)
.upgrade(Version::V1)
.authenticate(secio_config)
.multiplex(libp2p::core::upgrade::SelectUpgrade::new(yamux_config, mplex_config))
.multiplex(libp2p::core::upgrade::SelectUpgrade::new(
yamux_config,
mplex_config,
))
.timeout(Duration::from_secs(20))
.map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)))
.map_err(|err| Error::new(ErrorKind::Other, err))

View File

@ -4,10 +4,7 @@ use crate::path::SubPath;
#[derive(Debug)]
pub enum IpfsPathError {
InvalidPath(String),
ResolveError {
ipld: Ipld,
path: SubPath,
},
ResolveError { ipld: Ipld, path: SubPath },
ExpectedIpldPath,
}
@ -24,15 +21,11 @@ impl std::error::Error for IpfsPathError {
impl std::fmt::Display for IpfsPathError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match *self {
IpfsPathError::InvalidPath(ref path) => {
write!(f, "Invalid path {:?}", path)
}
IpfsPathError::InvalidPath(ref path) => write!(f, "Invalid path {:?}", path),
IpfsPathError::ResolveError { ref path, .. } => {
write!(f, "Can't resolve {}", path.to_string())
}
IpfsPathError::ExpectedIpldPath => {
write!(f, "Expected ipld path but found ipns path")
}
IpfsPathError::ExpectedIpldPath => write!(f, "Expected ipld path but found ipns path"),
}
}
}

View File

@ -2,8 +2,8 @@ use crate::block::Cid;
use crate::error::{Error, TryError};
use libp2p::PeerId;
use std::convert::{TryFrom, TryInto};
use std::str::FromStr;
use std::fmt;
use std::str::FromStr;
pub mod error;
pub use self::error::IpfsPathError;
@ -26,11 +26,9 @@ impl FromStr for IpfsPath {
let root = match (empty, root_type, key) {
(Some(""), Some("ipfs"), Some(key)) => PathRoot::Ipld(Cid::from(key)?),
(Some(""), Some("ipld"), Some(key)) => PathRoot::Ipld(Cid::from(key)?),
(Some(""), Some("ipns"), Some(key)) => {
match PeerId::from_str(key).ok() {
Some(peer_id) => PathRoot::Ipns(peer_id),
None => PathRoot::Dns(key.to_string())
}
(Some(""), Some("ipns"), Some(key)) => match PeerId::from_str(key).ok() {
Some(peer_id) => PathRoot::Ipns(peer_id),
None => PathRoot::Dns(key.to_string()),
},
_ => return Err(IpfsPathError::InvalidPath(string.to_owned()).into()),
};
@ -48,7 +46,6 @@ impl IpfsPath {
}
}
pub fn root(&self) -> &PathRoot {
&self.root
}
@ -90,7 +87,7 @@ impl IpfsPath {
Ok(self)
}
pub fn iter(&self) -> impl Iterator<Item=&SubPath> {
pub fn iter(&self) -> impl Iterator<Item = &SubPath> {
self.path.iter()
}
}

View File

@ -1,17 +1,17 @@
//! Persistent fs backed repo
use crate::block::{Cid, Block};
use crate::block::{Block, Cid};
use crate::error::Error;
use crate::repo::BlockStore;
#[cfg(feature = "rocksdb")]
use crate::repo::{Column, DataStore};
use async_std::fs;
use async_std::path::PathBuf;
use async_std::prelude::*;
use async_trait::async_trait;
use futures::stream::StreamExt;
use std::collections::HashSet;
use std::ffi::OsStr;
use async_std::path::PathBuf;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use async_std::fs;
use async_std::prelude::*;
use futures::stream::StreamExt;
#[derive(Clone, Debug)]
pub struct FsBlockStore {
@ -24,7 +24,7 @@ impl BlockStore for FsBlockStore {
fn new(path: PathBuf) -> Self {
FsBlockStore {
path,
cids: Arc::new(Mutex::new(HashSet::new()))
cids: Arc::new(Mutex::new(HashSet::new())),
}
}
@ -152,11 +152,7 @@ impl DataStore for RocksDataStore {
let ipns_opts = rocksdb::Options::default();
let ipns_cf = rocksdb::ColumnFamilyDescriptor::new("ipns", ipns_opts);
let rdb = rocksdb::DB::open_cf_descriptors(
&db_opts,
&path,
vec![ipns_cf],
)?;
let rdb = rocksdb::DB::open_cf_descriptors(&db_opts, &path, vec![ipns_cf])?;
*db.lock().unwrap() = Some(rdb);
Ok(())
}
@ -213,9 +209,8 @@ fn block_path(mut base: PathBuf, cid: &Cid) -> PathBuf {
#[cfg(test)]
mod tests {
use super::*;
use std::env::temp_dir;
use crate::tests::async_test;
use std::env::temp_dir;
#[test]
fn test_fs_blockstore() {

View File

@ -1,10 +1,10 @@
//! Volatile memory backed repo
use crate::block::{Cid, Block};
use crate::block::{Block, Cid};
use crate::error::Error;
use crate::repo::{BlockStore, DataStore, Column};
use crate::repo::{BlockStore, Column, DataStore};
use async_std::path::PathBuf;
use async_trait::async_trait;
use std::collections::HashMap;
use async_std::path::PathBuf;
use std::sync::{Arc, Mutex};
#[derive(Clone, Debug)]
@ -16,7 +16,7 @@ pub struct MemBlockStore {
impl BlockStore for MemBlockStore {
fn new(_path: PathBuf) -> Self {
MemBlockStore {
blocks: Arc::new(Mutex::new(HashMap::new()))
blocks: Arc::new(Mutex::new(HashMap::new())),
}
}
@ -34,7 +34,10 @@ impl BlockStore for MemBlockStore {
}
async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error> {
let block = self.blocks.lock().unwrap()
let block = self
.blocks
.lock()
.unwrap()
.get(cid)
.map(|block| block.to_owned());
Ok(block)
@ -75,7 +78,7 @@ impl DataStore for MemDataStore {
async fn contains(&self, col: Column, key: &[u8]) -> Result<bool, Error> {
let map = match col {
Column::Ipns => &self.ipns
Column::Ipns => &self.ipns,
};
let contains = map.lock().unwrap().contains_key(key);
Ok(contains)
@ -83,7 +86,7 @@ impl DataStore for MemDataStore {
async fn get(&self, col: Column, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
let map = match col {
Column::Ipns => &self.ipns
Column::Ipns => &self.ipns,
};
let value = map.lock().unwrap().get(key).map(|value| value.to_owned());
Ok(value)
@ -91,7 +94,7 @@ impl DataStore for MemDataStore {
async fn put(&self, col: Column, key: &[u8], value: &[u8]) -> Result<(), Error> {
let map = match col {
Column::Ipns => &self.ipns
Column::Ipns => &self.ipns,
};
map.lock().unwrap().insert(key.to_owned(), value.to_owned());
Ok(())
@ -99,7 +102,7 @@ impl DataStore for MemDataStore {
async fn remove(&self, col: Column, key: &[u8]) -> Result<(), Error> {
let map = match col {
Column::Ipns => &self.ipns
Column::Ipns => &self.ipns,
};
map.lock().unwrap().remove(key);
Ok(())
@ -109,8 +112,8 @@ impl DataStore for MemDataStore {
#[cfg(test)]
mod tests {
use super::*;
use std::env::temp_dir;
use crate::tests::async_test;
use std::env::temp_dir;
#[test]
fn test_mem_blockstore() {

View File

@ -1,17 +1,17 @@
//! IPFS repo
use crate::block::{Cid, Block};
use crate::block::{Block, Cid};
use crate::error::Error;
use crate::path::IpfsPath;
use crate::IpfsOptions;
use libp2p::PeerId;
use async_trait::async_trait;
use std::marker::PhantomData;
use async_std::path::PathBuf;
use futures::channel::mpsc::{channel, Sender, Receiver};
use async_trait::async_trait;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::SinkExt;
use libp2p::PeerId;
use std::marker::PhantomData;
pub mod mem;
pub mod fs;
pub mod mem;
pub trait RepoTypes: Clone + Send + Sync + 'static {
type TBlockStore: BlockStore;
@ -33,7 +33,9 @@ impl<TRepoTypes: RepoTypes> From<&IpfsOptions<TRepoTypes>> for RepoOptions<TRepo
}
}
pub fn create_repo<TRepoTypes: RepoTypes>(options: RepoOptions<TRepoTypes>) -> (Repo<TRepoTypes>, Receiver<RepoEvent>) {
pub fn create_repo<TRepoTypes: RepoTypes>(
options: RepoOptions<TRepoTypes>,
) -> (Repo<TRepoTypes>, Receiver<RepoEvent>) {
Repo::new(options)
}
@ -61,7 +63,7 @@ pub trait DataStore: Clone + Send + Sync + Unpin + 'static {
#[derive(Clone, Copy, Debug)]
pub enum Column {
Ipns
Ipns,
}
#[derive(Clone, Debug)]
@ -87,11 +89,14 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
let block_store = TRepoTypes::TBlockStore::new(blockstore_path);
let data_store = TRepoTypes::TDataStore::new(datastore_path);
let (sender, receiver) = channel::<RepoEvent>(1);
(Repo {
block_store,
data_store,
events: sender,
}, receiver)
(
Repo {
block_store,
data_store,
events: sender,
},
receiver,
)
}
pub async fn init(&self) -> Result<(), Error> {
@ -121,8 +126,7 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
}
/// Puts a block into the block store.
pub async fn put_block(&mut self, block: Block) -> Result<Cid, Error>
{
pub async fn put_block(&mut self, block: Block) -> Result<Cid, Error> {
let cid = self.block_store.put(block).await?;
// sending only fails if no one is listening anymore
// and that is okay with us.
@ -131,8 +135,7 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
}
/// Retrives a block from the block store.
pub async fn get_block(&mut self, cid: &Cid) -> Result<Block, Error>
{
pub async fn get_block(&mut self, cid: &Cid) -> Result<Block, Error> {
loop {
if !self.block_store.contains(&cid).await? {
// sending only fails if no one is listening anymore
@ -147,18 +150,19 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
}
/// Remove block from the block store.
pub async fn remove_block(&mut self, cid: &Cid) -> Result<(), Error>
{
pub async fn remove_block(&mut self, cid: &Cid) -> Result<(), Error> {
// sending only fails if no one is listening anymore
// and that is okay with us.
let _ = self.events.send(RepoEvent::UnprovideBlock(cid.to_owned())).await;
let _ = self
.events
.send(RepoEvent::UnprovideBlock(cid.to_owned()))
.await;
self.block_store.remove(cid).await?;
Ok(())
}
/// Get an ipld path from the datastore.
pub async fn get_ipns(&mut self, ipns: &PeerId) -> Result<Option<IpfsPath>, Error>
{
pub async fn get_ipns(&mut self, ipns: &PeerId) -> Result<Option<IpfsPath>, Error> {
use std::str::FromStr;
let data_store = self.data_store.clone();
@ -170,21 +174,21 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
let path = IpfsPath::from_str(&string)?;
Ok(Some(path))
}
None => Ok(None)
None => Ok(None),
}
}
/// Put an ipld path into the datastore.
pub async fn put_ipns(&self, ipns: &PeerId, path: &IpfsPath) -> Result<(), Error>
{
pub async fn put_ipns(&self, ipns: &PeerId, path: &IpfsPath) -> Result<(), Error> {
let string = path.to_string();
let value = string.as_bytes();
self.data_store.put(Column::Ipns, ipns.as_bytes(), value).await
self.data_store
.put(Column::Ipns, ipns.as_bytes(), value)
.await
}
/// Remove an ipld path from the datastore.
pub async fn remove_ipns(&self, ipns: &PeerId) -> Result<(), Error>
{
pub async fn remove_ipns(&self, ipns: &PeerId) -> Result<(), Error> {
self.data_store.remove(Column::Ipns, ipns.as_bytes()).await
}
}
@ -192,8 +196,8 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use std::env::temp_dir;
use crate::tests::async_test;
use std::env::temp_dir;
#[derive(Clone)]
pub struct Types;

View File

@ -1,12 +1,12 @@
use crate::error::Error;
use crate::ipld::{Ipld, IpldDag, formats::pb::PbNode};
use crate::ipld::{formats::pb::PbNode, Ipld, IpldDag};
use crate::path::IpfsPath;
use crate::repo::RepoTypes;
use std::collections::HashMap;
use std::convert::TryInto;
use async_std::path::PathBuf;
use async_std::fs;
use async_std::io::ReadExt;
use async_std::path::PathBuf;
use std::collections::HashMap;
use std::convert::TryInto;
pub struct File {
data: Vec<u8>,
@ -17,20 +17,19 @@ impl File {
let mut file = fs::File::open(path).await?;
let mut data = Vec::new();
file.read_to_end(&mut data).await?;
Ok(File {
data
})
Ok(File { data })
}
pub async fn get_unixfs_v1<T: RepoTypes>(dag: &IpldDag<T>, path: IpfsPath) -> Result<Self, Error> {
pub async fn get_unixfs_v1<T: RepoTypes>(
dag: &IpldDag<T>,
path: IpfsPath,
) -> Result<Self, Error> {
let ipld = dag.get(path).await?;
let pb_node: PbNode = match ipld.try_into() {
Ok(pb_node) => pb_node,
Err(_) => bail!("invalid dag_pb node"),
};
Ok(File {
data: pb_node.data,
})
Ok(File { data: pb_node.data })
}
pub async fn put_unixfs_v1<T: RepoTypes>(&self, dag: &IpldDag<T>) -> Result<IpfsPath, Error> {
@ -45,16 +44,14 @@ impl File {
impl From<Vec<u8>> for File {
fn from(data: Vec<u8>) -> Self {
File {
data,
}
File { data }
}
}
impl From<&str> for File {
fn from(string: &str) -> Self {
File {
data: string.as_bytes().to_vec()
data: string.as_bytes().to_vec(),
}
}
}