From 0140c673b849b48d4fb8ab49d4380f3debfa2424 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Thu, 30 Jul 2020 12:18:46 +0200 Subject: [PATCH] feat: attach tracing to nodes Co-authored-by: Joonas Koivunen Signed-off-by: ljedrz --- http/src/main.rs | 10 ++++-- http/src/v0.rs | 2 +- http/src/v0/refs.rs | 2 +- http/src/v0/root_files.rs | 4 +-- http/src/v0/root_files/add.rs | 2 +- src/lib.rs | 48 ++++++++++++++++++--------- tests/connect_two.rs | 8 ++--- tests/exchange_block.rs | 4 +-- tests/kademlia.rs | 8 ++--- tests/multiple_listening_addresses.rs | 12 +++---- tests/pubsub.rs | 12 +++---- tests/wantlist_and_cancellation.rs | 2 +- 12 files changed, 69 insertions(+), 45 deletions(-) diff --git a/http/src/main.rs b/http/src/main.rs index 80a7eebc..aa0b4412 100644 --- a/http/src/main.rs +++ b/http/src/main.rs @@ -134,8 +134,14 @@ fn main() { let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop"); rt.block_on(async move { - let opts: IpfsOptions = - IpfsOptions::new(home.clone().into(), keypair, Vec::new(), false, None); + let opts: IpfsOptions = IpfsOptions::new( + "local_node", + home.clone().into(), + keypair, + Vec::new(), + false, + None, + ); let (ipfs, task) = UninitializedIpfs::new(opts) .await diff --git a/http/src/v0.rs b/http/src/v0.rs index a99c8a2a..8117bfd5 100644 --- a/http/src/v0.rs +++ b/http/src/v0.rs @@ -119,7 +119,7 @@ mod tests { use super::routes; use ipfs::{IpfsOptions, UninitializedIpfs}; - let options = IpfsOptions::inmemory_with_generated_keys(); + let options = IpfsOptions::inmemory_with_generated_keys("test_node"); let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap(); drop(fut); diff --git a/http/src/v0/refs.rs b/http/src/v0/refs.rs index 2621759e..e6001b19 100644 --- a/http/src/v0/refs.rs +++ b/http/src/v0/refs.rs @@ -806,7 +806,7 @@ mod tests { } async fn preloaded_testing_ipfs() -> Ipfs { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys(); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node"); let (ipfs, _) = ipfs::UninitializedIpfs::new(options) .await .start() diff --git a/http/src/v0/root_files.rs b/http/src/v0/root_files.rs index 8f8a4d1d..742272da 100644 --- a/http/src/v0/root_files.rs +++ b/http/src/v0/root_files.rs @@ -334,7 +334,7 @@ mod tests { #[tokio::test] async fn very_long_file_and_symlink_names() { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys(); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node"); let (ipfs, _) = ipfs::UninitializedIpfs::new(options) .await .start() @@ -395,7 +395,7 @@ mod tests { #[tokio::test] async fn get_multiblock_file() { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys(); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node"); let (ipfs, _) = ipfs::UninitializedIpfs::new(options) .await .start() diff --git a/http/src/v0/root_files/add.rs b/http/src/v0/root_files/add.rs index 5cd76651..8efeea9c 100644 --- a/http/src/v0/root_files/add.rs +++ b/http/src/v0/root_files/add.rs @@ -199,7 +199,7 @@ mod tests { } async fn testing_ipfs() -> ipfs::Ipfs { - let options = ipfs::IpfsOptions::inmemory_with_generated_keys(); + let options = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node"); let (ipfs, fut) = ipfs::UninitializedIpfs::new(options) .await .start() diff --git a/src/lib.rs b/src/lib.rs index a48691f3..2cfd7b4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ use futures::stream::{Fuse, Stream}; pub use libipld::ipld::Ipld; pub use libp2p::core::{connection::ListenerId, ConnectedPoint, Multiaddr, PeerId, PublicKey}; pub use libp2p::identity::Keypair; +use tracing_futures::Instrument; use std::borrow::Borrow; use std::collections::HashMap; @@ -83,6 +84,8 @@ impl RepoTypes for TestTypes { #[derive(Clone)] pub struct IpfsOptions { _marker: PhantomData, + /// The name of the node. + pub name: String, /// The path of the ipfs repo. pub ipfs_path: PathBuf, /// The keypair used with libp2p. @@ -111,8 +114,9 @@ impl fmt::Debug for IpfsOptions { impl IpfsOptions { /// Creates an inmemory store backed node for tests - pub fn inmemory_with_generated_keys() -> Self { + pub fn inmemory_with_generated_keys>(name: T) -> Self { Self { + name: name.as_ref().to_owned(), _marker: PhantomData, ipfs_path: std::env::temp_dir().into(), keypair: Keypair::generate_ed25519(), @@ -147,7 +151,8 @@ impl> DebuggableKeypair { } impl IpfsOptions { - pub fn new( + pub fn new>( + name: T, ipfs_path: PathBuf, keypair: Keypair, bootstrap: Vec<(Multiaddr, PeerId)>, @@ -155,12 +160,13 @@ impl IpfsOptions { kad_protocol: Option, ) -> Self { Self { - _marker: PhantomData, + name: name.as_ref().to_owned(), ipfs_path, keypair, bootstrap, mdns, kad_protocol, + _marker: PhantomData, } } } @@ -205,12 +211,13 @@ impl Default for IpfsOptions { let bootstrap = config.bootstrap(); IpfsOptions { - _marker: PhantomData, + name: "local_node".into(), ipfs_path, keypair, bootstrap, mdns: true, kad_protocol: None, + _marker: PhantomData, } } } @@ -228,6 +235,7 @@ impl Clone for Ipfs { /// for interacting with IPFS. #[derive(Debug)] pub struct IpfsInner { + name: String, repo: Repo, keys: DebuggableKeypair, to_task: Sender, @@ -292,9 +300,7 @@ impl UninitializedIpfs { /// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync, and the /// future should be spawned on a executor as soon as possible. - pub async fn start( - mut self, - ) -> Result<(Ipfs, impl std::future::Future), Error> { + pub async fn start(mut self) -> Result<(Ipfs, impl Future), Error> { use futures::stream::StreamExt; let repo = Option::take(&mut self.repo).unwrap(); @@ -310,6 +316,7 @@ impl UninitializedIpfs { let UninitializedIpfs { keys, .. } = self; let ipfs = Ipfs(Arc::new(IpfsInner { + name: self.options.name.clone(), repo, keys: DebuggableKeypair(keys), to_task, @@ -337,6 +344,12 @@ impl std::ops::Deref for Ipfs { } } +macro_rules! traced { + ($name:expr, $op:expr) => {{ + $op.instrument(tracing::trace_span!("facade", node = $name.as_str())) + }}; +} + impl Ipfs { fn dag(&self) -> IpldDag { IpldDag::new(self.clone()) @@ -348,13 +361,13 @@ impl Ipfs { /// Puts a block into the ipfs repo. pub async fn put_block(&self, block: Block) -> Result { - Ok(self.repo.put_block(block).await?.0) + Ok(traced!(&self.name, self.repo.put_block(block)).await?.0) } /// Retrieves a block from the local blockstore, or starts fetching from the network or join an /// already started fetch. pub async fn get_block(&self, cid: &Cid) -> Result { - Ok(self.repo.get_block(cid).await?) + Ok(traced!(&self.name, self.repo.get_block(cid)).await?) } /// Remove block from the ipfs repo. @@ -963,23 +976,28 @@ mod node { } impl Node { - pub async fn new() -> Self { - let opts = IpfsOptions::inmemory_with_generated_keys(); + pub async fn new>(name: T) -> Self { + let opts = IpfsOptions::inmemory_with_generated_keys(name); Node::with_options(opts).await } pub async fn with_options(opts: IpfsOptions) -> Self { + let name = opts.name.clone(); let (ipfs, fut) = UninitializedIpfs::new(opts) + .instrument(tracing::trace_span!("init", node = name.as_str())) .await .start() + .instrument(tracing::trace_span!("start", node = name.as_str())) .await - .expect("Inmemory instance must succeed start"); + .unwrap(); - let jh = async_std::task::spawn(fut); + let background_task = async_std::task::spawn( + fut.instrument(tracing::trace_span!("bgtask", node = name.as_str())), + ); Node { ipfs, - background_task: jh, + background_task, } } @@ -1095,7 +1113,7 @@ mod tests { use multihash::Sha2_256; pub async fn create_mock_ipfs() -> Ipfs { - let options = IpfsOptions::inmemory_with_generated_keys(); + let options = IpfsOptions::inmemory_with_generated_keys("test_node"); let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap(); task::spawn(fut); diff --git a/tests/connect_two.rs b/tests/connect_two.rs index 6bc23c96..aae32a0d 100644 --- a/tests/connect_two.rs +++ b/tests/connect_two.rs @@ -6,7 +6,7 @@ async fn connect_two_nodes() { let (tx, rx) = futures::channel::oneshot::channel(); let node_a = task::spawn(async move { - let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(); + let opts = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node"); let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts) .await .start() @@ -27,7 +27,7 @@ async fn connect_two_nodes() { println!("got back from the other node: {:?}", other_addrs); - let opts = ipfs::IpfsOptions::inmemory_with_generated_keys(); + let opts = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node"); let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts) .await .start() @@ -64,8 +64,8 @@ async fn connect_two_nodes() { // one should dial both of the addresses, resulting in two connections. #[async_std::test] async fn connect_two_nodes_with_two_connections_doesnt_panic() { - let node_a = ipfs::Node::new().await; - let node_b = ipfs::Node::new().await; + let node_a = ipfs::Node::new("a").await; + let node_b = ipfs::Node::new("b").await; node_a .add_listening_address(libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16))) diff --git a/tests/exchange_block.rs b/tests/exchange_block.rs index 4515ba1d..e170d631 100644 --- a/tests/exchange_block.rs +++ b/tests/exchange_block.rs @@ -11,8 +11,8 @@ async fn exchange_block() { let data = b"hello block\n".to_vec().into_boxed_slice(); let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data)); - let a = Node::new().await; - let b = Node::new().await; + let a = Node::new("a").await; + let b = Node::new("b").await; let (_, mut addrs) = b.identity().await.unwrap(); diff --git a/tests/kademlia.rs b/tests/kademlia.rs index b728b4c0..9ada01b0 100644 --- a/tests/kademlia.rs +++ b/tests/kademlia.rs @@ -10,8 +10,8 @@ async fn kademlia_local_peer_discovery() { // start up PEER_COUNT bootstrapper nodes let mut bootstrappers = Vec::with_capacity(BOOTSTRAPPER_COUNT); - for _ in 0..BOOTSTRAPPER_COUNT { - bootstrappers.push(Node::new().await); + for i in 0..BOOTSTRAPPER_COUNT { + bootstrappers.push(Node::new(format!("bootstrapper_{}", i)).await); } // register the bootstrappers' ids and addresses @@ -37,7 +37,7 @@ async fn kademlia_local_peer_discovery() { } // introduce a peer and connect it to one of the bootstrappers - let peer = Node::new().await; + let peer = Node::new("peer").await; assert!(peer .add_peer( bootstrapper_ids[0].0.clone(), @@ -64,7 +64,7 @@ async fn kademlia_popular_content_discovery() { // introduce a peer and specify the Kademlia protocol to it // without a specified protocol, the test will not complete - let mut opts = IpfsOptions::inmemory_with_generated_keys(); + let mut opts = IpfsOptions::inmemory_with_generated_keys("test_node"); opts.kad_protocol = Some("/ipfs/lan/kad/1.0.0".to_owned()); let peer = Node::with_options(opts).await; diff --git a/tests/multiple_listening_addresses.rs b/tests/multiple_listening_addresses.rs index 25333676..a373c816 100644 --- a/tests/multiple_listening_addresses.rs +++ b/tests/multiple_listening_addresses.rs @@ -1,6 +1,6 @@ #[async_std::test] async fn multiple_consecutive_ephemeral_listening_addresses() { - let node = ipfs::Node::new().await; + let node = ipfs::Node::new("test_node").await; let target = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16)); @@ -14,7 +14,7 @@ async fn multiple_consecutive_ephemeral_listening_addresses() { #[async_std::test] async fn multiple_concurrent_ephemeral_listening_addresses_on_same_ip() { - let node = ipfs::Node::new().await; + let node = ipfs::Node::new("test_node").await; let target = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16)); @@ -44,7 +44,7 @@ async fn multiple_concurrent_ephemeral_listening_addresses_on_same_ip() { #[async_std::test] #[cfg(not(target_os = "macos"))] async fn multiple_concurrent_ephemeral_listening_addresses_on_different_ip() { - let node = ipfs::Node::new().await; + let node = ipfs::Node::new("test_node").await; // it doesnt work on mac os x as 127.0.0.2 is not enabled by default. let first = @@ -61,7 +61,7 @@ async fn multiple_concurrent_ephemeral_listening_addresses_on_different_ip() { #[async_std::test] async fn adding_unspecified_addr_resolves_with_first() { - let node = ipfs::Node::new().await; + let node = ipfs::Node::new("test_node").await; // there is no test in trying to match this with others as ... that would be quite // perilous. node.add_listening_address(libp2p::build_multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16))) @@ -71,7 +71,7 @@ async fn adding_unspecified_addr_resolves_with_first() { #[async_std::test] async fn listening_for_multiple_unspecified_addresses() { - let node = ipfs::Node::new().await; + let node = ipfs::Node::new("test_node").await; // there is no test in trying to match this with others as ... that would be quite // perilous. let target = libp2p::build_multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16)); @@ -95,7 +95,7 @@ async fn listening_for_multiple_unspecified_addresses() { #[async_std::test] async fn remove_listening_address() { - let node = ipfs::Node::new().await; + let node = ipfs::Node::new("test_node").await; let unbound = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16)); let first = node.add_listening_address(unbound.clone()).await.unwrap(); diff --git a/tests/pubsub.rs b/tests/pubsub.rs index edca4755..6adf62d6 100644 --- a/tests/pubsub.rs +++ b/tests/pubsub.rs @@ -5,14 +5,14 @@ use std::time::Duration; #[async_std::test] async fn subscribe_only_once() { - let a = Node::new().await; + let a = Node::new("test_node").await; let _stream = a.pubsub_subscribe("some_topic".into()).await.unwrap(); a.pubsub_subscribe("some_topic".into()).await.unwrap_err(); } #[async_std::test] async fn resubscribe_after_unsubscribe() { - let a = Node::new().await; + let a = Node::new("test_node").await; let mut stream = a.pubsub_subscribe("topic".into()).await.unwrap(); a.pubsub_unsubscribe("topic").await.unwrap(); @@ -24,7 +24,7 @@ async fn resubscribe_after_unsubscribe() { #[async_std::test] async fn unsubscribe_via_drop() { - let a = Node::new().await; + let a = Node::new("test_node").await; let msgs = a.pubsub_subscribe("topic".into()).await.unwrap(); assert_eq!(a.pubsub_subscribed().await.unwrap(), &["topic"]); @@ -37,7 +37,7 @@ async fn unsubscribe_via_drop() { #[async_std::test] async fn can_publish_without_subscribing() { - let a = Node::new().await; + let a = Node::new("test_node").await; a.pubsub_publish("topic".into(), b"foobar".to_vec()) .await .unwrap() @@ -132,8 +132,8 @@ async fn publish_between_two_nodes() { } async fn two_connected_nodes() -> ((Node, PeerId), (Node, PeerId)) { - let a = Node::new().await; - let b = Node::new().await; + let a = Node::new("a").await; + let b = Node::new("b").await; let (a_pk, _) = a.identity().await.unwrap(); let a_id = a_pk.into_peer_id(); diff --git a/tests/wantlist_and_cancellation.rs b/tests/wantlist_and_cancellation.rs index 6a1859fe..a79d5608 100644 --- a/tests/wantlist_and_cancellation.rs +++ b/tests/wantlist_and_cancellation.rs @@ -51,7 +51,7 @@ async fn check_cid_subscriptions(ipfs: &Node, cid: &Cid, expected_count: usize) #[async_std::test] async fn wantlist_cancellation() { // start a single node - let ipfs = Node::new().await; + let ipfs = Node::new("test_node").await; // execute a get_block request let cid = Cid::try_from("QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KaGa").unwrap();