fix: get pubsub/pub message from multipart body

This commit is contained in:
Joonas Koivunen 2020-07-08 21:09:54 +03:00
parent 6ddd7774ea
commit 1c9b31cf76

View File

@ -10,8 +10,7 @@
//! `ipfs::Ipfs::pubsub_subscribe` and thus will panic if an subscription was made outside of this
//! locking mechanism.
use futures::stream::TryStream;
use futures::stream::TryStreamExt;
use futures::stream::{Stream, TryStream, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::stream::StreamExt;
@ -26,12 +25,14 @@ use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use warp::hyper::body::Bytes;
use warp::Filter;
use bytes::{Buf, Bytes};
use warp::{Filter, Rejection};
use crate::v0::support::{
with_ipfs, NonUtf8Topic, RequiredArgumentMissing, StreamResponse, StringError,
try_only_named_multipart, with_ipfs, NonUtf8Topic, RequiredArgumentMissing, StreamResponse,
StringError,
};
use mime::Mime;
#[derive(Default)]
pub struct Pubsub {
@ -429,7 +430,7 @@ impl AsRef<[u8]> for PublishArgs {
/// `parameter_name` is byte slice because there is no percent decoding done for that component.
fn publish_args(
parameter_name: &'static str,
) -> impl warp::Filter<Extract = (PublishArgs,), Error = warp::Rejection> + Copy {
) -> impl Filter<Extract = (PublishArgs,), Error = warp::Rejection> + Clone {
warp::filters::query::raw()
.and_then(move |s: String| {
let ret = if s.is_empty() {
@ -439,11 +440,13 @@ fn publish_args(
// conversion to utf8 without us being able to recover the raw bytes, which are
// used by js-ipfs/ipfs-http-client to encode raw Buffers:
// https://github.com/ipfs/js-ipfs/blob/master/packages/ipfs-http-client/src/pubsub/publish.js
let mut args = QueryAsRawPartsParser {
let parser = QueryAsRawPartsParser {
input: s.as_bytes(),
}
.filter(|&(k, _)| k == parameter_name.as_bytes())
.map(|t| t.1);
};
let mut args = parser
.filter(|&(k, _)| k == parameter_name.as_bytes())
.map(|t| t.1);
let first = args
.next()
@ -470,18 +473,36 @@ fn publish_args(
futures::future::ready(ret)
})
.and_then(|(topic, opt_arg): (String, Option<QueryOrBody>)| {
let ret = if let Some(message) = opt_arg {
Ok(PublishArgs { topic, message })
} else {
// this branch should check for multipart body, however the js-http client is not
// using that so we can leave it probably for now. Looks like warp doesn't support
// multipart bodies without Content-Length so `go-ipfs` is not supported at this
// time.
Err(warp::reject::custom(RequiredArgumentMissing("data")))
};
futures::future::ready(ret)
.and(warp::filters::header::optional::<Mime>("content-type"))
.and(warp::filters::body::stream())
.and_then(publish_args_inner)
}
async fn publish_args_inner(
(topic, opt_arg): (String, Option<QueryOrBody>),
content_type: Option<Mime>,
body: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin,
) -> Result<PublishArgs, Rejection> {
if let Some(message) = opt_arg {
Ok(PublishArgs { topic, message })
} else {
// this branch should check for multipart body, however the js-http client is not
// using that so we can leave it probably for now. Looks like warp doesn't support
// multipart bodies without Content-Length so `go-ipfs` is not supported at this
// time.
let boundary = content_type
.ok_or_else(|| StringError::from("message needs to be query or in multipart body"))?
.get_param("boundary")
.map(|v| v.to_string())
.ok_or_else(|| StringError::from("missing 'boundary' on content-type"))?;
let buffer = try_only_named_multipart(&["file"], 1024 * 100, boundary, body).await?;
Ok(PublishArgs {
topic,
message: QueryOrBody::Body(buffer),
})
}
}
struct QueryAsRawPartsParser<'a> {
@ -548,4 +569,26 @@ mod tests {
let body = str::from_utf8(response.body()).unwrap();
assert_eq!(body, r#"{"message":"foobar","topic":"some_channel"}"#);
}
#[test]
fn message_in_body() {
let response = block_on(
request()
.path("/pubsub/pub?arg=some_channel")
.header("content-type", "multipart/form-data; boundary=-----------------------------Z0oYi6XyTm7_x2L4ty8JL")
.body(&b"-------------------------------Z0oYi6XyTm7_x2L4ty8JL\r\n\
Content-Disposition: form-data; name=\"file\"; filename=\"\"\r\n\
Content-Type: application/octet-stream\r\n\
\r\n\
aedFIxDJZ2jS1eVB6Pkbv\r\n\
-------------------------------Z0oYi6XyTm7_x2L4ty8JL--\r\n"[..])
.reply(&publish_args_as_json("arg")),
);
let body = str::from_utf8(response.body()).unwrap();
assert_eq!(
body,
r#"{"message":"aedFIxDJZ2jS1eVB6Pkbv","topic":"some_channel"}"#
);
}
}