refactor: try_only_named_multipart out of put_block

This commit is contained in:
Joonas Koivunen 2020-07-08 19:17:58 +03:00
parent 06ba40e8f2
commit 087776bda1

View File

@ -96,7 +96,37 @@ async fn inner_put<T: IpfsTypes>(
.get_param("boundary")
.map(|v| v.to_string())
.ok_or_else(|| StringError::from("missing 'boundary' on content-type"))?;
let mut stream = MultipartStream::new(boundary, body.map_ok(|mut buf| buf.to_bytes()));
let buffer = try_only_named_multipart(&["data", "file"], 1024 * 1024, boundary, body).await?;
// bad thing about Box<[u8]>: converting to it forces an reallocation
let data = buffer.into_boxed_slice();
let digest = opts.digest()?(&data);
let cid = Cid::new(opts.version()?, opts.format()?, digest).map_err(StringError::from)?;
let size = data.len();
let key = cid.to_string();
let block = ipfs::Block { cid, data };
ipfs.put_block(block).await.map_err(StringError::from)?;
Ok(reply::json(&serde_json::json!({
"Key": key,
"Size": size,
})))
}
pub async fn try_only_named_multipart<'a>(
allowed_names: &'a [impl AsRef<str> + 'a],
size_limit: usize,
boundary: String,
st: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin + 'a,
) -> Result<Vec<u8>, Rejection> {
use bytes::Bytes;
let mut stream =
MultipartStream::new(Bytes::from(boundary), st.map_ok(|mut buf| buf.to_bytes()));
// store the first good field here; optimally this would just be an Option but couldn't figure
// out a way to handle the "field matched", "field not matched" cases while supporting empty
@ -114,21 +144,15 @@ async fn inner_put<T: IpfsTypes>(
.name()
.map_err(|_| StringError::from("invalid field name"))?;
let mut target = match name {
"data" => Some(&mut buffer),
name @ "file" => {
log::warn!("block/put: processing part with deprecated name {:?}", name);
Some(&mut buffer)
}
other => {
log::warn!("block/put: ignoring part named {:?}", other);
None
}
let mut target = if allowed_names.iter().any(|s| s.as_ref() == name) {
Some(&mut buffer)
} else {
None
};
if matched {
// per spec: only one block should be uploaded at once
return Err(StringError::from("multiple blocks (blocks/put only accepts one)").into());
return Err(StringError::from("multiple blocks (expecting at most one)").into());
}
matched = target.is_some();
@ -140,10 +164,10 @@ async fn inner_put<T: IpfsTypes>(
match (next, target.as_mut()) {
(Some(bytes), Some(target)) => {
if target.len() + bytes.len() > 1024 * 1024 {
if target.len() + bytes.len() > size_limit {
return Err(StringError::from("block is too large").into());
} else if target.is_empty() {
target.reserve(1024 * 1024);
target.reserve(size_limit);
}
target.extend_from_slice(bytes.as_ref());
}
@ -164,23 +188,7 @@ async fn inner_put<T: IpfsTypes>(
return Err(StringError::from("missing field: \"data\" (or \"file\")").into());
}
// bad thing about Box<[u8]>: converting to it forces an reallocation
let data = buffer.into_boxed_slice();
let digest = opts.digest()?(&data);
let cid = Cid::new(opts.version()?, opts.format()?, digest).map_err(StringError::from)?;
let size = data.len();
let key = cid.to_string();
let block = ipfs::Block { cid, data };
ipfs.put_block(block).await.map_err(StringError::from)?;
Ok(reply::json(&serde_json::json!({
"Key": key,
"Size": size,
})))
Ok(buffer)
}
#[derive(Debug, Serialize)]