307: Use tokio instead of async_std r=koivunej a=ljedrz

We already use it in `ipfs-http`, our dependencies either already use it or are compatible with it and it has some nice features that we'd like to use.

Note: the `async` tests now specify `max_threads = 1` in order to make sure the multi-threaded executor is not being lenient about any hidden bugs.

fixes https://github.com/rs-ipfs/rust-ipfs/issues/275

Co-authored-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
bors[bot] 2020-08-13 15:04:45 +00:00 committed by GitHub
commit 143c3dad93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 256 additions and 323 deletions

73
Cargo.lock generated
View File

@ -78,12 +78,6 @@ version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b602bfe940d21c130f3895acd65221e8a61270debe89d628b9cb4e3ccb8569b"
[[package]]
name = "arc-swap"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"
[[package]]
name = "arrayref"
version = "0.3.6"
@ -115,16 +109,6 @@ dependencies = [
"syn",
]
[[package]]
name = "async-attributes"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efd3d156917d94862e779f356c5acae312b08fd3121e792c857d7928c8088423"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "async-channel"
version = "1.4.0"
@ -142,7 +126,6 @@ version = "1.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00d68a33ebc8b57800847d00787307f84a562224a14db069b0acefe4c2abbf5d"
dependencies = [
"async-attributes",
"async-task",
"crossbeam-utils",
"futures-channel",
@ -255,7 +238,6 @@ checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
name = "bitswap"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"cid",
"fnv",
@ -266,6 +248,7 @@ dependencies = [
"prost",
"prost-build",
"thiserror",
"tokio",
"tracing",
"unsigned-varint 0.3.3",
]
@ -1354,7 +1337,6 @@ name = "ipfs"
version = "0.1.0"
dependencies = [
"anyhow",
"async-std",
"async-stream",
"async-trait",
"base64 0.12.3",
@ -1378,6 +1360,7 @@ dependencies = [
"serde_json",
"sha2 0.9.1",
"thiserror",
"tokio",
"tracing",
"tracing-futures",
"tracing-subscriber",
@ -1785,7 +1768,6 @@ version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b1fa2bbad054020cb875546a577a66a65a5bf42eff55ed5265f92ffee3cc052"
dependencies = [
"async-std",
"futures 0.3.5",
"futures-timer",
"get_if_addrs",
@ -1793,6 +1775,7 @@ dependencies = [
"libp2p-core",
"log",
"socket2",
"tokio",
]
[[package]]
@ -1907,35 +1890,12 @@ dependencies = [
"kernel32-sys",
"libc",
"log",
"miow 0.2.1",
"miow",
"net2",
"slab",
"winapi 0.2.8",
]
[[package]]
name = "mio-named-pipes"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656"
dependencies = [
"log",
"mio",
"miow 0.3.5",
"winapi 0.3.9",
]
[[package]]
name = "mio-uds"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0"
dependencies = [
"iovec",
"libc",
"mio",
]
[[package]]
name = "miow"
version = "0.2.1"
@ -1948,16 +1908,6 @@ dependencies = [
"ws2_32-sys",
]
[[package]]
name = "miow"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07b88fb9795d4d36d62a012dfbf49a8f5cf12751f36d31a9dbe66d528e58979e"
dependencies = [
"socket2",
"winapi 0.3.9",
]
[[package]]
name = "mpart-async"
version = "0.4.1"
@ -2783,16 +2733,6 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41"
dependencies = [
"arc-swap",
"libc",
]
[[package]]
name = "signature"
version = "1.2.2"
@ -3051,17 +2991,12 @@ dependencies = [
"futures-core",
"iovec",
"lazy_static",
"libc",
"memchr",
"mio",
"mio-named-pipes",
"mio-uds",
"num_cpus",
"pin-project-lite",
"signal-hook-registry",
"slab",
"tokio-macros",
"winapi 0.3.9",
]
[[package]]

View File

@ -11,7 +11,6 @@ nightly = []
[dependencies]
anyhow = { default-features = false, version = "1.0" }
async-std = { default-features = false, features = ["attributes", "std"], version = "1.6" }
async-stream = { default-features = false, version = "0.3" }
async-trait = { default-features = false, version = "0.1" }
base64 = { default-features = false, features = ["alloc"], version = "0.12" }
@ -24,7 +23,7 @@ domain = { default-features = false, version = "0.5" }
domain-resolv = { default-features = false, version = "0.5" }
futures = { default-features = false, features = ["compat", "io-compat"], version = "0.3.5" }
ipfs-unixfs = { path = "unixfs" }
libp2p = { default-features = false, features = ["floodsub", "identify", "kad", "tcp-async-std", "mdns", "mplex", "noise", "ping", "yamux"], version = "0.23" }
libp2p = { default-features = false, features = ["floodsub", "identify", "kad", "tcp-tokio", "mdns", "mplex", "noise", "ping", "yamux"], version = "0.23" }
multibase = { default-features = false, version = "0.8" }
multihash = { default-features = false, version = "0.11" }
prost = { default-features = false, version = "0.6" }
@ -32,6 +31,7 @@ rand = { default-features = false, features = ["getrandom"], version = "0.7" }
serde = { default-features = false, features = ["derive"], version = "1.0" }
serde_json = { default-features = false, features = ["std"], version = "1.0" }
thiserror = { default-features = false, version = "1.0" }
tokio = { default-features = false, features = ["fs", "rt-threaded", "stream"], version = "0.2" }
tracing = { default-features = false, features = ["log"], version = "0.1" }
tracing-futures = { default-features = false, features = ["std", "futures-03"], version = "0.2" }
void = { default-features = false, version = "1.0" }
@ -42,6 +42,7 @@ prost-build = { default-features = false, version = "0.6" }
[dev-dependencies]
hex-literal = { default-features = false, version = "0.3" }
sha2 = { default-features = false, version = "0.9" }
tokio = { default-features = false, features = ["io-std"], version = "0.2" }
tracing-subscriber = { default-features = false, features = ["fmt", "tracing-log", "ansi", "env-filter"], version = "0.2" }
[workspace]

View File

@ -77,38 +77,37 @@ _Note: binaries available via `cargo install` is coming soon._
## Getting started
```rust,no_run
use async_std::task;
use tokio::task;
use futures::join;
use ipfs::{make_ipld, Ipfs, IpfsPath, Ipld, Types, UninitializedIpfs};
fn main() {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
task::block_on(async move {
// Start daemon and initialize repo
let (ipfs, fut): (Ipfs<Types>, _) = UninitializedIpfs::default().await.start().await.unwrap();
task::spawn(fut);
// Start daemon and initialize repo
let (ipfs, fut): (Ipfs<Types>, _) = UninitializedIpfs::default().await.start().await.unwrap();
task::spawn(fut);
// Create a DAG
let f1 = ipfs.put_dag(make_ipld!("block1"));
let f2 = ipfs.put_dag(make_ipld!("block2"));
let (res1, res2) = join!(f1, f2);
let root = make_ipld!([res1.unwrap(), res2.unwrap()]);
let cid = ipfs.put_dag(root).await.unwrap();
let path = IpfsPath::from(cid);
// Create a DAG
let f1 = ipfs.put_dag(make_ipld!("block1"));
let f2 = ipfs.put_dag(make_ipld!("block2"));
let (res1, res2) = join!(f1, f2);
let root = make_ipld!([res1.unwrap(), res2.unwrap()]);
let cid = ipfs.put_dag(root).await.unwrap();
let path = IpfsPath::from(cid);
// Query the DAG
let path1 = path.sub_path("0").unwrap();
let path2 = path.sub_path("1").unwrap();
let f1 = ipfs.get_dag(path1);
let f2 = ipfs.get_dag(path2);
let (res1, res2) = join!(f1, f2);
println!("Received block with contents: {:?}", res1.unwrap());
println!("Received block with contents: {:?}", res2.unwrap());
// Query the DAG
let path1 = path.sub_path("0").unwrap();
let path2 = path.sub_path("1").unwrap();
let f1 = ipfs.get_dag(path1);
let f2 = ipfs.get_dag(path2);
let (res1, res2) = join!(f1, f2);
println!("Received block with contents: {:?}", res1.unwrap());
println!("Received block with contents: {:?}", res2.unwrap());
// Exit
ipfs.exit_daemon();
});
// Exit
ipfs.exit_daemon();
}
```

View File

@ -8,7 +8,6 @@ version = "0.1.0"
prost-build = { default-features = false, version = "0.6" }
[dependencies]
async-std = { default-features = false, version = "1.6" }
async-trait = { default-features = false, version = "0.1" }
cid = { default-features = false, version = "0.5" }
fnv = { default-features = false, version = "1.0" }
@ -18,5 +17,6 @@ libp2p-swarm = { default-features = false, version = "0.20" }
multihash = { default-features = false, version = "0.11" }
prost = { default-features = false, version = "0.6" }
thiserror = { default-features = false, version = "1.0" }
tokio = { default-features = false, version = "0.2" }
tracing = { default-features = false, version = "0.1" }
unsigned-varint = { default-features = false, version = "0.3" }

View File

@ -1,22 +1,21 @@
use async_std::task;
use futures::join;
use ipfs::{make_ipld, Ipfs, TestTypes, UninitializedIpfs};
use tokio::task;
fn main() {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
task::block_on(async move {
let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::default().await.start().await.unwrap();
task::spawn(fut);
let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::default().await.start().await.unwrap();
task::spawn(fut);
let f1 = ipfs.put_dag(make_ipld!("block1"));
let f2 = ipfs.put_dag(make_ipld!("block2"));
let (res1, res2) = join!(f1, f2);
let f1 = ipfs.put_dag(make_ipld!("block1"));
let f2 = ipfs.put_dag(make_ipld!("block2"));
let (res1, res2) = join!(f1, f2);
let root = make_ipld!([res1.unwrap(), res2.unwrap()]);
ipfs.put_dag(root).await.unwrap();
let root = make_ipld!([res1.unwrap(), res2.unwrap()]);
ipfs.put_dag(root).await.unwrap();
ipfs.exit_daemon().await;
});
ipfs.exit_daemon().await;
}

View File

@ -1,25 +1,24 @@
use async_std::task;
use futures::join;
use ipfs::{Ipfs, IpfsPath, TestTypes, UninitializedIpfs};
use std::str::FromStr;
use tokio::task;
fn main() {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let path =
IpfsPath::from_str("/ipfs/zdpuB1caPcm4QNXeegatVfLQ839Lmprd5zosXGwRUBJHwj66X").unwrap();
task::block_on(async move {
let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::default().await.start().await.unwrap();
task::spawn(fut);
let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::default().await.start().await.unwrap();
task::spawn(fut);
let f1 = ipfs.get_dag(path.sub_path("0").unwrap());
let f2 = ipfs.get_dag(path.sub_path("1").unwrap());
let (res1, res2) = join!(f1, f2);
println!("Received block with contents: {:?}", res1.unwrap());
println!("Received block with contents: {:?}", res2.unwrap());
let f1 = ipfs.get_dag(path.sub_path("0").unwrap());
let f2 = ipfs.get_dag(path.sub_path("1").unwrap());
let (res1, res2) = join!(f1, f2);
println!("Received block with contents: {:?}", res1.unwrap());
println!("Received block with contents: {:?}", res2.unwrap());
ipfs.exit_daemon().await;
});
ipfs.exit_daemon().await;
}

View File

@ -1,15 +1,16 @@
#![recursion_limit = "512"]
use cid::Cid;
use futures::io::AsyncWriteExt;
use futures::pin_mut;
use futures::stream::StreamExt; // needed for StreamExt::next
use ipfs::{Ipfs, TestTypes, UninitializedIpfs};
use std::convert::TryFrom;
use std::env;
use std::process::exit;
use tokio::io::AsyncWriteExt;
fn main() {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
// this example will wait forever attempting to fetch a CID provided at command line. It is
@ -36,45 +37,43 @@ fn main() {
}
};
async_std::task::block_on(async move {
// Start daemon and initialize repo
let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::default().await.start().await.unwrap();
async_std::task::spawn(fut);
// Start daemon and initialize repo
let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::default().await.start().await.unwrap();
tokio::task::spawn(fut);
let (public_key, addresses) = ipfs.identity().await.unwrap();
assert!(!addresses.is_empty(), "Zero listening addresses");
let (public_key, addresses) = ipfs.identity().await.unwrap();
assert!(!addresses.is_empty(), "Zero listening addresses");
eprintln!("Please connect an ipfs node having {} to:\n", cid);
eprintln!("Please connect an ipfs node having {} to:\n", cid);
let peer_id = public_key.into_peer_id().to_string();
let peer_id = public_key.into_peer_id().to_string();
for address in addresses {
eprintln!(" - {}/p2p/{}", address, peer_id);
}
for address in addresses {
eprintln!(" - {}/p2p/{}", address, peer_id);
}
eprintln!();
eprintln!();
let stream = ipfs.cat_unixfs(cid, None).await.unwrap_or_else(|e| {
eprintln!("Error: {}", e);
exit(1);
});
// The stream needs to be pinned on the stack to be used with StreamExt::next
pin_mut!(stream);
let mut stdout = async_std::io::stdout();
let stream = ipfs.cat_unixfs(cid, None).await.unwrap_or_else(|e| {
eprintln!("Error: {}", e);
exit(1);
});
// The stream needs to be pinned on the stack to be used with StreamExt::next
pin_mut!(stream);
let mut stdout = tokio::io::stdout();
loop {
// This could be made more performant by polling the stream while writing to stdout.
match stream.next().await {
Some(Ok(bytes)) => {
stdout.write_all(&bytes).await.unwrap();
}
Some(Err(e)) => {
eprintln!("Error: {}", e);
exit(1);
}
None => break,
loop {
// This could be made more performant by polling the stream while writing to stdout.
match stream.next().await {
Some(Ok(bytes)) => {
stdout.write_all(&bytes).await.unwrap();
}
Some(Err(e)) => {
eprintln!("Error: {}", e);
exit(1);
}
None => break,
}
})
}
}

View File

@ -1,65 +1,67 @@
#![recursion_limit = "512"]
use async_std::task;
use cid::{Cid, Codec};
use ipfs::{Block, Ipfs, TestTypes, UninitializedIpfs};
use multihash::Sha2_256;
use tokio::{
io::{stdin, AsyncBufReadExt, BufReader},
task,
};
fn main() {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
// this example demonstrates
// - block building
// - local swarm communication with go-ipfs
task::block_on(async move {
// Start daemon and initialize repo
let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::default().await.start().await.unwrap();
task::spawn(fut);
// Start daemon and initialize repo
let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::default().await.start().await.unwrap();
task::spawn(fut);
let data = b"block-want\n".to_vec().into_boxed_slice();
let wanted = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
let data = b"block-want\n".to_vec().into_boxed_slice();
let wanted = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
let (public_key, addresses) = ipfs.identity().await.unwrap();
assert!(!addresses.is_empty(), "Zero listening addresses");
let (public_key, addresses) = ipfs.identity().await.unwrap();
assert!(!addresses.is_empty(), "Zero listening addresses");
eprintln!("Please connect an ipfs node having {} to:\n", wanted);
eprintln!("Please connect an ipfs node having {} to:\n", wanted);
let peer_id = public_key.into_peer_id().to_string();
let peer_id = public_key.into_peer_id().to_string();
for address in addresses {
eprintln!(" - {}/p2p/{}", address, peer_id);
}
for address in addresses {
eprintln!(" - {}/p2p/{}", address, peer_id);
}
eprintln!();
eprintln!("The block wanted in this example can be created on the other node:");
eprintln!(" echo block-want | ipfs block put -f raw");
eprintln!();
eprintln!();
eprintln!("The block wanted in this example can be created on the other node:");
eprintln!(" echo block-want | ipfs block put -f raw");
eprintln!();
// Create a Block
let data = b"block-provide\n".to_vec().into_boxed_slice();
let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
let provided = ipfs.put_block(Block::new(data, cid)).await.unwrap();
// Create a Block
let data = b"block-provide\n".to_vec().into_boxed_slice();
let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
let provided = ipfs.put_block(Block::new(data, cid)).await.unwrap();
eprintln!(
"After connecting the node, it can be used to get block: {}",
provided
);
eprintln!("This should print out \"block-provide\\n\":");
eprintln!(" ipfs block get {}", provided);
eprintln!();
eprintln!(
"After connecting the node, it can be used to get block: {}",
provided
);
eprintln!("This should print out \"block-provide\\n\":");
eprintln!(" ipfs block get {}", provided);
eprintln!();
// Retrive a Block
let block = ipfs.get_block(&wanted).await.unwrap();
let contents = std::str::from_utf8(block.data()).unwrap();
eprintln!("Block retrieved: {:?}", contents);
// Retrive a Block
let block = ipfs.get_block(&wanted).await.unwrap();
let contents = std::str::from_utf8(block.data()).unwrap();
eprintln!("Block retrieved: {:?}", contents);
eprintln!();
eprintln!("Press enter or CTRL-C to exit this example.");
eprintln!();
eprintln!("Press enter or CTRL-C to exit this example.");
let _ = async_std::io::stdin().read_line(&mut String::new()).await;
let _ = BufReader::new(stdin()).read_line(&mut String::new()).await;
ipfs.exit_daemon().await;
});
ipfs.exit_daemon().await;
}

View File

@ -1,37 +1,36 @@
use async_std::task;
use ipfs::{Ipfs, IpfsPath, PeerId, TestTypes, UninitializedIpfs};
use std::str::FromStr;
use tokio::task;
fn main() {
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
task::block_on(async move {
// Start daemon and initialize repo
let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::default().await.start().await.unwrap();
task::spawn(fut);
// Start daemon and initialize repo
let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::default().await.start().await.unwrap();
task::spawn(fut);
// Create a Block
let cid = ipfs.put_dag("block v0".into()).await.unwrap();
let ipfs_path = IpfsPath::from(cid);
// Publish a Block
let ipns_path = ipfs
.publish_ipns(&PeerId::random(), &ipfs_path)
.await
.unwrap();
// Create a Block
let cid = ipfs.put_dag("block v0".into()).await.unwrap();
let ipfs_path = IpfsPath::from(cid);
// Publish a Block
let ipns_path = ipfs
.publish_ipns(&PeerId::random(), &ipfs_path)
.await
.unwrap();
// Resolve a Block
let new_ipfs_path = ipfs.resolve_ipns(&ipns_path).await.unwrap();
assert_eq!(ipfs_path, new_ipfs_path);
// Resolve a Block
let new_ipfs_path = ipfs.resolve_ipns(&ipns_path).await.unwrap();
assert_eq!(ipfs_path, new_ipfs_path);
// Resolve dnslink
let ipfs_path = IpfsPath::from_str("/ipns/ipfs.io").unwrap();
println!("Resolving {:?}", ipfs_path.to_string());
let ipfs_path = ipfs.resolve_ipns(&ipfs_path).await.unwrap();
println!("Resolved stage 1: {:?}", ipfs_path.to_string());
let ipfs_path = ipfs.resolve_ipns(&ipfs_path).await.unwrap();
println!("Resolved stage 2: {:?}", ipfs_path.to_string());
// Resolve dnslink
let ipfs_path = IpfsPath::from_str("/ipns/ipfs.io").unwrap();
println!("Resolving {:?}", ipfs_path.to_string());
let ipfs_path = ipfs.resolve_ipns(&ipfs_path).await.unwrap();
println!("Resolved stage 1: {:?}", ipfs_path.to_string());
let ipfs_path = ipfs.resolve_ipns(&ipfs_path).await.unwrap();
println!("Resolved stage 2: {:?}", ipfs_path.to_string());
ipfs.exit_daemon().await;
});
ipfs.exit_daemon().await;
}

View File

@ -32,7 +32,7 @@ serde_json = { default-features = false, version = "1.0" }
structopt = { default-features = false, version = "0.3" }
tar = { default-features = false, version = "0.4" }
thiserror = { default-features = false, version = "1.0" }
tokio = { default-features = false, features = ["full"], version = "0.2" }
tokio = { default-features = false, version = "0.2" }
tracing = { default-features = false, features = ["log"], version = "0.1" }
tracing-subscriber = { default-features = false, features = ["fmt", "tracing-log", "env-filter"], version = "0.2" }
url = { default-features = false, version = "2.1" }

View File

@ -134,8 +134,7 @@ fn main() {
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop");
rt.block_on(async move {
let opts: IpfsOptions =
IpfsOptions::new(home.clone().into(), keypair, Vec::new(), false, None);
let opts: IpfsOptions = IpfsOptions::new(home.clone(), keypair, Vec::new(), false, None);
let (ipfs, task): (Ipfs<ipfs::TestTypes>, _) = UninitializedIpfs::new(opts, None)
.await

View File

@ -175,7 +175,7 @@ mod tests {
routes(&ipfs, shutdown_tx)
}
#[tokio::test]
#[tokio::test(max_threads = 1)]
async fn not_found_as_plaintext() {
let routes = testing_routes().await;
let resp = warp::test::request()
@ -189,7 +189,7 @@ mod tests {
assert_eq!(resp.body(), "404 page not found");
}
#[tokio::test]
#[tokio::test(max_threads = 1)]
async fn invalid_peer_id_as_messageresponse() {
let routes = testing_routes().await;
let resp = warp::test::request()

View File

@ -536,7 +536,7 @@ mod tests {
})
}
#[tokio::test]
#[tokio::test(max_threads = 1)]
async fn url_hacked_args() {
let response = request()
.path("/pubsub/pub?arg=some_channel&arg=foobar")
@ -546,7 +546,7 @@ mod tests {
assert_eq!(body, r#"{"message":"foobar","topic":"some_channel"}"#);
}
#[tokio::test]
#[tokio::test(max_threads = 1)]
async fn message_in_body() {
let response = request()
.path("/pubsub/pub?arg=some_channel")

View File

@ -626,7 +626,7 @@ mod tests {
use std::collections::HashSet;
use std::convert::TryFrom;
#[tokio::test]
#[tokio::test(max_threads = 1)]
async fn test_inner_local() {
let filter = local(&*preloaded_testing_ipfs().await);
@ -673,7 +673,7 @@ mod tests {
assert!(diff.is_empty(), "{:?}", diff);
}
#[tokio::test]
#[tokio::test(max_threads = 1)]
async fn all_refs_from_root() {
let Node { ipfs, bg_task: _bt } = preloaded_testing_ipfs().await;
@ -714,7 +714,7 @@ mod tests {
assert_edges(&expected, all_edges.as_slice());
}
#[tokio::test]
#[tokio::test(max_threads = 1)]
async fn all_unique_refs_from_root() {
let Node { ipfs, bg_task: _bt } = preloaded_testing_ipfs().await;
@ -757,7 +757,7 @@ mod tests {
assert!(diff.is_empty(), "{:?}", diff);
}
#[tokio::test]
#[tokio::test(max_threads = 1)]
async fn refs_with_path() {
let ipfs = preloaded_testing_ipfs().await;

View File

@ -316,7 +316,7 @@ mod tests {
}
}
#[tokio::test]
#[tokio::test(max_threads = 1)]
async fn very_long_file_and_symlink_names() {
let ipfs = Node::new("test_node").await;
@ -372,7 +372,7 @@ mod tests {
assert_eq!(found, expected);
}
#[tokio::test]
#[tokio::test(max_threads = 1)]
async fn get_multiblock_file() {
let ipfs = Node::new("test_node").await;

View File

@ -377,7 +377,7 @@ impl<D: fmt::Display> serde::Serialize for Quoted<D> {
mod tests {
use crate::v0::root_files::add;
#[tokio::test]
#[tokio::test(max_threads = 1)]
async fn add_single_block_file() {
let ipfs = tokio_ipfs().await;

View File

@ -95,7 +95,7 @@ mod tests {
use super::*;
use crate::{make_ipld, Node};
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_resolve_root_cid() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
@ -105,7 +105,7 @@ mod tests {
assert_eq!(res, data);
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_resolve_array_elem() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
@ -118,7 +118,7 @@ mod tests {
assert_eq!(res, make_ipld!(2));
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_resolve_nested_array_elem() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
@ -131,7 +131,7 @@ mod tests {
assert_eq!(res, make_ipld!(2));
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_resolve_object_elem() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
@ -146,7 +146,7 @@ mod tests {
assert_eq!(res, make_ipld!(false));
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_resolve_cid_elem() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);

View File

@ -79,19 +79,20 @@ pub async fn resolve(domain: &str) -> Result<IpfsPath, Error> {
mod tests {
use super::*;
#[async_std::test]
#[tokio::test(max_threads = 1)]
#[ignore]
async fn test_resolve1() {
let res = resolve("ipfs.io").await.unwrap().to_string();
assert_eq!(res, "/ipns/website.ipfs.io");
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
#[ignore]
async fn test_resolve2() {
let res = resolve("website.ipfs.io").await.unwrap().to_string();
// FIXME: perhaps this should just be a path to multihash? otherwise it'll
// break every time they update the site.
assert_eq!(res, "/ipfs/QmbV3st6TDZVocp4H2f4KE3tvLP1BEpeRHhZyFL9gD4Ut4");
assert_eq!(
res,
"/ipfs/bafybeiayvrj27f65vbecspbnuavehcb3znvnt2strop2rfbczupudoizya"
);
}
}

View File

@ -8,7 +8,6 @@ extern crate tracing;
pub use crate::ipld::Ipld;
use anyhow::{anyhow, format_err};
use async_std::path::PathBuf;
pub use bitswap::{BitswapEvent, Block, Stats};
pub use cid::Cid;
use cid::Codec;
@ -18,6 +17,7 @@ use futures::sink::SinkExt;
use futures::stream::{Fuse, Stream};
pub use libp2p::core::{connection::ListenerId, ConnectedPoint, Multiaddr, PeerId, PublicKey};
pub use libp2p::identity::Keypair;
use std::path::PathBuf;
use tracing::Span;
use tracing_futures::Instrument;
@ -107,7 +107,7 @@ impl IpfsOptions {
/// Creates an inmemory store backed node for tests
pub fn inmemory_with_generated_keys() -> Self {
Self {
ipfs_path: std::env::temp_dir().into(),
ipfs_path: std::env::temp_dir(),
keypair: Keypair::generate_ed25519(),
mdns: Default::default(),
bootstrap: Default::default(),
@ -186,7 +186,7 @@ impl Default for IpfsOptions {
} else {
std::env::current_dir().unwrap()
};
root.join(".rust-ipfs").into()
root.join(".rust-ipfs")
};
let config_path = dirs::config_dir()
.unwrap()
@ -1047,7 +1047,7 @@ mod node {
/// easier.
pub struct Node {
pub ipfs: Ipfs<TestTypes>,
pub bg_task: async_std::task::JoinHandle<()>,
pub bg_task: tokio::task::JoinHandle<()>,
}
impl Node {
@ -1069,7 +1069,7 @@ mod node {
.await
.unwrap();
let bg_task = async_std::task::spawn(fut.in_current_span());
let bg_task = tokio::task::spawn(fut.in_current_span());
Node { ipfs, bg_task }
}
@ -1124,7 +1124,7 @@ mod node {
pub async fn shutdown(self) {
self.ipfs.exit_daemon().await;
self.bg_task.await;
let _ = self.bg_task.await;
}
}
@ -1260,7 +1260,7 @@ mod tests {
));
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_put_and_get_block() {
let ipfs = Node::new("test_node").await;
@ -1273,7 +1273,7 @@ mod tests {
assert_eq!(block, new_block);
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_put_and_get_dag() {
let ipfs = Node::new("test_node").await;
@ -1283,7 +1283,7 @@ mod tests {
assert_eq!(data, new_data);
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_pin_and_unpin() {
let ipfs = Node::new("test_node").await;

View File

@ -5,7 +5,6 @@ use crate::repo::BlockPut;
use crate::subscription::{SubscriptionFuture, SubscriptionRegistry};
use crate::{Ipfs, IpfsTypes};
use anyhow::anyhow;
use async_std::task;
use bitswap::{Bitswap, BitswapEvent};
use cid::Cid;
use libp2p::core::{Multiaddr, PeerId};
@ -19,6 +18,7 @@ use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourEventProcess};
use libp2p::NetworkBehaviour;
use multibase::Base;
use std::sync::Arc;
use tokio::task;
/// Behaviour type.
#[derive(NetworkBehaviour)]

View File

@ -74,6 +74,6 @@ impl libp2p::core::Executor for SpannedExecutor {
future: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + 'static + Send>>,
) {
use tracing_futures::Instrument;
async_std::task::spawn(future.instrument(self.0.clone()));
tokio::task::spawn(future.instrument(self.0.clone()));
}
}

View File

@ -339,7 +339,7 @@ mod tests {
));
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn swarm_api() {
let (peer1_id, trans) = mk_transport();
let mut swarm1 = Swarm::new(trans, SwarmApi::default(), peer1_id);

View File

@ -5,7 +5,7 @@ use libp2p::core::upgrade::SelectUpgrade;
use libp2p::identity;
use libp2p::mplex::MplexConfig;
use libp2p::noise::{self, NoiseConfig};
use libp2p::tcp::TcpConfig;
use libp2p::tcp::TokioTcpConfig;
use libp2p::yamux::Config as YamuxConfig;
use libp2p::{PeerId, Transport};
use std::io::{Error, ErrorKind};
@ -23,7 +23,7 @@ pub fn build_transport(keypair: identity::Keypair) -> TTransport {
.unwrap();
let noise_config = NoiseConfig::xx(xx_keypair).into_authenticated();
TcpConfig::new()
TokioTcpConfig::new()
.nodelay(true)
.upgrade(Version::V1)
.authenticate(noise_config)

View File

@ -1,9 +1,6 @@
//! Persistent fs backed repo
use crate::error::Error;
use crate::repo::{BlockPut, BlockStore};
use async_std::fs;
use async_std::path::PathBuf;
use async_std::prelude::*;
use async_trait::async_trait;
use bitswap::Block;
use cid::Cid;
@ -12,6 +9,11 @@ use futures::lock::Mutex;
use futures::stream::StreamExt;
use std::collections::HashSet;
use std::ffi::OsStr;
use std::path::PathBuf;
use tokio::{
fs,
io::{AsyncReadExt, AsyncWriteExt},
};
use super::{BlockRm, BlockRmError, RepoCid};
@ -145,12 +147,12 @@ mod tests {
use multihash::Sha2_256;
use std::env::temp_dir;
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_fs_blockstore() {
let mut tmp = temp_dir();
tmp.push("blockstore1");
std::fs::remove_dir_all(tmp.clone()).ok();
let store = FsBlockStore::new(tmp.clone().into());
let store = FsBlockStore::new(tmp.clone());
let data = b"1".to_vec().into_boxed_slice();
let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
@ -183,7 +185,7 @@ mod tests {
std::fs::remove_dir_all(tmp).ok();
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_fs_blockstore_open() {
let mut tmp = temp_dir();
tmp.push("blockstore2");
@ -193,14 +195,14 @@ mod tests {
let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
let block = Block::new(data, cid);
let block_store = FsBlockStore::new(tmp.clone().into());
let block_store = FsBlockStore::new(tmp.clone());
block_store.init().await.unwrap();
block_store.open().await.unwrap();
assert!(!block_store.contains(block.cid()).await.unwrap());
block_store.put(block.clone()).await.unwrap();
let block_store = FsBlockStore::new(tmp.clone().into());
let block_store = FsBlockStore::new(tmp.clone());
block_store.open().await.unwrap();
assert!(block_store.contains(block.cid()).await.unwrap());
assert_eq!(block_store.get(block.cid()).await.unwrap().unwrap(), block);
@ -208,13 +210,13 @@ mod tests {
std::fs::remove_dir_all(&tmp).ok();
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_fs_blockstore_list() {
let mut tmp = temp_dir();
tmp.push("blockstore_list");
std::fs::remove_dir_all(&tmp).ok();
let block_store = FsBlockStore::new(tmp.clone().into());
let block_store = FsBlockStore::new(tmp.clone());
block_store.init().await.unwrap();
block_store.open().await.unwrap();

View File

@ -1,11 +1,11 @@
//! Volatile memory backed repo
use crate::error::Error;
use crate::repo::{BlockPut, BlockStore, Column, DataStore};
use async_std::path::PathBuf;
use async_trait::async_trait;
use bitswap::Block;
use cid::Cid;
use futures::lock::Mutex;
use std::path::PathBuf;
use super::{BlockRm, BlockRmError, RepoCid};
@ -154,10 +154,10 @@ mod tests {
use multihash::Sha2_256;
use std::env::temp_dir;
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_mem_blockstore() {
let tmp = temp_dir();
let store = MemBlockStore::new(tmp.into());
let store = MemBlockStore::new(tmp);
let data = b"1".to_vec().into_boxed_slice();
let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
let block = Block::new(data, cid.clone());
@ -187,10 +187,10 @@ mod tests {
assert_eq!(get.await.unwrap(), None);
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_mem_blockstore_list() {
let tmp = temp_dir();
let mem_store = MemBlockStore::new(tmp.into());
let mem_store = MemBlockStore::new(tmp);
mem_store.init().await.unwrap();
mem_store.open().await.unwrap();
@ -210,10 +210,10 @@ mod tests {
}
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_mem_datastore() {
let tmp = temp_dir();
let store = MemDataStore::new(tmp.into());
let store = MemDataStore::new(tmp);
let col = Column::Ipns;
let key = [1, 2, 3, 4];
let value = [5, 6, 7, 8];

View File

@ -3,7 +3,6 @@ use crate::error::Error;
use crate::path::IpfsPath;
use crate::subscription::{RequestKind, SubscriptionFuture, SubscriptionRegistry};
use crate::IpfsOptions;
use async_std::path::PathBuf;
use async_trait::async_trait;
use bitswap::Block;
use cid::{self, Cid};
@ -16,6 +15,7 @@ use futures::channel::{
use futures::sink::SinkExt;
use libp2p::core::PeerId;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
pub mod fs;
pub mod mem;
@ -366,11 +366,11 @@ pub(crate) mod tests {
pub fn create_mock_repo() -> (Repo<Types>, Receiver<RepoEvent>) {
let mut tmp = temp_dir();
tmp.push("rust-ipfs-repo");
let options: RepoOptions = RepoOptions { path: tmp.into() };
let options: RepoOptions = RepoOptions { path: tmp };
Repo::new(options)
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn test_repo() {
let (repo, _) = create_mock_repo();
repo.init().await.unwrap();

View File

@ -4,13 +4,12 @@
//! sharing the same unique numeric identifier, the `SubscriptionId`.
use crate::{p2p::ConnectionTarget, RepoEvent};
use async_std::future::Future;
use async_std::task::{Context, Poll, Waker};
use cid::Cid;
use core::fmt::Debug;
use core::hash::Hash;
use core::pin::Pin;
use futures::channel::mpsc::Sender;
use futures::future::Future;
use libp2p::{kad::QueryId, Multiaddr, PeerId};
use std::collections::HashMap;
use std::convert::TryFrom;
@ -20,6 +19,7 @@ use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex,
};
use std::task::{Context, Poll, Waker};
// a counter used to assign unique identifiers to `Subscription`s and `SubscriptionFuture`s
// (which obtain the same number as their counterpart `Subscription`)
@ -443,7 +443,7 @@ mod tests {
}
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn subscription_basics() {
let registry = SubscriptionRegistry::<u32, ()>::default();
let s1 = registry.create_subscription(0.into(), None);
@ -455,7 +455,7 @@ mod tests {
assert_eq!(s3.await.unwrap(), 10);
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn subscription_cancelled_on_dropping_registry() {
let registry = SubscriptionRegistry::<u32, ()>::default();
let s1 = registry.create_subscription(0.into(), None);
@ -463,7 +463,7 @@ mod tests {
assert_eq!(s1.await, Err(SubscriptionErr::Cancelled));
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn subscription_cancelled_on_shutdown() {
let registry = SubscriptionRegistry::<u32, ()>::default();
let s1 = registry.create_subscription(0.into(), None);
@ -471,7 +471,7 @@ mod tests {
assert_eq!(s1.await, Err(SubscriptionErr::Cancelled));
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn new_subscriptions_cancelled_after_shutdown() {
let registry = SubscriptionRegistry::<u32, ()>::default();
registry.shutdown();
@ -479,10 +479,10 @@ mod tests {
assert_eq!(s1.await, Err(SubscriptionErr::Cancelled));
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn dropping_subscription_future_after_registering() {
use async_std::future::timeout;
use std::time::Duration;
use tokio::time::timeout;
let registry = SubscriptionRegistry::<u32, ()>::default();
let s1 = timeout(

View File

@ -1,12 +1,12 @@
use async_std::task;
use ipfs::Node;
use tokio::time;
async fn wait(millis: u64) {
task::spawn(task::sleep(std::time::Duration::from_millis(millis))).await;
time::delay_for(std::time::Duration::from_millis(millis)).await;
}
// Ensure that the Bitswap object doesn't leak.
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn check_bitswap_cleanups() {
// create a few nodes
let node_a = Node::new("a").await;

View File

@ -12,7 +12,7 @@ fn filter(i: usize) -> bool {
// testing the bitswap protocol (though it would be advised to uncomment
// the tracing_subscriber for stress-testing purposes)
#[ignore]
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn bitswap_stress_test() {
tracing_subscriber::fmt::init();

View File

@ -1,9 +1,9 @@
use async_std::future::timeout;
use ipfs::{ConnectionTarget, Node};
use std::time::Duration;
use tokio::time::timeout;
// Make sure two instances of ipfs can be connected by `Multiaddr`.
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn connect_two_nodes_by_addr() {
let node_a = Node::new("a").await;
let node_b = Node::new("b").await;
@ -22,7 +22,7 @@ async fn connect_two_nodes_by_addr() {
// order to connect by PeerId) already performs a dial to the
// given peer within Pubsub::add_node_to_partial_view it calls
#[ignore]
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn connect_two_nodes_by_peer_id() {
let node_a = Node::new("a").await;
let node_b = Node::new("b").await;
@ -40,7 +40,7 @@ async fn connect_two_nodes_by_peer_id() {
}
// Make sure two instances of ipfs can be connected with a multiaddr+peer combo.
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn connect_two_nodes_by_addr_and_peer() {
let node_a = Node::new("a").await;
let node_b = Node::new("b").await;
@ -59,7 +59,7 @@ async fn connect_two_nodes_by_addr_and_peer() {
}
// Ensure that duplicate connection attempts don't cause hangs.
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn connect_duplicate_multiaddr() {
let node_a = Node::new("a").await;
let node_b = Node::new("b").await;
@ -77,7 +77,7 @@ async fn connect_duplicate_multiaddr() {
}
// Ensure that duplicate connection attempts don't cause hangs.
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn connect_duplicate_peer_id() {
let node_a = Node::new("a").await;
let node_b = Node::new("b").await;
@ -98,7 +98,7 @@ async fn connect_duplicate_peer_id() {
// More complicated one to the above; first node will have two listening addresses and the second
// one should dial both of the addresses, resulting in two connections.
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn connect_two_nodes_with_two_connections_doesnt_panic() {
let node_a = Node::new("a").await;
let node_b = Node::new("b").await;

View File

@ -1,10 +1,10 @@
use async_std::future::timeout;
use cid::{Cid, Codec};
use ipfs::{Block, Node};
use multihash::Sha2_256;
use std::time::Duration;
use tokio::time::timeout;
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn exchange_block() {
tracing_subscriber::fmt::init();

View File

@ -1,10 +1,10 @@
use async_std::future::timeout;
use cid::Cid;
use ipfs::{IpfsOptions, Node};
use libp2p::{Multiaddr, PeerId};
use std::time::Duration;
use tokio::time::timeout;
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn kademlia_local_peer_discovery() {
const BOOTSTRAPPER_COUNT: usize = 20;
@ -54,7 +54,7 @@ async fn kademlia_local_peer_discovery() {
}
#[ignore = "targets an actual bootstrapper, so random failures can happen"]
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn kademlia_popular_content_discovery() {
let (bootstrapper_id, bootstrapper_addr): (PeerId, Multiaddr) = (
"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"

View File

@ -1,4 +1,4 @@
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn multiple_consecutive_ephemeral_listening_addresses() {
let node = ipfs::Node::new("test_node").await;
@ -12,7 +12,7 @@ async fn multiple_consecutive_ephemeral_listening_addresses() {
assert_ne!(first, second);
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn multiple_concurrent_ephemeral_listening_addresses_on_same_ip() {
let node = ipfs::Node::new("test_node").await;
@ -41,7 +41,7 @@ async fn multiple_concurrent_ephemeral_listening_addresses_on_same_ip() {
);
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
#[cfg(not(target_os = "macos"))]
async fn multiple_concurrent_ephemeral_listening_addresses_on_different_ip() {
let node = ipfs::Node::new("test_node").await;
@ -59,7 +59,7 @@ async fn multiple_concurrent_ephemeral_listening_addresses_on_different_ip() {
second.unwrap();
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn adding_unspecified_addr_resolves_with_first() {
let node = ipfs::Node::new("test_node").await;
// there is no test in trying to match this with others as ... that would be quite
@ -69,7 +69,7 @@ async fn adding_unspecified_addr_resolves_with_first() {
.unwrap();
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn listening_for_multiple_unspecified_addresses() {
let node = ipfs::Node::new("test_node").await;
// there is no test in trying to match this with others as ... that would be quite
@ -93,7 +93,7 @@ async fn listening_for_multiple_unspecified_addresses() {
);
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn remove_listening_address() {
let node = ipfs::Node::new("test_node").await;

View File

@ -1,16 +1,17 @@
use async_std::future::{pending, timeout};
use futures::future::pending;
use futures::stream::StreamExt;
use ipfs::{Node, PeerId};
use std::time::Duration;
use tokio::time::timeout;
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn subscribe_only_once() {
let a = Node::new("test_node").await;
let _stream = a.pubsub_subscribe("some_topic".into()).await.unwrap();
a.pubsub_subscribe("some_topic".into()).await.unwrap_err();
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn resubscribe_after_unsubscribe() {
let a = Node::new("test_node").await;
@ -22,7 +23,7 @@ async fn resubscribe_after_unsubscribe() {
drop(a.pubsub_subscribe("topic".into()).await.unwrap());
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn unsubscribe_via_drop() {
let a = Node::new("test_node").await;
@ -35,7 +36,7 @@ async fn unsubscribe_via_drop() {
assert_eq!(a.pubsub_subscribed().await.unwrap(), empty);
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn can_publish_without_subscribing() {
let a = Node::new("test_node").await;
a.pubsub_publish("topic".into(), b"foobar".to_vec())
@ -43,7 +44,7 @@ async fn can_publish_without_subscribing() {
.unwrap()
}
#[async_std::test]
#[tokio::test(max_threads = 1)]
#[allow(clippy::mutable_key_type)] // clippy doesn't like Vec inside HashSet
async fn publish_between_two_nodes() {
use futures::stream::StreamExt;

View File

@ -1,11 +1,8 @@
use async_std::{
future::{pending, timeout},
task,
};
use cid::Cid;
use futures::future::{select, Either, FutureExt};
use futures::future::{pending, select, Either, FutureExt};
use futures::future::{AbortHandle, Abortable};
use ipfs::Node;
use tokio::{task, time::timeout};
use std::{
convert::TryFrom,
@ -50,7 +47,7 @@ async fn check_cid_subscriptions(ipfs: &Node, cid: &Cid, expected_count: usize)
}
/// Check if canceling a Cid affects the wantlist.
#[async_std::test]
#[tokio::test(max_threads = 1)]
async fn wantlist_cancellation() {
tracing_subscriber::fmt::init();