diff --git a/tests/pubsub.rs b/tests/pubsub.rs index e7debe2a..c98e605f 100644 --- a/tests/pubsub.rs +++ b/tests/pubsub.rs @@ -48,10 +48,8 @@ async fn can_publish_without_subscribing() { } #[tokio::test] -#[allow(clippy::mutable_key_type)] // clippy doesn't like Vec inside HashSet -async fn publish_between_two_nodes() { +async fn publish_between_two_nodes_single_topic() { use futures::stream::StreamExt; - use std::collections::HashSet; let nodes = spawn_nodes(2, Topology::Line).await; @@ -98,29 +96,50 @@ async fn publish_between_two_nodes() { // the order is not defined, but both should see the other's message and the message they sent let expected = [ - (&[topic.clone()], &nodes[0].id, b"foobar"), - (&[topic.clone()], &nodes[1].id, b"barfoo"), + // first node should witness it's the message it sent + (&[topic.clone()], nodes[0].id, b"foobar", nodes[0].id), + // second node should witness first nodes message, and so on. + (&[topic.clone()], nodes[0].id, b"foobar", nodes[1].id), + (&[topic.clone()], nodes[1].id, b"barfoo", nodes[0].id), + (&[topic.clone()], nodes[1].id, b"barfoo", nodes[1].id), ] .iter() .cloned() - .map(|(topics, id, data)| (topics.to_vec(), *id, data.to_vec())) - .collect::>(); + .map(|(topics, sender, data, witness)| (topics.to_vec(), sender, data.to_vec(), witness)) + .collect::>(); - for st in &mut [b_msgs.by_ref(), a_msgs.by_ref()] { - let actual = timeout( + let mut actual = Vec::new(); + + for (st, own_peer_id) in &mut [ + (b_msgs.by_ref(), nodes[1].id), + (a_msgs.by_ref(), nodes[0].id), + ] { + let received = timeout( Duration::from_secs(2), st.take(2) // Arc::try_unwrap will fail sometimes here as the sender side in src/p2p/pubsub.rs:305 // can still be looping .map(|msg| (*msg).clone()) - .map(|msg| (msg.topics, msg.source, msg.data)) - .collect::>(), + .map(|msg| (msg.topics, msg.source, msg.data, *own_peer_id)) + .collect::>(), ) .await .unwrap(); - assert_eq!(expected, actual); + + actual.extend(received); } + // sort the received messages both in expected and actual to make sure they are comparable; + // order of receiving is not part of the tuple and shouldn't matter. + let mut expected = expected; + expected.sort_unstable(); + actual.sort_unstable(); + + assert_eq!( + actual, expected, + "sent and received messages must be present on both nodes' streams" + ); + drop(b_msgs); let mut disappeared = false; @@ -143,10 +162,8 @@ async fn publish_between_two_nodes() { } #[tokio::test] -#[allow(clippy::mutable_key_type)] // clippy doesn't like Vec inside HashSet async fn publish_between_two_nodes_different_topics() { use futures::stream::StreamExt; - use std::collections::HashSet; let nodes = spawn_nodes(2, Topology::Line).await; let node_a = &nodes[0]; @@ -197,34 +214,37 @@ async fn publish_between_two_nodes_different_topics() { .await .unwrap(); - // the order is not defined, but both should see the other's message + // the order between messages is not defined, but both should see the other's message. since we + // receive messages first from node_b's stream we expect this order. + // + // in this test case the nodes are not expected to see their own message because nodes are not + // subscribing to the streams they are sending to. let expected = [ - (&[topic_a.clone()], &node_a.id, b"foobar"), - (&[topic_b.clone()], &node_b.id, b"barfoo"), + (&[topic_a.clone()], node_a.id, b"foobar", node_b.id), + (&[topic_b.clone()], node_b.id, b"barfoo", node_a.id), ] .iter() .cloned() - .map(|(topics, id, data)| (topics.to_vec(), *id, data.to_vec())) - .collect::>(); + .map(|(topics, sender, data, witness)| (topics.to_vec(), sender, data.to_vec(), witness)) + .collect::>(); - let mut actual = HashSet::new(); + let mut actual = Vec::new(); for (st, own_peer_id) in &mut [(b_msgs.by_ref(), node_b.id), (a_msgs.by_ref(), node_a.id)] { - let actual_msg = timeout( + let received = timeout( Duration::from_secs(2), st.take(1) - // Arc::try_unwrap will fail sometimes here as the sender side in src/p2p/pubsub.rs:305 - // can still be looping .map(|msg| (*msg).clone()) - .map(|msg| (msg.topics, msg.source, msg.data)) - .filter(|(_, source_peer_id, _)| future::ready(source_peer_id != own_peer_id)) + .map(|msg| (msg.topics, msg.source, msg.data, *own_peer_id)) .next(), ) .await .unwrap() .unwrap(); - actual.insert(actual_msg); + actual.push(received); } + // ordering is defined for expected and actual by the order of the looping above and the + // initial expected creation. assert_eq!(expected, actual); drop(b_msgs);