Merge pull request #104 from dvc94ch/swarm-http-2

Implement swarm API.
Joonas Koivunen 2020-03-22 12:13:50 +02:00 committed by GitHub
commit c8b44941f6
21 changed files with 746 additions and 316 deletions
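
The change behind this PR: the stubbed /api/v0/swarm/* handlers in the http crate are replaced with working ones backed by new methods on the Ipfs facade (connect, peers, addrs, addrs_local, disconnect), which reach a new SwarmApi network behaviour through the background task. Below is a minimal usage sketch of those calls, modeled on the updated examples in this diff; the multiaddress, the choice of TestTypes, and spawning the returned background future are illustrative assumptions rather than code from the PR.

// Hedged sketch: exercising the swarm API added by this PR.
use async_std::task;
use ipfs::{IpfsOptions, Multiaddr, TestTypes, UninitializedIpfs};

fn main() {
    env_logger::init();
    let options = IpfsOptions::<TestTypes>::default();

    task::block_on(async move {
        // Start the node; `fut` is the background swarm task, assumed here to be
        // driven on a spawned task as in the examples.
        let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();
        task::spawn(fut);

        // New swarm calls introduced in src/lib.rs by this PR.
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();
        ipfs.connect(addr.clone()).await.ok();

        let peers = ipfs.peers().await.unwrap();       // Vec<Connection>
        let known = ipfs.addrs().await.unwrap();       // Vec<(PeerId, Vec<Multiaddr>)>
        let local = ipfs.addrs_local().await.unwrap(); // Vec<Multiaddr>
        println!("peers: {} known: {} local: {}", peers.len(), known.len(), local.len());

        ipfs.disconnect(addr).await.ok();
    });
}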

Cargo.lock (generated, 28 changed lines)

@ -521,19 +521,6 @@ dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.2"
@ -1222,7 +1209,6 @@ dependencies = [
"async-trait 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)",
"bitswap 0.1.0",
"byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"dirs 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"domain 0.4.1 (git+https://github.com/nlnetlabs/domain)",
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1239,6 +1225,19 @@ dependencies = [
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)",
"thiserror 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)",
"void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ipfs-examples"
version = "0.1.0"
dependencies = [
"async-std 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"ipfs 0.1.0",
"libipld 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"multihash 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -3778,7 +3777,6 @@ dependencies = [
"checksum const-random-macro 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
"checksum constant_time_eq 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e"
"checksum crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061"
"checksum crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
"checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"


@ -16,7 +16,6 @@ async-std = { version = "1.5.0", features = ["attributes", "std"] }
async-trait = "0.1.24"
bitswap = { path = "bitswap" }
byteorder = "1.3.4"
crossbeam = "0.7.3"
dirs = "2.0.2"
domain = { git = "https://github.com/nlnetlabs/domain", features = ["resolv"] }
futures = { version = "0.3.4", features = ["compat", "io-compat"] }
@ -31,6 +30,7 @@ rocksdb = { version = "0.13.0", optional = true }
serde = { version = "1.0.104", features = ["derive"] }
serde_json = "1.0.48"
thiserror = "1.0.11"
void = "1.0.2"
[build-dependencies]
prost-build = "0.6.1"
@ -39,7 +39,7 @@ prost-build = "0.6.1"
env_logger = "0.7.1"
[workspace]
members = [ "bitswap", "http" ]
members = [ "bitswap", "http", "examples" ]
[patch.crates-io]
ctr = { git = "https://github.com/koivunej/stream-ciphers.git", branch = "ctr128-64to128" }


@ -63,10 +63,8 @@ use ipfs::{IpfsOptions, IpfsPath, Ipld, Types, UninitializedIpfs};
use libipld::ipld;
fn main() {
env_logger::init();
let options = IpfsOptions::<Types>::default();
env_logger::Builder::new()
.parse_filters(&options.ipfs_log)
.init();
task::block_on(async move {
// Start daemon and initialize repo

examples/Cargo.toml (new file, 14 lines)

@ -0,0 +1,14 @@
[package]
name = "ipfs-examples"
version = "0.1.0"
authors = ["David Craven <david@craven.ch>"]
edition = "2018"
publish = false
[dependencies]
async-std = "1.5.0"
env_logger = "0.7.1"
futures = "0.3.4"
ipfs = { path = ".." }
libipld = "0.1.0"
multihash = "0.10.1"


@ -4,10 +4,8 @@ use ipfs::{IpfsOptions, Types, UninitializedIpfs};
use libipld::ipld;
fn main() {
env_logger::init();
let options = IpfsOptions::<Types>::default();
env_logger::Builder::new()
.parse_filters(&options.ipfs_log)
.init();
task::block_on(async move {
let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();


@ -4,10 +4,8 @@ use ipfs::{IpfsOptions, IpfsPath, TestTypes, UninitializedIpfs};
use std::str::FromStr;
fn main() {
env_logger::init();
let options = IpfsOptions::<TestTypes>::default();
env_logger::Builder::new()
.parse_filters(&options.ipfs_log)
.init();
let path =
IpfsPath::from_str("/ipfs/zdpuB1caPcm4QNXeegatVfLQ839Lmprd5zosXGwRUBJHwj66X").unwrap();


@ -5,10 +5,8 @@ use multihash::Sha2_256;
use std::convert::TryInto;
fn main() {
env_logger::init();
let options = IpfsOptions::<TestTypes>::default();
env_logger::Builder::new()
.parse_filters(&options.ipfs_log)
.init();
task::block_on(async move {
// Start daemon and initialize repo


@ -3,10 +3,8 @@ use ipfs::{IpfsOptions, IpfsPath, PeerId, TestTypes, UninitializedIpfs};
use std::str::FromStr;
fn main() {
env_logger::init();
let options = IpfsOptions::<TestTypes>::default();
env_logger::Builder::new()
.parse_filters(&options.ipfs_log)
.init();
task::block_on(async move {
// Start daemon and initialize repo

examples/src/main.rs (new file, 3 lines)

@ -0,0 +1,3 @@
fn main() {
println!("Hello, world!");
}


@ -134,7 +134,7 @@ fn main() {
rt.block_on(async move {
let opts: IpfsOptions<ipfs::TestTypes> =
IpfsOptions::new(home.clone().into(), keypair, Vec::new());
IpfsOptions::new(home.clone().into(), keypair, Vec::new(), false);
let (ipfs, task) = UninitializedIpfs::new(opts)
.await


@ -47,19 +47,11 @@ where
.or(warp::path!("refs" / ..).and_then(not_implemented))
.or(warp::path!("repo" / ..).and_then(not_implemented))
.or(warp::path!("stats" / ..).and_then(not_implemented))
.or(warp::path!("swarm" / "connect")
.and(query::<swarm::ConnectQuery>())
.and_then(swarm::connect))
.or(warp::path!("swarm" / "peers")
.and(query::<swarm::PeersQuery>())
.and_then(swarm::peers))
.or(warp::path!("swarm" / "addrs").and_then(swarm::addrs))
.or(warp::path!("swarm" / "addrs" / "local")
.and(query::<swarm::AddrsLocalQuery>())
.and_then(swarm::addrs_local))
.or(warp::path!("swarm" / "disconnect")
.and(query::<swarm::DisconnectQuery>())
.and_then(swarm::disconnect))
.or(swarm::connect(ipfs))
.or(swarm::peers(ipfs))
.or(swarm::addrs(ipfs))
.or(swarm::addrs_local(ipfs))
.or(swarm::disconnect(ipfs))
.or(warp::path!("version")
.and(query::<version::Query>())
.and_then(version::version)),


@ -1,103 +1,174 @@
use super::support::{with_ipfs, StringError};
use ipfs::{Ipfs, IpfsTypes, Multiaddr};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use warp::{query, Filter};
#[derive(Debug, Deserialize)]
pub struct ConnectQuery {
arg: String,
struct ConnectQuery {
arg: Multiaddr,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct ConnectResponse {
strings: Vec<String>,
}
pub async fn connect(_query: ConnectQuery) -> Result<impl warp::Reply, std::convert::Infallible> {
let response = ConnectResponse { strings: vec![] };
async fn connect_query<T: IpfsTypes>(
ipfs: Ipfs<T>,
query: ConnectQuery,
) -> Result<impl warp::Reply, warp::Rejection> {
ipfs.connect(query.arg)
.await
.map_err(|e| warp::reject::custom(StringError::from(e)))?;
let response: &[&str] = &[];
Ok(warp::reply::json(&response))
}
pub fn connect<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("swarm" / "connect")
.and(with_ipfs(ipfs))
.and(query::<ConnectQuery>())
.and_then(connect_query)
}
#[derive(Debug, Deserialize)]
pub struct PeersQuery {
struct PeersQuery {
verbose: Option<bool>,
streams: Option<bool>,
latency: Option<bool>,
direction: Option<bool>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct PeersResponse {
struct PeersResponse {
peers: Vec<Peer>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct Peer {
struct Peer {
addr: String,
direction: u32,
latency: String,
muxer: String,
peer: String,
streams: Vec<Stream>,
latency: Option<String>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct Stream {
protocol: String,
}
pub async fn peers(_query: PeersQuery) -> Result<impl warp::Reply, std::convert::Infallible> {
let response = PeersResponse { peers: vec![] };
async fn peers_query<T: IpfsTypes>(
ipfs: Ipfs<T>,
query: PeersQuery,
) -> Result<impl warp::Reply, warp::Rejection> {
let peers = ipfs
.peers()
.await
.map_err(|e| warp::reject::custom(StringError::from(e)))?
.into_iter()
.map(|conn| {
let latency = if let Some(true) = query.verbose {
conn.rtt.map(|d| format!("{}ms", d.as_millis() / 2))
} else {
None
};
Peer {
addr: conn.address.to_string(),
peer: conn.peer_id.to_string(),
latency,
}
})
.collect();
let response = PeersResponse { peers };
Ok(warp::reply::json(&response))
}
pub fn peers<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("swarm" / "peers")
.and(with_ipfs(ipfs))
.and(query::<PeersQuery>())
.and_then(peers_query)
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct AddrsResponse {
struct AddrsResponse {
addrs: BTreeMap<String, Vec<String>>,
}
pub async fn addrs() -> Result<impl warp::Reply, std::convert::Infallible> {
let response = AddrsResponse {
addrs: BTreeMap::new(),
};
async fn addrs_query<T: IpfsTypes>(ipfs: Ipfs<T>) -> Result<impl warp::Reply, warp::Rejection> {
let addresses = ipfs
.addrs()
.await
.map_err(|e| warp::reject::custom(StringError::from(e)))?;
let mut res = BTreeMap::new();
for (peer_id, addrs) in addresses {
res.insert(
peer_id.to_string(),
addrs.into_iter().map(|a| a.to_string()).collect(),
);
}
let response = AddrsResponse { addrs: res };
Ok(warp::reply::json(&response))
}
pub fn addrs<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("swarm" / "addrs")
.and(with_ipfs(ipfs))
.and_then(addrs_query)
}
#[derive(Debug, Deserialize)]
pub struct AddrsLocalQuery {
struct AddrsLocalQuery {
id: Option<bool>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct AddrsLocalResponse {
struct AddrsLocalResponse {
strings: Vec<String>,
}
pub async fn addrs_local(
async fn addrs_local_query<T: IpfsTypes>(
ipfs: Ipfs<T>,
_query: AddrsLocalQuery,
) -> Result<impl warp::Reply, std::convert::Infallible> {
let response = AddrsLocalResponse { strings: vec![] };
) -> Result<impl warp::Reply, warp::Rejection> {
let addresses = ipfs
.addrs_local()
.await
.map_err(|e| warp::reject::custom(StringError::from(e)))?
.into_iter()
.map(|a| a.to_string())
.collect();
let response = AddrsLocalResponse { strings: addresses };
Ok(warp::reply::json(&response))
}
pub fn addrs_local<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("swarm" / "addrs" / "local")
.and(with_ipfs(ipfs))
.and(query::<AddrsLocalQuery>())
.and_then(addrs_local_query)
}
#[derive(Debug, Deserialize)]
pub struct DisconnectQuery {
arg: String,
struct DisconnectQuery {
arg: Multiaddr,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct DisconnectResponse {
strings: Vec<String>,
}
pub async fn disconnect(
_query: DisconnectQuery,
) -> Result<impl warp::Reply, std::convert::Infallible> {
let response = DisconnectResponse { strings: vec![] };
async fn disconnect_query<T: IpfsTypes>(
ipfs: Ipfs<T>,
query: DisconnectQuery,
) -> Result<impl warp::Reply, warp::Rejection> {
ipfs.disconnect(query.arg)
.await
.map_err(|e| warp::reject::custom(StringError::from(e)))?;
let response: &[&str] = &[];
Ok(warp::reply::json(&response))
}
pub fn disconnect<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("swarm" / "disconnect")
.and(with_ipfs(ipfs))
.and(query::<DisconnectQuery>())
.and_then(disconnect_query)
}
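
Each handler above now closes over the Ipfs handle via with_ipfs and is returned as a self-contained warp filter, so a route can be exercised on its own. A hedged test sketch (not part of this PR) driving the addrs_local filter with warp's test client; the tokio::test attribute and the start_test_node helper are assumptions.

// Hedged sketch: `start_test_node` is a hypothetical helper returning a
// running Ipfs<TestTypes> handle; it is not defined in this PR.
#[tokio::test]
async fn swarm_addrs_local_returns_json() {
    let ipfs: ipfs::Ipfs<ipfs::TestTypes> = start_test_node().await;
    let resp = warp::test::request()
        .path("/swarm/addrs/local")
        .reply(&addrs_local(&ipfs))
        .await;
    assert_eq!(resp.status(), 200);
}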


@ -5,6 +5,7 @@ use rand::{rngs::OsRng, Rng};
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::Path;
use thiserror::Error;
const BOOTSTRAP_NODES: &[&str] = &[
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
@ -70,10 +71,14 @@ impl fmt::Debug for KeyMaterial {
}
}
#[derive(Debug)]
enum KeyMaterialLoadingFailure {
Io(std::io::Error),
RsaDecoding(libp2p::identity::error::DecodingError),
#[derive(Debug, Error)]
pub enum KeyMaterialLoadingFailure {
#[error("{0}")]
Io(#[from] std::io::Error),
#[error("{0}")]
RsaDecoding(#[from] libp2p::identity::error::DecodingError),
#[error("{0}")]
Config(#[from] serde_json::error::Error),
}
impl KeyMaterial {
@ -125,16 +130,16 @@ impl KeyMaterial {
}
impl ConfigFile {
pub fn new<P: AsRef<Path>>(path: P) -> Self {
fs::read_to_string(&path)
.map(|content| Self::parse(&content))
.map(|parsed| parsed.into_loaded().unwrap())
.unwrap_or_else(|_| {
let config = ConfigFile::default();
config.store_at(path).unwrap();
config.into_loaded().unwrap()
})
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, KeyMaterialLoadingFailure> {
if path.as_ref().exists() {
let content = fs::read_to_string(&path)?;
let config = Self::parse(&content)?;
config.into_loaded()
} else {
let config = ConfigFile::default();
config.store_at(path)?;
config.into_loaded()
}
}
fn load(&mut self) -> Result<(), KeyMaterialLoadingFailure> {
@ -146,11 +151,12 @@ impl ConfigFile {
Ok(self)
}
fn parse(s: &str) -> Self {
serde_json::from_str(s).unwrap()
fn parse(s: &str) -> Result<Self, KeyMaterialLoadingFailure> {
Ok(serde_json::from_str(s)?)
}
pub fn store_at<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
fs::create_dir_all(path.as_ref().parent().unwrap())?;
let string = serde_json::to_string_pretty(self).unwrap();
fs::write(path, string)
}
@ -208,7 +214,7 @@ mod tests {
fn supports_older_v1_and_ed25519_v2() {
let input = r#"{"private_key":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"bootstrap":[]}"#;
let actual = ConfigFile::parse(input);
let actual = ConfigFile::parse(input).unwrap();
let roundtrip = serde_json::to_string(&actual).unwrap();
@ -219,7 +225,7 @@ mod tests {
fn supports_v2() {
let input = r#"{"rsa_pkcs8_filename":"foobar.pk8","bootstrap":[]}"#;
let actual = ConfigFile::parse(input);
let actual = ConfigFile::parse(input).unwrap();
let roundtrip = serde_json::to_string(&actual).unwrap();


@ -6,7 +6,10 @@
#[macro_use]
extern crate log;
use anyhow::format_err;
use async_std::path::PathBuf;
use async_std::task;
pub use bitswap::Block;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::channel::oneshot::{channel as oneshot_channel, Sender as OneshotSender};
@ -15,10 +18,8 @@ use futures::stream::Fuse;
pub use libipld::cid::Cid;
use libipld::cid::Codec;
pub use libipld::ipld::Ipld;
pub use libp2p::{
identity::{Keypair, PublicKey},
Multiaddr, PeerId,
};
pub use libp2p::core::{ConnectedPoint, Multiaddr, PeerId, PublicKey};
pub use libp2p::identity::Keypair;
use std::borrow::Borrow;
use std::fmt;
@ -35,12 +36,14 @@ pub mod ipns;
pub mod p2p;
pub mod path;
pub mod repo;
mod subscription;
pub mod unixfs;
use self::config::ConfigFile;
use self::dag::IpldDag;
pub use self::error::Error;
use self::ipns::Ipns;
pub use self::p2p::Connection;
pub use self::p2p::SwarmTypes;
use self::p2p::{create_swarm, SwarmOptions, TSwarm};
pub use self::path::IpfsPath;
@ -48,11 +51,6 @@ pub use self::repo::RepoTypes;
use self::repo::{create_repo, Repo, RepoEvent, RepoOptions};
use self::unixfs::File;
static IPFS_LOG: &str = "info";
static IPFS_PATH: &str = ".rust-ipfs";
static XDG_APP_NAME: &str = "rust-ipfs";
static CONFIG_FILE: &str = "config.json";
/// All types can be changed at compile time by implementing
/// `IpfsTypes`.
pub trait IpfsTypes: SwarmTypes + RepoTypes {}
@ -84,14 +82,14 @@ impl RepoTypes for TestTypes {
#[derive(Clone)]
pub struct IpfsOptions<Types: IpfsTypes> {
_marker: PhantomData<Types>,
/// The ipfs log level that should be passed to env_logger.
pub ipfs_log: String,
/// The path of the ipfs repo.
pub ipfs_path: PathBuf,
/// The keypair used with libp2p.
pub keypair: Keypair,
/// Nodes dialed during startup
/// Nodes dialed during startup.
pub bootstrap: Vec<(Multiaddr, PeerId)>,
/// Enables mdns for peer discovery when true.
pub mdns: bool,
}
impl<Types: IpfsTypes> fmt::Debug for IpfsOptions<Types> {
@ -99,10 +97,10 @@ impl<Types: IpfsTypes> fmt::Debug for IpfsOptions<Types> {
// needed since libp2p::identity::Keypair does not have a Debug impl, and the IpfsOptions
// is a struct with all public fields, so don't enforce users to use this wrapper.
fmt.debug_struct("IpfsOptions")
.field("ipfs_log", &self.ipfs_log)
.field("ipfs_path", &self.ipfs_path)
.field("bootstrap", &self.bootstrap)
.field("keypair", &DebuggableKeypair(&self.keypair))
.field("mdns", &self.mdns)
.finish()
}
}
@ -131,75 +129,49 @@ impl<I: Borrow<Keypair>> DebuggableKeypair<I> {
}
impl<Types: IpfsTypes> IpfsOptions<Types> {
pub fn new(ipfs_path: PathBuf, keypair: Keypair, bootstrap: Vec<(Multiaddr, PeerId)>) -> Self {
pub fn new(
ipfs_path: PathBuf,
keypair: Keypair,
bootstrap: Vec<(Multiaddr, PeerId)>,
mdns: bool,
) -> Self {
Self {
_marker: PhantomData,
ipfs_log: String::from("trace"),
ipfs_path,
keypair,
bootstrap,
mdns,
}
}
fn secio_key_pair(&self) -> &Keypair {
&self.keypair
}
fn bootstrap(&self) -> &[(Multiaddr, PeerId)] {
&self.bootstrap
}
}
impl Default for IpfsOptions<Types> {
impl<T: IpfsTypes> Default for IpfsOptions<T> {
/// Create `IpfsOptions` from environment.
fn default() -> Self {
let ipfs_log = std::env::var("IPFS_LOG").unwrap_or_else(|_| IPFS_LOG.into());
let ipfs_path = std::env::var("IPFS_PATH")
.unwrap_or_else(|_| {
let mut ipfs_path = std::env::var("HOME").unwrap_or_else(|_| "".into());
ipfs_path.push_str("/");
ipfs_path.push_str(IPFS_PATH);
ipfs_path
})
.into();
let path = dirs::config_dir()
let ipfs_path = if let Ok(path) = std::env::var("IPFS_PATH") {
PathBuf::from(path)
} else {
let root = if let Some(home) = dirs::home_dir() {
home
} else {
std::env::current_dir().unwrap()
};
root.join(".rust-ipfs").into()
};
let config_path = dirs::config_dir()
.unwrap()
.join(XDG_APP_NAME)
.join(CONFIG_FILE);
let config = ConfigFile::new(path);
.join("rust-ipfs")
.join("config.json");
let config = ConfigFile::new(config_path).unwrap();
let keypair = config.secio_key_pair();
let bootstrap = config.bootstrap();
IpfsOptions {
_marker: PhantomData,
ipfs_log,
ipfs_path,
keypair,
bootstrap,
}
}
}
impl Default for IpfsOptions<TestTypes> {
/// Creates `IpfsOptions` for testing without reading or writing to the
/// file system.
fn default() -> Self {
let ipfs_log = std::env::var("IPFS_LOG").unwrap_or_else(|_| IPFS_LOG.into());
let ipfs_path = std::env::var("IPFS_PATH")
.unwrap_or_else(|_| IPFS_PATH.into())
.into();
let config = std::env::var("IPFS_TEST_CONFIG")
.map(ConfigFile::new)
.unwrap_or_default();
let keypair = config.secio_key_pair();
let bootstrap = config.bootstrap();
IpfsOptions {
_marker: PhantomData,
ipfs_log,
ipfs_path,
keypair,
bootstrap,
mdns: true,
}
}
}
@ -215,10 +187,22 @@ pub struct Ipfs<Types: IpfsTypes> {
to_task: Sender<IpfsEvent>,
}
type Channel<T> = OneshotSender<Result<T, Error>>;
/// Events used internally to communicate with the swarm, which is executed in the background
/// task.
#[derive(Debug)]
enum IpfsEvent {
/// Connect
Connect(Multiaddr, Channel<()>),
/// Addresses
Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
/// Local addresses
Listeners(Channel<Vec<Multiaddr>>),
/// Connections
Connections(Channel<Vec<Connection>>),
/// Disconnect
Disconnect(Multiaddr, Channel<()>),
/// Request background task to return the listened and external addresses
GetAddresses(OneshotSender<Vec<Multiaddr>>),
Exit,
@ -237,7 +221,7 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
/// Configures a new UninitializedIpfs with from the given options.
pub async fn new(options: IpfsOptions<Types>) -> Self {
let repo_options = RepoOptions::<Types>::from(&options);
let keys = options.secio_key_pair().clone();
let keys = options.keypair.clone();
let (repo, repo_events) = create_repo(repo_options);
let swarm_options = SwarmOptions::<Types>::from(&options);
let swarm = create_swarm(swarm_options, repo.clone()).await;
@ -352,6 +336,45 @@ impl<Types: IpfsTypes> Ipfs<Types> {
Ok(())
}
pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::Connect(addr, tx))
.await?;
rx.await?
}
pub async fn addrs(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
let (tx, rx) = oneshot_channel();
self.to_task.clone().send(IpfsEvent::Addresses(tx)).await?;
rx.await?
}
pub async fn addrs_local(&self) -> Result<Vec<Multiaddr>, Error> {
let (tx, rx) = oneshot_channel();
self.to_task.clone().send(IpfsEvent::Listeners(tx)).await?;
rx.await?
}
pub async fn peers(&self) -> Result<Vec<Connection>, Error> {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::Connections(tx))
.await?;
rx.await?
}
pub async fn disconnect(&self, addr: Multiaddr) -> Result<(), Error> {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::Disconnect(addr, tx))
.await?;
rx.await?
}
pub async fn identity(&self) -> Result<(PublicKey, Vec<Multiaddr>), Error> {
let (tx, rx) = oneshot_channel();
@ -398,6 +421,31 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
};
match inner {
IpfsEvent::Connect(addr, ret) => {
let fut = self.swarm.connect(addr);
task::spawn(async move {
let res = fut.await.map_err(|err| format_err!("{}", err));
ret.send(res).ok();
});
}
IpfsEvent::Addresses(ret) => {
let addrs = self.swarm.addrs();
ret.send(Ok(addrs)).ok();
}
IpfsEvent::Listeners(ret) => {
let listeners = Swarm::listeners(&self.swarm).cloned().collect();
ret.send(Ok(listeners)).ok();
}
IpfsEvent::Connections(ret) => {
let connections = self.swarm.connections();
ret.send(Ok(connections)).ok();
}
IpfsEvent::Disconnect(addr, ret) => {
if let Some(disconnector) = self.swarm.disconnect(addr) {
disconnector.disconnect(&mut self.swarm);
}
ret.send(Ok(())).ok();
}
IpfsEvent::GetAddresses(ret) => {
// perhaps this could be moved under `IpfsEvent` or free functions?
let mut addresses = Vec::new();


@ -1,27 +1,38 @@
use super::swarm::{Connection, Disconnector, SwarmApi};
use crate::p2p::{SwarmOptions, SwarmTypes};
use crate::repo::Repo;
use crate::subscription::SubscriptionFuture;
use bitswap::{Bitswap, Strategy};
use libipld::cid::Cid;
use libp2p::core::{Multiaddr, PeerId};
use libp2p::floodsub::{Floodsub, FloodsubEvent};
use libp2p::identify::{Identify, IdentifyEvent};
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{Kademlia, KademliaEvent};
use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::ping::{Ping, PingEvent};
use libp2p::swarm::NetworkBehaviourEventProcess;
use libp2p::swarm::toggle::Toggle;
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourEventProcess};
use libp2p::NetworkBehaviour;
use libp2p::PeerId;
use std::sync::Arc;
/// Behaviour type.
#[derive(NetworkBehaviour)]
pub struct Behaviour<TSwarmTypes: SwarmTypes> {
mdns: Mdns,
mdns: Toggle<Mdns>,
kademlia: Kademlia<MemoryStore>,
bitswap: Bitswap<TSwarmTypes::TStrategy>,
ping: Ping,
identify: Identify,
floodsub: Floodsub,
swarm: SwarmApi,
}
impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<()> for Behaviour<TSwarmTypes> {
fn inject_event(&mut self, _event: ()) {}
}
impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<void::Void> for Behaviour<TSwarmTypes> {
fn inject_event(&mut self, _event: void::Void) {}
}
impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<MdnsEvent> for Behaviour<TSwarmTypes> {
@ -29,17 +40,14 @@ impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<MdnsEvent> for Behavi
match event {
MdnsEvent::Discovered(list) => {
for (peer, _) in list {
debug!("mdns: Discovered peer {}", peer.to_base58());
self.bitswap.connect(peer.clone());
self.floodsub.add_node_to_partial_view(peer);
log::trace!("mdns: Discovered peer {}", peer.to_base58());
self.add_peer(peer);
}
}
MdnsEvent::Expired(list) => {
for (peer, _) in list {
if !self.mdns.has_node(&peer) {
debug!("mdns: Expired peer {}", peer.to_base58());
self.floodsub.remove_node_from_partial_view(&peer);
}
log::trace!("mdns: Expired peer {}", peer.to_base58());
self.remove_peer(&peer);
}
}
}
@ -54,18 +62,9 @@ impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<KademliaEvent>
match event {
KademliaEvent::Discovered { peer_id, ty, .. } => {
debug!("kad: Discovered peer {} {:?}", peer_id.to_base58(), ty);
log::trace!("kad: Discovered peer {} {:?}", peer_id.to_base58(), ty);
self.add_peer(peer_id);
}
// FIXME: unsure what this has been superceded with... perhaps with GetRecordResult?
/*
KademliaEvent::FindNodeResult { key, closer_peers } => {
if closer_peers.is_empty() {
info!("kad: Could not find closer peer to {}", key.to_base58());
}
for peer in closer_peers {
info!("kad: Found closer peer {} to {}", peer.to_base58(), key.to_base58());
}
}*/
KademliaEvent::GetProvidersResult(Ok(GetProvidersOk {
key,
providers,
@ -88,17 +87,13 @@ impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<KademliaEvent>
let cid = PeerId::from_bytes(key.to_vec()).unwrap().to_base58();
warn!("kad: timed out get providers query for {}", cid);
}
x => {
debug!("kad ignored event {:?}", x);
event => {
log::trace!("kad: {:?}", event);
}
}
}
}
impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<()> for Behaviour<TSwarmTypes> {
fn inject_event(&mut self, _event: ()) {}
}
impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TSwarmTypes> {
fn inject_event(&mut self, event: PingEvent) {
use libp2p::ping::handler::{PingFailure, PingSuccess};
@ -107,29 +102,31 @@ impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<PingEvent> for Behavi
peer,
result: Result::Ok(PingSuccess::Ping { rtt }),
} => {
debug!(
log::trace!(
"ping: rtt to {} is {} ms",
peer.to_base58(),
rtt.as_millis()
);
self.swarm.set_rtt(&peer, rtt);
}
PingEvent {
peer,
result: Result::Ok(PingSuccess::Pong),
} => {
debug!("ping: pong from {}", peer.to_base58());
log::trace!("ping: pong from {}", peer.to_base58());
}
PingEvent {
peer,
result: Result::Err(PingFailure::Timeout),
} => {
warn!("ping: timeout to {}", peer.to_base58());
log::trace!("ping: timeout to {}", peer.to_base58());
self.remove_peer(&peer);
}
PingEvent {
peer,
result: Result::Err(PingFailure::Other { error }),
} => {
error!("ping: failure with {}: {}", peer.to_base58(), error);
log::error!("ping: failure with {}: {}", peer.to_base58(), error);
}
}
}
@ -139,7 +136,7 @@ impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<IdentifyEvent>
for Behaviour<TSwarmTypes>
{
fn inject_event(&mut self, event: IdentifyEvent) {
debug!("identify: {:?}", event);
log::trace!("identify: {:?}", event);
}
}
@ -147,7 +144,7 @@ impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<FloodsubEvent>
for Behaviour<TSwarmTypes>
{
fn inject_event(&mut self, event: FloodsubEvent) {
debug!("floodsub: {:?}", event);
log::trace!("floodsub: {:?}", event);
}
}
@ -156,10 +153,14 @@ impl<TSwarmTypes: SwarmTypes> Behaviour<TSwarmTypes> {
pub async fn new(options: SwarmOptions<TSwarmTypes>, repo: Arc<Repo<TSwarmTypes>>) -> Self {
info!("Local peer id: {}", options.peer_id.to_base58());
let mdns = Mdns::new().expect("Failed to create mDNS service");
let store = libp2p::kad::record::store::MemoryStore::new(options.peer_id.to_owned());
let mdns = if options.mdns {
Some(Mdns::new().expect("Failed to create mDNS service"))
} else {
None
}
.into();
let store = MemoryStore::new(options.peer_id.to_owned());
let mut kademlia = Kademlia::new(options.peer_id.to_owned(), store);
for (addr, peer_id) in &options.bootstrap {
kademlia.add_address(peer_id, addr.to_owned());
@ -171,9 +172,10 @@ impl<TSwarmTypes: SwarmTypes> Behaviour<TSwarmTypes> {
let identify = Identify::new(
"/ipfs/0.1.0".into(),
"rust-ipfs".into(),
options.key_pair.public(),
options.keypair.public(),
);
let floodsub = Floodsub::new(options.peer_id);
let swarm = SwarmApi::new();
Behaviour {
mdns,
@ -182,9 +184,44 @@ impl<TSwarmTypes: SwarmTypes> Behaviour<TSwarmTypes> {
ping,
identify,
floodsub,
swarm,
}
}
pub fn add_peer(&mut self, peer: PeerId) {
self.swarm.add_peer(peer.clone());
self.floodsub.add_node_to_partial_view(peer);
// TODO self.bitswap.add_node_to_partial_view(peer);
}
pub fn remove_peer(&mut self, peer: &PeerId) {
self.swarm.remove_peer(&peer);
self.floodsub.remove_node_from_partial_view(&peer);
// TODO self.bitswap.remove_peer(&peer);
}
pub fn addrs(&mut self) -> Vec<(PeerId, Vec<Multiaddr>)> {
let peers = self.swarm.peers().cloned().collect::<Vec<_>>();
let mut addrs = Vec::with_capacity(peers.len());
for peer_id in peers.into_iter() {
let peer_addrs = self.addresses_of_peer(&peer_id);
addrs.push((peer_id, peer_addrs));
}
addrs
}
pub fn connections(&self) -> Vec<Connection> {
self.swarm.connections().cloned().collect()
}
pub fn connect(&mut self, addr: Multiaddr) -> SubscriptionFuture<Result<(), String>> {
self.swarm.connect(addr)
}
pub fn disconnect(&mut self, addr: Multiaddr) -> Option<Disconnector> {
self.swarm.disconnect(addr)
}
pub fn want_block(&mut self, cid: Cid) {
info!("Want block {}", cid.to_string());
//let hash = Multihash::from_bytes(cid.to_bytes()).unwrap();


@ -9,8 +9,11 @@ use libp2p::{Multiaddr, PeerId};
use std::sync::Arc;
mod behaviour;
mod swarm;
mod transport;
pub use swarm::Connection;
pub type TSwarm<SwarmTypes> = Swarm<behaviour::Behaviour<SwarmTypes>>;
pub trait SwarmTypes: RepoTypes + Sized {
@ -19,21 +22,24 @@ pub trait SwarmTypes: RepoTypes + Sized {
pub struct SwarmOptions<TSwarmTypes: SwarmTypes> {
_marker: PhantomData<TSwarmTypes>,
pub key_pair: Keypair,
pub keypair: Keypair,
pub peer_id: PeerId,
pub bootstrap: Vec<(Multiaddr, PeerId)>,
pub mdns: bool,
}
impl<TSwarmTypes: SwarmTypes> From<&IpfsOptions<TSwarmTypes>> for SwarmOptions<TSwarmTypes> {
fn from(options: &IpfsOptions<TSwarmTypes>) -> Self {
let key_pair = options.secio_key_pair().clone();
let peer_id = key_pair.public().into_peer_id();
let bootstrap = options.bootstrap().to_vec();
let keypair = options.keypair.clone();
let peer_id = keypair.public().into_peer_id();
let bootstrap = options.bootstrap.clone();
let mdns = options.mdns;
SwarmOptions {
_marker: PhantomData,
key_pair,
keypair,
peer_id,
bootstrap,
mdns,
}
}
}
@ -46,7 +52,7 @@ pub async fn create_swarm<TSwarmTypes: SwarmTypes>(
let peer_id = options.peer_id.clone();
// Set up an encrypted TCP transport over the Mplex protocol.
let transport = transport::build_transport(&options);
let transport = transport::build_transport(options.keypair.clone());
// Create a Kademlia behaviour
let behaviour = behaviour::build_behaviour(options, repo).await;

src/p2p/swarm.rs (new file, 218 lines)

@ -0,0 +1,218 @@
use crate::subscription::{SubscriptionFuture, SubscriptionRegistry};
use core::task::{Context, Poll};
use libp2p::core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p::swarm::protocols_handler::{
DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler,
};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, Swarm};
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct Connection {
pub peer_id: PeerId,
pub address: Multiaddr,
pub rtt: Option<Duration>,
}
pub struct Disconnector {
peer_id: PeerId,
}
impl Disconnector {
pub fn disconnect<T: NetworkBehaviour>(self, swarm: &mut Swarm<T>) {
Swarm::ban_peer_id(swarm, self.peer_id.clone());
Swarm::unban_peer_id(swarm, self.peer_id);
}
}
#[derive(Debug, Default)]
pub struct SwarmApi {
events: VecDeque<NetworkBehaviourAction<void::Void, void::Void>>,
peers: HashSet<PeerId>,
connect_registry: SubscriptionRegistry<Multiaddr, Result<(), String>>,
connections: HashMap<Multiaddr, Connection>,
connected_peers: HashMap<PeerId, Multiaddr>,
}
impl SwarmApi {
pub fn new() -> Self {
Self::default()
}
pub fn add_peer(&mut self, peer_id: PeerId) {
self.peers.insert(peer_id);
}
pub fn peers(&self) -> impl Iterator<Item = &PeerId> {
self.peers.iter()
}
pub fn remove_peer(&mut self, peer_id: &PeerId) {
self.peers.remove(peer_id);
}
pub fn connections(&self) -> impl Iterator<Item = &Connection> {
self.connections.iter().map(|(_, conn)| conn)
}
pub fn set_rtt(&mut self, peer_id: &PeerId, rtt: Duration) {
if let Some(addr) = self.connected_peers.get(peer_id) {
if let Some(mut conn) = self.connections.get_mut(addr) {
conn.rtt = Some(rtt);
}
}
}
pub fn connect(&mut self, address: Multiaddr) -> SubscriptionFuture<Result<(), String>> {
log::trace!("connect {}", address.to_string());
self.events.push_back(NetworkBehaviourAction::DialAddress {
address: address.clone(),
});
self.connect_registry.create_subscription(address)
}
pub fn disconnect(&mut self, address: Multiaddr) -> Option<Disconnector> {
log::trace!("disconnect {}", address.to_string());
self.connections.remove(&address);
let peer_id = self
.connections
.get(&address)
.map(|conn| conn.peer_id.clone());
if let Some(peer_id) = &peer_id {
self.connected_peers.remove(peer_id);
}
peer_id.map(|peer_id| Disconnector { peer_id })
}
}
impl NetworkBehaviour for SwarmApi {
type ProtocolsHandler = DummyProtocolsHandler;
type OutEvent = void::Void;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
log::trace!("new_handler");
Default::default()
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
log::trace!("addresses_of_peer {}", peer_id);
if let Some(addr) = self.connected_peers.get(peer_id).cloned() {
vec![addr]
} else {
Default::default()
}
}
fn inject_connected(&mut self, peer_id: PeerId, cp: ConnectedPoint) {
log::trace!("inject_connected {} {:?}", peer_id.to_string(), cp);
let addr = match cp {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
};
let conn = Connection {
peer_id: peer_id.clone(),
address: addr.clone(),
rtt: None,
};
self.peers.insert(peer_id.clone());
self.connected_peers.insert(peer_id, addr.clone());
self.connections.insert(addr.clone(), conn);
self.connect_registry.finish_subscription(&addr, Ok(()));
}
fn inject_disconnected(&mut self, peer_id: &PeerId, cp: ConnectedPoint) {
log::trace!("inject_disconnected {} {:?}", peer_id.to_string(), cp);
let addr = match cp {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
};
self.connected_peers.remove(peer_id);
self.connections.remove(&addr);
}
fn inject_node_event(&mut self, _peer_id: PeerId, _event: void::Void) {}
fn inject_addr_reach_failure(
&mut self,
_peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
) {
log::trace!("inject_addr_reach_failure {} {}", addr, error);
self.connect_registry
.finish_subscription(addr, Err(format!("{}", error)));
}
#[allow(clippy::type_complexity)]
fn poll(&mut self, _ctx: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<<<
Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>{
log::trace!("poll");
if let Some(event) = self.events.pop_front() {
Poll::Ready(event)
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::p2p::transport::{build_transport, TTransport};
use async_std::task;
use futures::channel::mpsc;
use futures::future::{select, FutureExt};
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use libp2p::identity::Keypair;
use libp2p::swarm::Swarm;
#[test]
fn swarm_api() {
env_logger::init();
let (peer1_id, trans) = mk_transport();
let mut swarm1 = Swarm::new(trans, SwarmApi::new(), peer1_id.clone());
let (peer2_id, trans) = mk_transport();
let mut swarm2 = Swarm::new(trans, SwarmApi::new(), peer2_id.clone());
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let peer1 = async move {
while let Some(_) = swarm1.next().now_or_never() {}
for l in Swarm::listeners(&swarm1) {
tx.send(l.clone()).await.unwrap();
}
loop {
swarm1.next().await;
}
};
let peer2 = async move {
let future = swarm2.connect(rx.next().await.unwrap());
let poll_swarm = async move {
loop {
swarm2.next().await;
}
};
select(Box::pin(future), Box::pin(poll_swarm)).await;
};
let result = select(Box::pin(peer1), Box::pin(peer2));
task::block_on(result);
}
fn mk_transport() -> (PeerId, TTransport) {
let key = Keypair::generate_ed25519();
let peer_id = key.public().into_peer_id();
let transport = build_transport(key);
(peer_id, transport)
}
}


@ -1,7 +1,8 @@
use crate::p2p::{SwarmOptions, SwarmTypes};
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::boxed::Boxed;
use libp2p::core::transport::upgrade::Version;
use libp2p::core::upgrade::SelectUpgrade;
use libp2p::identity::Keypair;
use libp2p::mplex::MplexConfig;
use libp2p::secio::SecioConfig;
use libp2p::tcp::TcpConfig;
@ -16,18 +17,14 @@ pub(crate) type TTransport = Boxed<(PeerId, StreamMuxerBox), Error>;
/// Builds the transport that serves as a common ground for all connections.
///
/// Set up an encrypted TCP transport over the Mplex protocol.
pub fn build_transport<TSwarmTypes: SwarmTypes>(options: &SwarmOptions<TSwarmTypes>) -> TTransport {
let secio_config = SecioConfig::new(options.key_pair.to_owned());
let yamux_config = YamuxConfig::default();
let mplex_config = MplexConfig::new();
pub fn build_transport(key: Keypair) -> TTransport {
TcpConfig::new()
.nodelay(true)
.upgrade(Version::V1)
.authenticate(secio_config)
.multiplex(libp2p::core::upgrade::SelectUpgrade::new(
yamux_config,
mplex_config,
.authenticate(SecioConfig::new(key))
.multiplex(SelectUpgrade::new(
YamuxConfig::default(),
MplexConfig::new(),
))
.timeout(Duration::from_secs(20))
.map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer)))


@ -1,21 +1,18 @@
//! IPFS repo
use crate::error::Error;
use crate::path::IpfsPath;
use crate::subscription::SubscriptionRegistry;
use crate::IpfsOptions;
use async_std::path::PathBuf;
use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bitswap::Block;
use core::fmt::Debug;
use core::future::Future;
use core::marker::PhantomData;
use core::pin::Pin;
use core::task::{Context, Poll, Waker};
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::sink::SinkExt;
use libipld::cid::Cid;
use libp2p::PeerId;
use std::collections::HashMap;
use libp2p::core::PeerId;
pub mod fs;
pub mod mem;
@ -79,7 +76,7 @@ pub struct Repo<TRepoTypes: RepoTypes> {
block_store: TRepoTypes::TBlockStore,
data_store: TRepoTypes::TDataStore,
events: Sender<RepoEvent>,
subscriptions: Mutex<HashMap<Cid, Arc<Mutex<Subscription>>>>,
subscriptions: Mutex<SubscriptionRegistry<Cid, Block>>,
}
#[derive(Clone, Debug)]
@ -134,50 +131,49 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
/// Puts a block into the block store.
pub async fn put_block(&self, block: Block) -> Result<Cid, Error> {
let cid = self.block_store.put(block.clone()).await?;
if let Some(subscription) = self.subscriptions.lock().await.remove(&cid) {
subscription.lock().await.wake(block);
}
// sending only fails if the background task has exited
// TODO: not sure if this cloning of the sender is ok, this will probably be unbounded
// memory usage, but it requires &mut for some reason (for example tokios sender does not
// need one)
let _ = self
.events
self.subscriptions
.lock()
.await
.finish_subscription(&cid, block);
// sending only fails if no one is listening anymore
// and that is okay with us.
self.events
.clone()
.send(RepoEvent::ProvideBlock(cid.clone()))
.await;
.await
.ok();
Ok(cid)
}
/// Retrives a block from the block store.
pub async fn get_block(&self, cid: &Cid) -> Result<Block, Error> {
let subscription = if let Some(block) = self.block_store.get(&cid.clone()).await? {
Arc::new(Mutex::new(Subscription::ready(block)))
if let Some(block) = self.block_store.get(&cid).await? {
Ok(block)
} else {
// sending only fails if the background task has exited
let _ = self
.events
.clone()
.send(RepoEvent::WantBlock(cid.clone()))
.await;
self.subscriptions
let subscription = self
.subscriptions
.lock()
.await
.entry(cid.clone())
.or_default()
.create_subscription(cid.clone());
// sending only fails if no one is listening anymore
// and that is okay with us.
self.events
.clone()
};
Ok(BlockFuture { subscription }.await)
.send(RepoEvent::WantBlock(cid.clone()))
.await
.ok();
Ok(subscription.await)
}
}
/// Remove block from the block store.
pub async fn remove_block(&self, cid: &Cid) -> Result<(), Error> {
// sending only fails if the background task has exited
let _ = self
.events
self.events
.clone()
.send(RepoEvent::UnprovideBlock(cid.to_owned()))
.await;
.await
.ok();
self.block_store.remove(cid).await?;
Ok(())
}
@ -226,60 +222,6 @@ impl<T: RepoTypes> bitswap::BitswapStore for Repo<T> {
}
}
#[derive(Debug, Default)]
struct Subscription {
block: Option<Block>,
wakers: Vec<Waker>,
}
impl Subscription {
pub fn ready(block: Block) -> Self {
Self {
block: Some(block),
wakers: vec![],
}
}
pub fn add_waker(&mut self, waker: Waker) {
self.wakers.push(waker);
}
pub fn result(&self) -> Option<Block> {
self.block.clone()
}
pub fn wake(&mut self, block: Block) {
self.block = Some(block);
for waker in self.wakers.drain(..) {
waker.wake();
}
}
}
pub struct BlockFuture {
subscription: Arc<Mutex<Subscription>>,
}
impl Future for BlockFuture {
type Output = Block;
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
let future = self.subscription.lock();
futures::pin_mut!(future);
match future.poll(context) {
Poll::Ready(mut subscription) => {
if let Some(result) = subscription.result() {
Poll::Ready(result)
} else {
subscription.add_waker(context.waker().clone());
Poll::Pending
}
}
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;

src/subscription.rs (new file, 108 lines)

@ -0,0 +1,108 @@
use async_std::future::Future;
use async_std::task::{Context, Poll, Waker};
use core::fmt::Debug;
use core::hash::Hash;
use core::pin::Pin;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[derive(Debug)]
pub struct SubscriptionRegistry<TReq: Debug + Eq + Hash, TRes: Debug> {
subscriptions: HashMap<TReq, Arc<Mutex<Subscription<TRes>>>>,
}
impl<TReq: Debug + Eq + Hash, TRes: Debug> SubscriptionRegistry<TReq, TRes> {
pub fn new() -> Self {
Self {
subscriptions: Default::default(),
}
}
pub fn create_subscription(&mut self, req: TReq) -> SubscriptionFuture<TRes> {
let subscription = self.subscriptions.entry(req).or_default().clone();
SubscriptionFuture { subscription }
}
pub fn finish_subscription(&mut self, req: &TReq, res: TRes) {
if let Some(subscription) = self.subscriptions.remove(req) {
subscription.lock().unwrap().wake(res);
}
}
}
impl<TReq: Debug + Eq + Hash, TRes: Debug> Default for SubscriptionRegistry<TReq, TRes> {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct Subscription<TResult> {
result: Option<TResult>,
wakers: Vec<Waker>,
}
impl<TResult> Subscription<TResult> {
pub fn new() -> Self {
Self {
result: Default::default(),
wakers: Default::default(),
}
}
pub fn add_waker(&mut self, waker: Waker) {
self.wakers.push(waker);
}
pub fn wake(&mut self, result: TResult) {
self.result = Some(result);
for waker in self.wakers.drain(..) {
waker.wake();
}
}
}
impl<TResult: Clone> Subscription<TResult> {
pub fn result(&self) -> Option<TResult> {
self.result.clone()
}
}
impl<TResult> Default for Subscription<TResult> {
fn default() -> Self {
Self::new()
}
}
pub struct SubscriptionFuture<TResult> {
subscription: Arc<Mutex<Subscription<TResult>>>,
}
impl<TResult: Clone> Future for SubscriptionFuture<TResult> {
type Output = TResult;
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
let mut subscription = self.subscription.lock().unwrap();
if let Some(result) = subscription.result() {
Poll::Ready(result)
} else {
subscription.add_waker(context.waker().clone());
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[async_std::test]
async fn subscription() {
let mut registry = SubscriptionRegistry::<u32, u32>::new();
let s1 = registry.create_subscription(0);
let s2 = registry.create_subscription(0);
registry.finish_subscription(&0, 10);
assert_eq!(s1.await, 10);
assert_eq!(s2.await, 10);
}
}