231: Streaming multiparts: pubsub r=ljedrz a=koivunej

This was seen originally but at least it seemed some months ago that there wasn't a way to process multiparts streaming. Luckily the older js-ipfs-http-client puts the message in query. Later versions related to #228 use multipart bodies, but the topic is still a query argument.

Depends on #230.

 * Adds test for the older query part message parsing
 * Uses `&'static str` instead of `&'static [u8]` (fixes error messages)

The latest version should leave us failing only the conformance tests related to:

* pinning
* `?timeout=<not sure what unit>`
  * refs
  * cat
  * get
  * block/get
  * dag/get
* wantlist

This is:

```
  146 passing (3m)
  21 pending
  8 failing
```


Co-authored-by: Joonas Koivunen <joonas@equilibrium.co>
This commit is contained in:
bors[bot] 2020-07-09 08:27:56 +00:00 committed by GitHub
commit e37c9ab8d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 184 additions and 54 deletions

View File

@ -99,7 +99,9 @@ async fn inner_put<T: IpfsTypes>(
.map(|v| v.to_string())
.ok_or_else(|| StringError::from("missing 'boundary' on content-type"))?;
let buffer = try_only_named_multipart(&["data", "file"], 1024 * 1024, boundary, body).await?;
let buffer = try_only_named_multipart(&["data", "file"], 1024 * 1024, boundary, body)
.await
.map_err(StringError::from)?;
// bad thing about Box<[u8]>: converting to it forces an reallocation
let data = buffer.into_boxed_slice();

View File

@ -30,6 +30,17 @@ impl Default for InputEncoding {
}
}
pub fn put<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
path!("dag" / "put")
.and(with_ipfs(ipfs))
.and(query::<PutQuery>())
.and(warp::header::<Mime>("content-type")) // TODO: rejects if missing
.and(warp::body::stream())
.and_then(put_query)
}
async fn put_query<T: IpfsTypes>(
ipfs: Ipfs<T>,
query: PutQuery,
@ -62,7 +73,9 @@ async fn put_query<T: IpfsTypes>(
.map(|v| v.to_string())
.ok_or_else(|| StringError::from("missing 'boundary' on content-type"))?;
let buf = try_only_named_multipart(&["data", "file"], 1024 * 1024, boundary, body).await?;
let buf = try_only_named_multipart(&["data", "file"], 1024 * 1024, boundary, body)
.await
.map_err(StringError::from)?;
let data = buf.into_boxed_slice();
let digest = hasher(&data);
@ -81,17 +94,6 @@ async fn put_query<T: IpfsTypes>(
Ok(reply::json(&reply))
}
pub fn put<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
path!("dag" / "put")
.and(with_ipfs(ipfs))
.and(query::<PutQuery>())
.and(warp::header::<Mime>("content-type")) // TODO: rejects if missing
.and(warp::body::stream())
.and_then(put_query)
}
/// Per https://docs-beta.ipfs.io/reference/http/api/#api-v0-block-resolve this endpoint takes in a
/// path and resolves it to the last block (the cid), and to the path inside the final block
/// (rempath).

View File

@ -10,8 +10,7 @@
//! `ipfs::Ipfs::pubsub_subscribe` and thus will panic if an subscription was made outside of this
//! locking mechanism.
use futures::stream::TryStream;
use futures::stream::TryStreamExt;
use futures::stream::{Stream, TryStream, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::stream::StreamExt;
@ -26,12 +25,14 @@ use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use warp::hyper::body::Bytes;
use warp::Filter;
use bytes::{Buf, Bytes};
use warp::{Filter, Rejection};
use crate::v0::support::{
with_ipfs, NonUtf8Topic, RequiredArgumentMissing, StreamResponse, StringError,
try_only_named_multipart, with_ipfs, NonUtf8Topic, OnlyMultipartFailure,
RequiredArgumentMissing, StreamResponse, StringError,
};
use mime::Mime;
#[derive(Default)]
pub struct Pubsub {
@ -104,7 +105,7 @@ pub fn publish<T: IpfsTypes>(
warp::path!("pub")
.and(warp::post())
.and(with_ipfs(ipfs))
.and(publish_args(b"arg"))
.and(publish_args("arg"))
.and_then(inner_publish)
}
@ -405,7 +406,6 @@ struct PublishArgs {
#[derive(Debug)]
enum QueryOrBody {
Query(Vec<u8>),
#[allow(dead_code)]
Body(Vec<u8>),
}
@ -417,29 +417,40 @@ impl QueryOrBody {
}
}
impl AsRef<[u8]> for PublishArgs {
fn as_ref(&self) -> &[u8] {
use QueryOrBody::*;
match &self.message {
Query(b) | Body(b) => b.as_slice(),
}
}
}
/// `parameter_name` is byte slice because there is no percent decoding done for that component.
fn publish_args(
parameter_name: &'static [u8],
) -> impl warp::Filter<Extract = (PublishArgs,), Error = warp::Rejection> + Copy {
parameter_name: &'static str,
) -> impl Filter<Extract = (PublishArgs,), Error = warp::Rejection> + Clone {
warp::filters::query::raw()
.and_then(move |s: String| {
let ret = if s.is_empty() {
Err(warp::reject::custom(RequiredArgumentMissing(b"topic")))
Err(warp::reject::custom(RequiredArgumentMissing("topic")))
} else {
// sadly we can't use url::form_urlencoded::parse here as it will do lossy
// conversion to utf8 without us being able to recover the raw bytes, which are
// used by js-ipfs/ipfs-http-client to encode raw Buffers:
// https://github.com/ipfs/js-ipfs/blob/master/packages/ipfs-http-client/src/pubsub/publish.js
let mut args = QueryAsRawPartsParser {
let parser = QueryAsRawPartsParser {
input: s.as_bytes(),
}
.filter(|&(k, _)| k == parameter_name)
.map(|t| t.1);
};
let mut args = parser
.filter(|&(k, _)| k == parameter_name.as_bytes())
.map(|t| t.1);
let first = args
.next()
// can't be missing
.ok_or_else(|| warp::reject::custom(RequiredArgumentMissing(b"arg")))
.ok_or_else(|| warp::reject::custom(RequiredArgumentMissing(parameter_name)))
// decode into Result<String, warp::Rejection>
.and_then(|raw_first| {
percent_encoding::percent_decode(raw_first)
@ -461,18 +472,40 @@ fn publish_args(
futures::future::ready(ret)
})
.and_then(|(topic, opt_arg): (String, Option<QueryOrBody>)| {
let ret = if let Some(message) = opt_arg {
Ok(PublishArgs { topic, message })
} else {
// this branch should check for multipart body, however the js-http client is not
// using that so we can leave it probably for now. Looks like warp doesn't support
// multipart bodies without Content-Length so `go-ipfs` is not supported at this
// time.
Err(warp::reject::custom(RequiredArgumentMissing(b"data")))
};
futures::future::ready(ret)
.and(warp::filters::header::optional::<Mime>("content-type"))
.and(warp::filters::body::stream())
.and_then(publish_args_inner)
}
async fn publish_args_inner(
(topic, opt_arg): (String, Option<QueryOrBody>),
content_type: Option<Mime>,
body: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin,
) -> Result<PublishArgs, Rejection> {
if let Some(message) = opt_arg {
Ok(PublishArgs { topic, message })
} else {
let boundary = content_type
.ok_or_else(|| StringError::from("message needs to be query or in multipart body"))?
.get_param("boundary")
.map(|v| v.to_string())
.ok_or_else(|| StringError::from("missing 'boundary' on content-type"))?;
let buffer = match try_only_named_multipart(&["file"], 1024 * 100, boundary, body).await {
Ok(buffer) if buffer.is_empty() => Ok(None),
Ok(buffer) => Ok(Some(buffer)),
Err(OnlyMultipartFailure::NotFound) => Ok(None),
Err(e) => Err(StringError::from(e)),
}?;
// this error is from conformance tests; the field name is different
let buffer = buffer.ok_or_else(|| StringError::from("argument \"data\" is required"))?;
Ok(PublishArgs {
topic,
message: QueryOrBody::Body(buffer),
})
}
}
struct QueryAsRawPartsParser<'a> {
@ -506,3 +539,59 @@ impl<'a> Iterator for QueryAsRawPartsParser<'a> {
}
}
}
#[cfg(test)]
mod tests {
use super::{publish_args, PublishArgs};
use futures::executor::block_on;
use futures::future::ready;
use std::str;
use warp::reply::json;
use warp::{test::request, Filter, Rejection, Reply};
fn publish_args_as_json(
param: &'static str,
) -> impl Filter<Extract = impl Reply, Error = Rejection> {
publish_args(param).and_then(|p: PublishArgs| {
let message = str::from_utf8(p.as_ref()).unwrap();
ready(Ok::<_, warp::Rejection>(json(&serde_json::json!({
"message": message,
"topic": p.topic,
}))))
})
}
#[test]
fn url_hacked_args() {
let response = block_on(
request()
.path("/pubsub/pub?arg=some_channel&arg=foobar")
.reply(&publish_args_as_json("arg")),
);
let body = str::from_utf8(response.body()).unwrap();
assert_eq!(body, r#"{"message":"foobar","topic":"some_channel"}"#);
}
#[test]
fn message_in_body() {
let response = block_on(
request()
.path("/pubsub/pub?arg=some_channel")
.header("content-type", "multipart/form-data; boundary=-----------------------------Z0oYi6XyTm7_x2L4ty8JL")
.body(&b"-------------------------------Z0oYi6XyTm7_x2L4ty8JL\r\n\
Content-Disposition: form-data; name=\"file\"; filename=\"\"\r\n\
Content-Type: application/octet-stream\r\n\
\r\n\
aedFIxDJZ2jS1eVB6Pkbv\r\n\
-------------------------------Z0oYi6XyTm7_x2L4ty8JL--\r\n"[..])
.reply(&publish_args_as_json("arg")),
);
let body = str::from_utf8(response.body()).unwrap();
assert_eq!(
body,
r#"{"message":"aedFIxDJZ2jS1eVB6Pkbv","topic":"some_channel"}"#
);
}
}

View File

@ -11,7 +11,7 @@ mod stream;
pub use stream::StreamResponse;
mod body;
pub use body::try_only_named_multipart;
pub use body::{try_only_named_multipart, OnlyMultipartFailure};
/// The common responses apparently returned by the go-ipfs HTTP api on errors.
/// See also: https://github.com/ferristseng/rust-ipfs-api/blob/master/ipfs-api/src/response/error.rs
@ -75,7 +75,7 @@ impl warp::reject::Reject for NonUtf8Topic {}
/// Used by `pubsub/pub`
#[derive(Debug)]
pub(crate) struct RequiredArgumentMissing(pub(crate) &'static [u8]);
pub(crate) struct RequiredArgumentMissing(pub(crate) &'static str);
impl warp::reject::Reject for RequiredArgumentMissing {}
#[derive(Debug)]

View File

@ -1,15 +1,54 @@
use crate::v0::support::StringError;
use bytes::Buf;
use futures::stream::{Stream, TryStreamExt};
use mpart_async::server::MultipartStream;
use warp::Rejection;
use mpart_async::server::{MultipartError, MultipartStream};
use std::fmt;
#[derive(Debug)]
pub enum OnlyMultipartFailure {
UnparseableFieldName,
MultipleValues,
TooLargeValue,
NotFound,
ZeroRead,
IO(MultipartError),
}
impl fmt::Display for OnlyMultipartFailure {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use OnlyMultipartFailure::*;
match self {
UnparseableFieldName => write!(fmt, "multipart field name could not be parsed"),
MultipleValues => write!(fmt, "multiple values of the matching name, expected one"),
TooLargeValue => write!(fmt, "value is too long"),
NotFound => write!(fmt, "value not found"),
ZeroRead => write!(fmt, "internal error: read zero"),
IO(e) => write!(fmt, "parsing failed: {}", e),
}
}
}
impl std::error::Error for OnlyMultipartFailure {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use OnlyMultipartFailure::*;
match self {
IO(e) => Some(e),
_ => None,
}
}
}
impl From<MultipartError> for OnlyMultipartFailure {
fn from(e: MultipartError) -> Self {
OnlyMultipartFailure::IO(e)
}
}
pub async fn try_only_named_multipart<'a>(
allowed_names: &'a [impl AsRef<str> + 'a],
size_limit: usize,
boundary: String,
st: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin + 'a,
) -> Result<Vec<u8>, Rejection> {
) -> Result<Vec<u8>, OnlyMultipartFailure> {
use bytes::Bytes;
let mut stream =
MultipartStream::new(Bytes::from(boundary), st.map_ok(|mut buf| buf.to_bytes()));
@ -20,7 +59,7 @@ pub async fn try_only_named_multipart<'a>(
let mut buffer = Vec::new();
let mut matched = false;
while let Some(mut field) = stream.try_next().await.map_err(StringError::from)? {
while let Some(mut field) = stream.try_next().await? {
// [ipfs http api] says we should expect a "data" but examples use "file" as the
// form field name. newer conformance tests also use former, older latter.
//
@ -28,7 +67,7 @@ pub async fn try_only_named_multipart<'a>(
let name = field
.name()
.map_err(|_| StringError::from("invalid field name"))?;
.map_err(|_| OnlyMultipartFailure::UnparseableFieldName)?;
let mut target = if allowed_names.iter().any(|s| s.as_ref() == name) {
Some(&mut buffer)
@ -38,20 +77,18 @@ pub async fn try_only_named_multipart<'a>(
if matched {
// per spec: only one block should be uploaded at once
return Err(StringError::from("multiple blocks (expecting at most one)").into());
return Err(OnlyMultipartFailure::MultipleValues);
}
matched = target.is_some();
loop {
let next = field.try_next().await.map_err(|e| {
StringError::from(format!("IO error while reading field bytes: {}", e))
})?;
let next = field.try_next().await?;
match (next, target.as_mut()) {
(Some(bytes), Some(target)) => {
if target.len() + bytes.len() > size_limit {
return Err(StringError::from("block is too large").into());
return Err(OnlyMultipartFailure::TooLargeValue);
} else if target.is_empty() {
target.reserve(size_limit);
}
@ -62,7 +99,7 @@ pub async fn try_only_named_multipart<'a>(
if bytes.is_empty() {
// this technically shouldn't be happening any more but erroring
// out instead of spinning wildly is much better.
return Err(StringError::from("internal error: zero read").into());
return Err(OnlyMultipartFailure::ZeroRead);
}
}
(None, _) => break,
@ -71,7 +108,7 @@ pub async fn try_only_named_multipart<'a>(
}
if !matched {
return Err(StringError::from("missing field: \"data\" (or \"file\")").into());
return Err(OnlyMultipartFailure::NotFound);
}
Ok(buffer)