feat: attach tracing to nodes

Co-authored-by: Joonas Koivunen <joonas@equilibrium.co>
Signed-off-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
ljedrz 2020-07-30 12:18:46 +02:00
parent c55298b6a4
commit 0140c673b8
12 changed files with 69 additions and 45 deletions

View File

@ -134,8 +134,14 @@ fn main() {
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create event loop");
rt.block_on(async move {
let opts: IpfsOptions<ipfs::TestTypes> =
IpfsOptions::new(home.clone().into(), keypair, Vec::new(), false, None);
let opts: IpfsOptions<ipfs::TestTypes> = IpfsOptions::new(
"local_node",
home.clone().into(),
keypair,
Vec::new(),
false,
None,
);
let (ipfs, task) = UninitializedIpfs::new(opts)
.await

View File

@ -119,7 +119,7 @@ mod tests {
use super::routes;
use ipfs::{IpfsOptions, UninitializedIpfs};
let options = IpfsOptions::inmemory_with_generated_keys();
let options = IpfsOptions::inmemory_with_generated_keys("test_node");
let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();
drop(fut);

View File

@ -806,7 +806,7 @@ mod tests {
}
async fn preloaded_testing_ipfs() -> Ipfs<ipfs::TestTypes> {
let options = ipfs::IpfsOptions::inmemory_with_generated_keys();
let options = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node");
let (ipfs, _) = ipfs::UninitializedIpfs::new(options)
.await
.start()

View File

@ -334,7 +334,7 @@ mod tests {
#[tokio::test]
async fn very_long_file_and_symlink_names() {
let options = ipfs::IpfsOptions::inmemory_with_generated_keys();
let options = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node");
let (ipfs, _) = ipfs::UninitializedIpfs::new(options)
.await
.start()
@ -395,7 +395,7 @@ mod tests {
#[tokio::test]
async fn get_multiblock_file() {
let options = ipfs::IpfsOptions::inmemory_with_generated_keys();
let options = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node");
let (ipfs, _) = ipfs::UninitializedIpfs::new(options)
.await
.start()

View File

@ -199,7 +199,7 @@ mod tests {
}
async fn testing_ipfs() -> ipfs::Ipfs<ipfs::TestTypes> {
let options = ipfs::IpfsOptions::inmemory_with_generated_keys();
let options = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node");
let (ipfs, fut) = ipfs::UninitializedIpfs::new(options)
.await
.start()

View File

@ -20,6 +20,7 @@ use futures::stream::{Fuse, Stream};
pub use libipld::ipld::Ipld;
pub use libp2p::core::{connection::ListenerId, ConnectedPoint, Multiaddr, PeerId, PublicKey};
pub use libp2p::identity::Keypair;
use tracing_futures::Instrument;
use std::borrow::Borrow;
use std::collections::HashMap;
@ -83,6 +84,8 @@ impl RepoTypes for TestTypes {
#[derive(Clone)]
pub struct IpfsOptions<Types: IpfsTypes> {
_marker: PhantomData<Types>,
/// The name of the node.
pub name: String,
/// The path of the ipfs repo.
pub ipfs_path: PathBuf,
/// The keypair used with libp2p.
@ -111,8 +114,9 @@ impl<Types: IpfsTypes> fmt::Debug for IpfsOptions<Types> {
impl IpfsOptions<TestTypes> {
/// Creates an inmemory store backed node for tests
pub fn inmemory_with_generated_keys() -> Self {
pub fn inmemory_with_generated_keys<T: AsRef<str>>(name: T) -> Self {
Self {
name: name.as_ref().to_owned(),
_marker: PhantomData,
ipfs_path: std::env::temp_dir().into(),
keypair: Keypair::generate_ed25519(),
@ -147,7 +151,8 @@ impl<I: Borrow<Keypair>> DebuggableKeypair<I> {
}
impl<Types: IpfsTypes> IpfsOptions<Types> {
pub fn new(
pub fn new<T: AsRef<str>>(
name: T,
ipfs_path: PathBuf,
keypair: Keypair,
bootstrap: Vec<(Multiaddr, PeerId)>,
@ -155,12 +160,13 @@ impl<Types: IpfsTypes> IpfsOptions<Types> {
kad_protocol: Option<String>,
) -> Self {
Self {
_marker: PhantomData,
name: name.as_ref().to_owned(),
ipfs_path,
keypair,
bootstrap,
mdns,
kad_protocol,
_marker: PhantomData,
}
}
}
@ -205,12 +211,13 @@ impl<T: IpfsTypes> Default for IpfsOptions<T> {
let bootstrap = config.bootstrap();
IpfsOptions {
_marker: PhantomData,
name: "local_node".into(),
ipfs_path,
keypair,
bootstrap,
mdns: true,
kad_protocol: None,
_marker: PhantomData,
}
}
}
@ -228,6 +235,7 @@ impl<Types: IpfsTypes> Clone for Ipfs<Types> {
/// for interacting with IPFS.
#[derive(Debug)]
pub struct IpfsInner<Types: IpfsTypes> {
name: String,
repo: Repo<Types>,
keys: DebuggableKeypair<Keypair>,
to_task: Sender<IpfsEvent>,
@ -292,9 +300,7 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
/// Initialize the ipfs node. The returned `Ipfs` value is cloneable, send and sync, and the
/// future should be spawned on a executor as soon as possible.
pub async fn start(
mut self,
) -> Result<(Ipfs<Types>, impl std::future::Future<Output = ()>), Error> {
pub async fn start(mut self) -> Result<(Ipfs<Types>, impl Future<Output = ()>), Error> {
use futures::stream::StreamExt;
let repo = Option::take(&mut self.repo).unwrap();
@ -310,6 +316,7 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
let UninitializedIpfs { keys, .. } = self;
let ipfs = Ipfs(Arc::new(IpfsInner {
name: self.options.name.clone(),
repo,
keys: DebuggableKeypair(keys),
to_task,
@ -337,6 +344,12 @@ impl<Types: IpfsTypes> std::ops::Deref for Ipfs<Types> {
}
}
macro_rules! traced {
($name:expr, $op:expr) => {{
$op.instrument(tracing::trace_span!("facade", node = $name.as_str()))
}};
}
impl<Types: IpfsTypes> Ipfs<Types> {
fn dag(&self) -> IpldDag<Types> {
IpldDag::new(self.clone())
@ -348,13 +361,13 @@ impl<Types: IpfsTypes> Ipfs<Types> {
/// Puts a block into the ipfs repo.
pub async fn put_block(&self, block: Block) -> Result<Cid, Error> {
Ok(self.repo.put_block(block).await?.0)
Ok(traced!(&self.name, self.repo.put_block(block)).await?.0)
}
/// Retrieves a block from the local blockstore, or starts fetching from the network or join an
/// already started fetch.
pub async fn get_block(&self, cid: &Cid) -> Result<Block, Error> {
Ok(self.repo.get_block(cid).await?)
Ok(traced!(&self.name, self.repo.get_block(cid)).await?)
}
/// Remove block from the ipfs repo.
@ -963,23 +976,28 @@ mod node {
}
impl Node {
pub async fn new() -> Self {
let opts = IpfsOptions::inmemory_with_generated_keys();
pub async fn new<T: AsRef<str>>(name: T) -> Self {
let opts = IpfsOptions::inmemory_with_generated_keys(name);
Node::with_options(opts).await
}
pub async fn with_options(opts: IpfsOptions<TestTypes>) -> Self {
let name = opts.name.clone();
let (ipfs, fut) = UninitializedIpfs::new(opts)
.instrument(tracing::trace_span!("init", node = name.as_str()))
.await
.start()
.instrument(tracing::trace_span!("start", node = name.as_str()))
.await
.expect("Inmemory instance must succeed start");
.unwrap();
let jh = async_std::task::spawn(fut);
let background_task = async_std::task::spawn(
fut.instrument(tracing::trace_span!("bgtask", node = name.as_str())),
);
Node {
ipfs,
background_task: jh,
background_task,
}
}
@ -1095,7 +1113,7 @@ mod tests {
use multihash::Sha2_256;
pub async fn create_mock_ipfs() -> Ipfs<TestTypes> {
let options = IpfsOptions::inmemory_with_generated_keys();
let options = IpfsOptions::inmemory_with_generated_keys("test_node");
let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();
task::spawn(fut);

View File

@ -6,7 +6,7 @@ async fn connect_two_nodes() {
let (tx, rx) = futures::channel::oneshot::channel();
let node_a = task::spawn(async move {
let opts = ipfs::IpfsOptions::inmemory_with_generated_keys();
let opts = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node");
let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts)
.await
.start()
@ -27,7 +27,7 @@ async fn connect_two_nodes() {
println!("got back from the other node: {:?}", other_addrs);
let opts = ipfs::IpfsOptions::inmemory_with_generated_keys();
let opts = ipfs::IpfsOptions::inmemory_with_generated_keys("test_node");
let (ipfs, fut) = ipfs::UninitializedIpfs::new(opts)
.await
.start()
@ -64,8 +64,8 @@ async fn connect_two_nodes() {
// one should dial both of the addresses, resulting in two connections.
#[async_std::test]
async fn connect_two_nodes_with_two_connections_doesnt_panic() {
let node_a = ipfs::Node::new().await;
let node_b = ipfs::Node::new().await;
let node_a = ipfs::Node::new("a").await;
let node_b = ipfs::Node::new("b").await;
node_a
.add_listening_address(libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16)))

View File

@ -11,8 +11,8 @@ async fn exchange_block() {
let data = b"hello block\n".to_vec().into_boxed_slice();
let cid = Cid::new_v1(Codec::Raw, Sha2_256::digest(&data));
let a = Node::new().await;
let b = Node::new().await;
let a = Node::new("a").await;
let b = Node::new("b").await;
let (_, mut addrs) = b.identity().await.unwrap();

View File

@ -10,8 +10,8 @@ async fn kademlia_local_peer_discovery() {
// start up PEER_COUNT bootstrapper nodes
let mut bootstrappers = Vec::with_capacity(BOOTSTRAPPER_COUNT);
for _ in 0..BOOTSTRAPPER_COUNT {
bootstrappers.push(Node::new().await);
for i in 0..BOOTSTRAPPER_COUNT {
bootstrappers.push(Node::new(format!("bootstrapper_{}", i)).await);
}
// register the bootstrappers' ids and addresses
@ -37,7 +37,7 @@ async fn kademlia_local_peer_discovery() {
}
// introduce a peer and connect it to one of the bootstrappers
let peer = Node::new().await;
let peer = Node::new("peer").await;
assert!(peer
.add_peer(
bootstrapper_ids[0].0.clone(),
@ -64,7 +64,7 @@ async fn kademlia_popular_content_discovery() {
// introduce a peer and specify the Kademlia protocol to it
// without a specified protocol, the test will not complete
let mut opts = IpfsOptions::inmemory_with_generated_keys();
let mut opts = IpfsOptions::inmemory_with_generated_keys("test_node");
opts.kad_protocol = Some("/ipfs/lan/kad/1.0.0".to_owned());
let peer = Node::with_options(opts).await;

View File

@ -1,6 +1,6 @@
#[async_std::test]
async fn multiple_consecutive_ephemeral_listening_addresses() {
let node = ipfs::Node::new().await;
let node = ipfs::Node::new("test_node").await;
let target = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16));
@ -14,7 +14,7 @@ async fn multiple_consecutive_ephemeral_listening_addresses() {
#[async_std::test]
async fn multiple_concurrent_ephemeral_listening_addresses_on_same_ip() {
let node = ipfs::Node::new().await;
let node = ipfs::Node::new("test_node").await;
let target = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16));
@ -44,7 +44,7 @@ async fn multiple_concurrent_ephemeral_listening_addresses_on_same_ip() {
#[async_std::test]
#[cfg(not(target_os = "macos"))]
async fn multiple_concurrent_ephemeral_listening_addresses_on_different_ip() {
let node = ipfs::Node::new().await;
let node = ipfs::Node::new("test_node").await;
// it doesnt work on mac os x as 127.0.0.2 is not enabled by default.
let first =
@ -61,7 +61,7 @@ async fn multiple_concurrent_ephemeral_listening_addresses_on_different_ip() {
#[async_std::test]
async fn adding_unspecified_addr_resolves_with_first() {
let node = ipfs::Node::new().await;
let node = ipfs::Node::new("test_node").await;
// there is no test in trying to match this with others as ... that would be quite
// perilous.
node.add_listening_address(libp2p::build_multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16)))
@ -71,7 +71,7 @@ async fn adding_unspecified_addr_resolves_with_first() {
#[async_std::test]
async fn listening_for_multiple_unspecified_addresses() {
let node = ipfs::Node::new().await;
let node = ipfs::Node::new("test_node").await;
// there is no test in trying to match this with others as ... that would be quite
// perilous.
let target = libp2p::build_multiaddr!(Ip4([0, 0, 0, 0]), Tcp(0u16));
@ -95,7 +95,7 @@ async fn listening_for_multiple_unspecified_addresses() {
#[async_std::test]
async fn remove_listening_address() {
let node = ipfs::Node::new().await;
let node = ipfs::Node::new("test_node").await;
let unbound = libp2p::build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(0u16));
let first = node.add_listening_address(unbound.clone()).await.unwrap();

View File

@ -5,14 +5,14 @@ use std::time::Duration;
#[async_std::test]
async fn subscribe_only_once() {
let a = Node::new().await;
let a = Node::new("test_node").await;
let _stream = a.pubsub_subscribe("some_topic".into()).await.unwrap();
a.pubsub_subscribe("some_topic".into()).await.unwrap_err();
}
#[async_std::test]
async fn resubscribe_after_unsubscribe() {
let a = Node::new().await;
let a = Node::new("test_node").await;
let mut stream = a.pubsub_subscribe("topic".into()).await.unwrap();
a.pubsub_unsubscribe("topic").await.unwrap();
@ -24,7 +24,7 @@ async fn resubscribe_after_unsubscribe() {
#[async_std::test]
async fn unsubscribe_via_drop() {
let a = Node::new().await;
let a = Node::new("test_node").await;
let msgs = a.pubsub_subscribe("topic".into()).await.unwrap();
assert_eq!(a.pubsub_subscribed().await.unwrap(), &["topic"]);
@ -37,7 +37,7 @@ async fn unsubscribe_via_drop() {
#[async_std::test]
async fn can_publish_without_subscribing() {
let a = Node::new().await;
let a = Node::new("test_node").await;
a.pubsub_publish("topic".into(), b"foobar".to_vec())
.await
.unwrap()
@ -132,8 +132,8 @@ async fn publish_between_two_nodes() {
}
async fn two_connected_nodes() -> ((Node, PeerId), (Node, PeerId)) {
let a = Node::new().await;
let b = Node::new().await;
let a = Node::new("a").await;
let b = Node::new("b").await;
let (a_pk, _) = a.identity().await.unwrap();
let a_id = a_pk.into_peer_id();

View File

@ -51,7 +51,7 @@ async fn check_cid_subscriptions(ipfs: &Node, cid: &Cid, expected_count: usize)
#[async_std::test]
async fn wantlist_cancellation() {
// start a single node
let ipfs = Node::new().await;
let ipfs = Node::new("test_node").await;
// execute a get_block request
let cid = Cid::try_from("QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KaGa").unwrap();