refactor: make pubsub methods move their params

Signed-off-by: ljedrz <ljedrz@gmail.com>
This commit is contained in:
ljedrz
2020-06-26 15:09:17 +02:00
parent 8c89714da9
commit 2603532027
3 changed files with 27 additions and 28 deletions

View File

@ -68,7 +68,7 @@ async fn inner_peers<T: IpfsTypes>(
topic: Option<String>,
) -> Result<impl warp::Reply, warp::Rejection> {
let peers = ipfs
.pubsub_peers(topic.as_deref())
.pubsub_peers(topic)
.await
.map_err(|e| warp::reject::custom(StringError::from(e)))?;
@ -112,8 +112,7 @@ async fn inner_publish<T: IpfsTypes>(
ipfs: Ipfs<T>,
PublishArgs { topic, message }: PublishArgs,
) -> Result<impl warp::Reply, warp::Rejection> {
// FIXME: perhaps these should be taken by value as they are always moved?
ipfs.pubsub_publish(&topic, &message.into_inner())
ipfs.pubsub_publish(topic, message.into_inner())
.await
.map_err(|e| warp::reject::custom(StringError::from(e)))?;
Ok(warp::reply::reply())
@ -161,7 +160,7 @@ async fn inner_subscribe<T: IpfsTypes>(
// the returned stream needs to be set up to be shoveled in a background task
let shoveled = ipfs
.pubsub_subscribe(&topic)
.pubsub_subscribe(topic.clone())
.await
.expect("new subscriptions shouldn't fail while holding the lock");
@ -257,7 +256,7 @@ async fn shovel<T: IpfsTypes>(
topic
);
shoveled = ipfs
.pubsub_subscribe(&topic)
.pubsub_subscribe(topic.clone())
.await
.expect("new subscriptions shouldn't fail while holding the lock");
} else {

View File

@ -471,25 +471,25 @@ impl<Types: IpfsTypes> Ipfs<Types> {
/// Subscribes to a given topic. Can be done at most once without unsubscribing in the between.
/// The subscription can be unsubscribed by dropping the stream or calling
/// [`pubsub_unsubscribe`].
pub async fn pubsub_subscribe(&self, topic: &str) -> Result<SubscriptionStream, Error> {
pub async fn pubsub_subscribe(&self, topic: String) -> Result<SubscriptionStream, Error> {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::PubsubSubscribe(topic.into(), tx))
.send(IpfsEvent::PubsubSubscribe(topic, tx))
.await?;
rx.await?
.ok_or_else(|| format_err!("already subscribed to {:?}", topic))
.ok_or_else(|| format_err!("already subscribed to the given topic"))
}
/// Publishes to the topic which may have been subscribed to earlier
pub async fn pubsub_publish(&self, topic: &str, data: &[u8]) -> Result<(), Error> {
pub async fn pubsub_publish(&self, topic: String, data: Vec<u8>) -> Result<(), Error> {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::PubsubPublish(topic.into(), data.to_vec(), tx))
.send(IpfsEvent::PubsubPublish(topic, data, tx))
.await?;
Ok(rx.await?)
@ -508,12 +508,12 @@ impl<Types: IpfsTypes> Ipfs<Types> {
}
/// Returns all known pubsub peers with the optional topic filter
pub async fn pubsub_peers(&self, topic: Option<&str>) -> Result<Vec<PeerId>, Error> {
pub async fn pubsub_peers(&self, topic: Option<String>) -> Result<Vec<PeerId>, Error> {
let (tx, rx) = oneshot_channel();
self.to_task
.clone()
.send(IpfsEvent::PubsubPeers(topic.map(String::from), tx))
.send(IpfsEvent::PubsubPeers(topic, tx))
.await?;
Ok(rx.await?)

View File

@ -9,27 +9,27 @@ const MDNS: bool = false;
#[async_std::test]
async fn subscribe_only_once() {
let a = Node::new(MDNS).await;
let _stream = a.pubsub_subscribe("some_topic").await.unwrap();
a.pubsub_subscribe("some_topic").await.unwrap_err();
let _stream = a.pubsub_subscribe("some_topic".into()).await.unwrap();
a.pubsub_subscribe("some_topic".into()).await.unwrap_err();
}
#[async_std::test]
async fn resubscribe_after_unsubscribe() {
let a = Node::new(MDNS).await;
let mut stream = a.pubsub_subscribe("topic").await.unwrap();
let mut stream = a.pubsub_subscribe("topic".into()).await.unwrap();
a.pubsub_unsubscribe("topic").await.unwrap();
// sender has been dropped
assert_eq!(stream.next().await, None);
drop(a.pubsub_subscribe("topic").await.unwrap());
drop(a.pubsub_subscribe("topic".into()).await.unwrap());
}
#[async_std::test]
async fn unsubscribe_via_drop() {
let a = Node::new(MDNS).await;
let msgs = a.pubsub_subscribe("topic").await.unwrap();
let msgs = a.pubsub_subscribe("topic".into()).await.unwrap();
assert_eq!(a.pubsub_subscribed().await.unwrap(), &["topic"]);
drop(msgs);
@ -41,7 +41,7 @@ async fn unsubscribe_via_drop() {
#[async_std::test]
async fn can_publish_without_subscribing() {
let a = Node::new(MDNS).await;
a.pubsub_publish("topic", b"foobar").await.unwrap()
a.pubsub_publish("topic".into(), b"foobar".to_vec()).await.unwrap()
}
#[async_std::test]
@ -52,16 +52,16 @@ async fn publish_between_two_nodes() {
let ((a, a_id), (b, b_id)) = two_connected_nodes().await;
let topic = "shared";
let topic = "shared".to_owned();
let mut a_msgs = a.pubsub_subscribe(topic).await.unwrap();
let mut b_msgs = b.pubsub_subscribe(topic).await.unwrap();
let mut a_msgs = a.pubsub_subscribe(topic.clone()).await.unwrap();
let mut b_msgs = b.pubsub_subscribe(topic.clone()).await.unwrap();
// need to wait to see both sides so that the messages will get through
let mut appeared = false;
for _ in 0..100usize {
if a.pubsub_peers(Some(topic)).await.unwrap().contains(&b_id)
&& b.pubsub_peers(Some(topic)).await.unwrap().contains(&a_id)
if a.pubsub_peers(Some(topic.clone())).await.unwrap().contains(&b_id)
&& b.pubsub_peers(Some(topic.clone())).await.unwrap().contains(&a_id)
{
appeared = true;
break;
@ -76,16 +76,16 @@ async fn publish_between_two_nodes() {
"timed out before both nodes appeared as pubsub peers"
);
a.pubsub_publish(topic, b"foobar").await.unwrap();
b.pubsub_publish(topic, b"barfoo").await.unwrap();
a.pubsub_publish(topic.clone(), b"foobar".to_vec()).await.unwrap();
b.pubsub_publish(topic.clone(), b"barfoo".to_vec()).await.unwrap();
// the order is not defined, but both should see the other's message and the message they sent
let expected = [(&[topic], &a_id, b"foobar"), (&[topic], &b_id, b"barfoo")]
let expected = [(&[topic.clone()], &a_id, b"foobar"), (&[topic.clone()], &b_id, b"barfoo")]
.iter()
.cloned()
.map(|(topics, id, data)| {
(
topics.iter().map(|&s| s.to_string()).collect::<Vec<_>>(),
topics.to_vec(),
id.clone(),
data.to_vec(),
)
@ -108,7 +108,7 @@ async fn publish_between_two_nodes() {
let mut disappeared = false;
for _ in 0..100usize {
if !a.pubsub_peers(Some(topic)).await.unwrap().contains(&b_id) {
if !a.pubsub_peers(Some(topic.clone())).await.unwrap().contains(&b_id) {
disappeared = true;
break;
}