refactor: move try_only_named.. to support

This commit is contained in:
Joonas Koivunen 2020-07-08 19:24:06 +03:00
parent 087776bda1
commit 4101383855
3 changed files with 86 additions and 76 deletions

View File

@ -1,11 +1,13 @@
use crate::v0::support::{with_ipfs, HandledErr, StreamResponse, StringError};
use crate::v0::support::{
try_only_named_multipart, with_ipfs, HandledErr, StreamResponse, StringError,
};
use bytes::Buf;
use futures::stream::{FuturesOrdered, Stream, StreamExt, TryStreamExt};
use futures::stream::{FuturesOrdered, Stream, StreamExt};
use ipfs::error::Error;
use ipfs::{Ipfs, IpfsTypes};
use libipld::cid::{Cid, Codec, Version};
use mime::Mime;
use mpart_async::server::MultipartStream;
use multihash::Multihash;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
@ -118,79 +120,6 @@ async fn inner_put<T: IpfsTypes>(
})))
}
pub async fn try_only_named_multipart<'a>(
allowed_names: &'a [impl AsRef<str> + 'a],
size_limit: usize,
boundary: String,
st: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin + 'a,
) -> Result<Vec<u8>, Rejection> {
use bytes::Bytes;
let mut stream =
MultipartStream::new(Bytes::from(boundary), st.map_ok(|mut buf| buf.to_bytes()));
// store the first good field here; optimally this would just be an Option but couldn't figure
// out a way to handle the "field matched", "field not matched" cases while supporting empty
// fields.
let mut buffer = Vec::new();
let mut matched = false;
while let Some(mut field) = stream.try_next().await.map_err(StringError::from)? {
// [ipfs http api] says we should expect a "data" but examples use "file" as the
// form field name. newer conformance tests also use former, older latter.
//
// [ipfs http api]: https://docs.ipfs.io/reference/http/api/#api-v0-block-put
let name = field
.name()
.map_err(|_| StringError::from("invalid field name"))?;
let mut target = if allowed_names.iter().any(|s| s.as_ref() == name) {
Some(&mut buffer)
} else {
None
};
if matched {
// per spec: only one block should be uploaded at once
return Err(StringError::from("multiple blocks (expecting at most one)").into());
}
matched = target.is_some();
loop {
let next = field.try_next().await.map_err(|e| {
StringError::from(format!("IO error while reading field bytes: {}", e))
})?;
match (next, target.as_mut()) {
(Some(bytes), Some(target)) => {
if target.len() + bytes.len() > size_limit {
return Err(StringError::from("block is too large").into());
} else if target.is_empty() {
target.reserve(size_limit);
}
target.extend_from_slice(bytes.as_ref());
}
(Some(bytes), None) => {
// noop: we must fully consume the part before moving on to next.
if bytes.is_empty() {
// this technically shouldn't be happening any more but erroring
// out instead of spinning wildly is much better.
return Err(StringError::from("internal error: zero read").into());
}
}
(None, _) => break,
}
}
}
if !matched {
return Err(StringError::from("missing field: \"data\" (or \"file\")").into());
}
Ok(buffer)
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct RmResponse {

View File

@ -10,6 +10,9 @@ pub mod option_parsing;
mod stream;
pub use stream::StreamResponse;
mod body;
pub use body::try_only_named_multipart;
/// The common responses apparently returned by the go-ipfs HTTP api on errors.
/// See also: https://github.com/ferristseng/rust-ipfs-api/blob/master/ipfs-api/src/response/error.rs
#[derive(Debug, Serialize)]

View File

@ -0,0 +1,78 @@
use crate::v0::support::StringError;
use bytes::Buf;
use futures::stream::{Stream, TryStreamExt};
use mpart_async::server::MultipartStream;
use warp::Rejection;
pub async fn try_only_named_multipart<'a>(
allowed_names: &'a [impl AsRef<str> + 'a],
size_limit: usize,
boundary: String,
st: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin + 'a,
) -> Result<Vec<u8>, Rejection> {
use bytes::Bytes;
let mut stream =
MultipartStream::new(Bytes::from(boundary), st.map_ok(|mut buf| buf.to_bytes()));
// store the first good field here; optimally this would just be an Option but couldn't figure
// out a way to handle the "field matched", "field not matched" cases while supporting empty
// fields.
let mut buffer = Vec::new();
let mut matched = false;
while let Some(mut field) = stream.try_next().await.map_err(StringError::from)? {
// [ipfs http api] says we should expect a "data" but examples use "file" as the
// form field name. newer conformance tests also use former, older latter.
//
// [ipfs http api]: https://docs.ipfs.io/reference/http/api/#api-v0-block-put
let name = field
.name()
.map_err(|_| StringError::from("invalid field name"))?;
let mut target = if allowed_names.iter().any(|s| s.as_ref() == name) {
Some(&mut buffer)
} else {
None
};
if matched {
// per spec: only one block should be uploaded at once
return Err(StringError::from("multiple blocks (expecting at most one)").into());
}
matched = target.is_some();
loop {
let next = field.try_next().await.map_err(|e| {
StringError::from(format!("IO error while reading field bytes: {}", e))
})?;
match (next, target.as_mut()) {
(Some(bytes), Some(target)) => {
if target.len() + bytes.len() > size_limit {
return Err(StringError::from("block is too large").into());
} else if target.is_empty() {
target.reserve(size_limit);
}
target.extend_from_slice(bytes.as_ref());
}
(Some(bytes), None) => {
// noop: we must fully consume the part before moving on to next.
if bytes.is_empty() {
// this technically shouldn't be happening any more but erroring
// out instead of spinning wildly is much better.
return Err(StringError::from("internal error: zero read").into());
}
}
(None, _) => break,
}
}
}
if !matched {
return Err(StringError::from("missing field: \"data\" (or \"file\")").into());
}
Ok(buffer)
}