feat: build directory trees on /add

This commit is contained in:
Joonas Koivunen 2020-08-05 20:36:56 +03:00
parent c59a4cbe22
commit ccd6bbe248
4 changed files with 187 additions and 111 deletions

1
Cargo.lock generated
View File

@ -1183,6 +1183,7 @@ dependencies = [
name = "ipfs-http" name = "ipfs-http"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"async-stream", "async-stream",
"bytes 0.5.6", "bytes 0.5.6",
"cid", "cid",

View File

@ -10,6 +10,7 @@ prost-build = { default-features = false, version = "0.6" }
vergen = { default-features = false, version = "3.1" } vergen = { default-features = false, version = "3.1" }
[dependencies] [dependencies]
anyhow = "*"
async-stream = { default-features = false, version = "0.3" } async-stream = { default-features = false, version = "0.3" }
bytes = { default-features = false, version = "0.5" } bytes = { default-features = false, version = "0.5" }
cid = { default-features = false, version = "0.5" } cid = { default-features = false, version = "0.5" }

View File

@ -1,11 +1,15 @@
use super::AddArgs; use super::AddArgs;
use crate::v0::support::StringError; use crate::v0::support::StringError;
use bytes::{Buf, Bytes}; use bytes::{buf::BufMutExt, Buf, BufMut, Bytes, BytesMut};
use cid::Cid; use cid::Cid;
use futures::stream::{Stream, TryStreamExt}; use futures::stream::{Stream, StreamExt, TryStreamExt};
use ipfs::{Ipfs, IpfsTypes}; use ipfs::unixfs::ll::{
dir::builder::{BufferingTreeBuilder, TreeBuildingFailed, TreeConstructionFailed},
file::adder::FileAdder,
};
use ipfs::{Block, Ipfs, IpfsTypes};
use mime::Mime; use mime::Mime;
use mpart_async::server::MultipartStream; use mpart_async::server::{MultipartError, MultipartStream};
use serde::Serialize; use serde::Serialize;
use std::borrow::Cow; use std::borrow::Cow;
use std::fmt; use std::fmt;
@ -15,63 +19,111 @@ pub(super) async fn add_inner<T: IpfsTypes>(
ipfs: Ipfs<T>, ipfs: Ipfs<T>,
_opts: AddArgs, _opts: AddArgs,
content_type: Mime, content_type: Mime,
body: impl Stream<Item = Result<impl Buf, warp::Error>> + Unpin, body: impl Stream<Item = Result<impl Buf, warp::Error>> + Send + Unpin + 'static,
) -> Result<impl Reply, Rejection> { ) -> Result<impl Reply, Rejection> {
// FIXME: this should be without adder at least
use ipfs::unixfs::ll::{dir::builder::BufferingTreeBuilder, file::adder::FileAdder};
let boundary = content_type let boundary = content_type
.get_param("boundary") .get_param("boundary")
.map(|v| v.to_string()) .map(|v| v.to_string())
.ok_or_else(|| StringError::from("missing 'boundary' on content-type"))?; .ok_or_else(|| StringError::from("missing 'boundary' on content-type"))?;
let mut stream = let stream = MultipartStream::new(Bytes::from(boundary), body.map_ok(|mut buf| buf.to_bytes()));
MultipartStream::new(Bytes::from(boundary), body.map_ok(|mut buf| buf.to_bytes()));
// Stream<Output = Result<Json, impl Rejection>>
//
// refine it to
//
// Stream<Output = Result<Json, AddError>>
// | |
// | convert rejection and stop the stream?
// | |
// | /
// Stream<Output = Result<impl Into<Bytes>, impl std::error::Error + Send + Sync + 'static>>
let st = add_stream(ipfs, stream);
// TODO: we could map the errors into json objects at least? (as we cannot return them as
// trailers)
let body = crate::v0::support::StreamResponse(st);
Ok(body)
}
#[derive(Debug)]
enum AddError {
Parsing(MultipartError),
Header(MultipartError),
InvalidFilename(std::str::Utf8Error),
UnsupportedField(String),
UnsupportedContentType(String),
ResponseSerialization(serde_json::Error),
Persisting(ipfs::Error),
TreeGathering(TreeBuildingFailed),
TreeBuilding(TreeConstructionFailed),
}
impl From<MultipartError> for AddError {
fn from(e: MultipartError) -> AddError {
AddError::Parsing(e)
}
}
impl fmt::Display for AddError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO
write!(fmt, "{:?}", self)
}
}
impl std::error::Error for AddError {}
fn add_stream<St, E>(
ipfs: Ipfs<impl IpfsTypes>,
mut fields: MultipartStream<St, E>,
) -> impl Stream<Item = Result<Bytes, AddError>> + Send + 'static
where
St: Stream<Item = Result<Bytes, E>> + Send + Unpin + 'static,
E: Into<anyhow::Error> + Send + 'static,
{
async_stream::try_stream! {
// TODO: wrap-in-directory option // TODO: wrap-in-directory option
let mut tree = BufferingTreeBuilder::default(); let mut tree = BufferingTreeBuilder::default();
// this should be a while loop but clippy will warn if this is a while loop which will only get let mut buffer = BytesMut::new();
// executed once.
while let Some(mut field) = stream tracing::trace!("stream started");
while let Some(mut field) = fields
.try_next() .try_next()
.await .await?
.map_err(|e| StringError::from(format!("IO error: {}", e)))?
{ {
let field_name = field
.name()
.map_err(|e| StringError::from(format!("unparseable headers: {}", e)))?;
if field_name != "file" { let field_name = field.name().map_err(AddError::Header)?;
// files are file{,-1,-2,-3,..}
// directories are dir{,-1,-2,-3,..}
let _ = if !field_name.starts_with("file") {
// this seems constant for files and directories // this seems constant for files and directories
return Err(StringError::from(format!("unsupported field: {}", field_name)).into()); Err(AddError::UnsupportedField(field_name.to_string()))
} } else {
// this is a bit ackward with the ? operator but it should save us the yield
// Err(..) followed by return; this is only available in the `stream!` variant,
// which continues after errors by default..
Ok(())
}?;
let filename = field let filename = field.filename().map_err(AddError::Header)?;
.filename() let filename = percent_encoding::percent_decode_str(filename)
.map_err(|e| StringError::from(format!("unparseable filename: {}", e)))? .decode_utf8()
.to_string(); .map(|cow| cow.into_owned())
.map_err(AddError::InvalidFilename)?;
// unixfsv1.5 metadata seems to be in custom headers for both files and additional let content_type = field.content_type().map_err(AddError::Header)?;
// directories:
// - mtime: timespec
// - mtime-nsecs: timespec
//
// should probably read the metadata here to have it available for both files and
// directories?
//
// FIXME: tomorrow:
// - need to make this a stream
// - need to yield progress reports
// - before yielding file results, we should add it to builder
// - finally at the end we should build the tree
let content_type = field let next = match content_type {
.content_type() "application/octet-stream" => {
.map_err(|e| StringError::from(format!("unparseable content-type: {}", e)))?; tracing::trace!("processing file {:?}", filename);
if content_type == "application/octet-stream" {
// Content-Type: application/octet-stream for files
let mut adder = FileAdder::default(); let mut adder = FileAdder::default();
let mut total = 0u64; let mut total = 0u64;
@ -79,7 +131,7 @@ pub(super) async fn add_inner<T: IpfsTypes>(
let next = field let next = field
.try_next() .try_next()
.await .await
.map_err(|e| StringError::from(format!("IO error: {}", e)))?; .map_err(AddError::Parsing)?;
match next { match next {
Some(next) => { Some(next) => {
@ -88,12 +140,12 @@ pub(super) async fn add_inner<T: IpfsTypes>(
let (iter, used) = adder.push(&next.slice(read..)); let (iter, used) = adder.push(&next.slice(read..));
read += used; read += used;
let maybe_tuple = import_all(&ipfs, iter).await.map_err(|e| { let maybe_tuple = import_all(&ipfs, iter).await.map_err(AddError::Persisting)?;
StringError::from(format!("Failed to save blocks: {}", e))
})?;
total += maybe_tuple.map(|t| t.1).unwrap_or(0); total += maybe_tuple.map(|t| t.1).unwrap_or(0);
} }
tracing::trace!("read {} bytes", read);
} }
None => break, None => break,
} }
@ -101,56 +153,72 @@ pub(super) async fn add_inner<T: IpfsTypes>(
let (root, subtotal) = import_all(&ipfs, adder.finish()) let (root, subtotal) = import_all(&ipfs, adder.finish())
.await .await
.map_err(|e| StringError::from(format!("Failed to save blocks: {}", e)))? .map_err(AddError::Persisting)?
.expect("I think there should always be something from finish -- except if the link block has just been compressed?"); .expect("I think there should always be something from finish -- except if the link block has just been compressed?");
total += subtotal; total += subtotal;
tracing::trace!("completed processing file of {} bytes: {:?}", total, filename);
// using the filename as the path since we can tolerate a single empty named file // using the filename as the path since we can tolerate a single empty named file
// however the second one will cause issues // however the second one will cause issues
tree.put_file(filename.as_ref().unwrap_or_default(), root, total) tree.put_file(&filename, root.clone(), total)
.map_err(|e| { .map_err(AddError::TreeGathering)?;
StringError::from(format!("Failed to record file in the tree: {}", e))
})?;
let root = root.to_string();
let filename: Cow<'_, str> = if filename.is_empty() { let filename: Cow<'_, str> = if filename.is_empty() {
// cid needs to be repeated if no filename was given // cid needs to be repeated if no filename was given
Cow::Borrowed(&root) Cow::Owned(root.to_string())
} else { } else {
Cow::Owned(filename) Cow::Owned(filename)
}; };
return Ok(warp::reply::json(&Response::Added { serde_json::to_writer((&mut buffer).writer(), &Response::Added {
name: filename, name: filename,
hash: Cow::Borrowed(&root), hash: Quoted(&root),
size: Quoted(total), size: Quoted(total),
})); }).map_err(AddError::ResponseSerialization)?;
} else if content_type == "application/x-directory" {
// Content-Type: application/x-directory for additional directories or for setting buffer.put(&b"\r\n"[..]);
// metadata on them
return Err(StringError::from(format!( Ok(buffer.split().freeze())
"not implemented: {}", },
content_type /*"application/x-directory"
))); |*/ unsupported => {
} else { Err(AddError::UnsupportedContentType(unsupported.to_string()))
// should be 405?
return Err(StringError::from(format!(
"unsupported content-type: {}",
content_type
)));
} }
}?;
yield next;
} }
Err(StringError::from("not implemented").into()) let mut full_path = String::new();
let mut block_buffer = Vec::new();
let mut iter = tree.build(&mut full_path, &mut block_buffer);
while let Some(res) = iter.next_borrowed() {
let (path, cid, total, block) = res.map_err(AddError::TreeBuilding)?;
// shame we need to allocate once again here..
ipfs.put_block(Block { cid: cid.to_owned(), data: block.into() }).await.map_err(AddError::Persisting)?;
serde_json::to_writer((&mut buffer).writer(), &Response::Added {
name: Cow::Borrowed(path),
hash: Quoted(cid),
size: Quoted(total),
}).map_err(AddError::ResponseSerialization)?;
buffer.put(&b"\r\n"[..]);
yield buffer.split().freeze();
}
}
} }
async fn import_all( async fn import_all(
ipfs: &Ipfs<impl IpfsTypes>, ipfs: &Ipfs<impl IpfsTypes>,
iter: impl Iterator<Item = (Cid, Vec<u8>)>, iter: impl Iterator<Item = (Cid, Vec<u8>)>,
) -> Result<Option<(Cid, u64)>, ipfs::Error> { ) -> Result<Option<(Cid, u64)>, ipfs::Error> {
use ipfs::Block;
// TODO: use FuturesUnordered // TODO: use FuturesUnordered
let mut last: Option<Cid> = None; let mut last: Option<Cid> = None;
let mut total = 0u64; let mut total = 0u64;
@ -188,10 +256,10 @@ enum Response<'a> {
#[serde(rename_all = "PascalCase")] #[serde(rename_all = "PascalCase")]
Added { Added {
/// The resulting Cid as a string. /// The resulting Cid as a string.
hash: Cow<'a, str>, hash: Quoted<&'a Cid>,
/// Name of the file added from filename or the resulting Cid. /// Name of the file added from filename or the resulting Cid.
name: Cow<'a, str>, name: Cow<'a, str>,
/// Stringified version of the total size in bytes. /// Stringified version of the total cumulative size in bytes.
size: Quoted<u64>, size: Quoted<u64>,
}, },
} }

View File

@ -243,7 +243,7 @@ impl BufferingTreeBuilder {
/// Returned `PostOrderIterator` will use the given `full_path` and `block_buffer` to store /// Returned `PostOrderIterator` will use the given `full_path` and `block_buffer` to store
/// it's data during the walk. `PostOrderIterator` implements `Iterator` while also allowing /// it's data during the walk. `PostOrderIterator` implements `Iterator` while also allowing
/// borrowed access via `next_borrowed`. /// borrowed access via `next_borrowed`.
fn build<'a>( pub fn build<'a>(
self, self,
full_path: &'a mut String, full_path: &'a mut String,
block_buffer: &'a mut Vec<u8>, block_buffer: &'a mut Vec<u8>,
@ -263,6 +263,7 @@ impl BufferingTreeBuilder {
persisted_cids: Default::default(), persisted_cids: Default::default(),
reused_children: Vec::new(), reused_children: Vec::new(),
cid: None, cid: None,
total_size: 0,
wrap_in_directory: self.opts.wrap_in_directory, wrap_in_directory: self.opts.wrap_in_directory,
} }
} }
@ -336,6 +337,7 @@ pub struct PostOrderIterator<'a> {
persisted_cids: HashMap<Option<u64>, BTreeMap<String, Leaf>>, persisted_cids: HashMap<Option<u64>, BTreeMap<String, Leaf>>,
reused_children: Vec<Visited>, reused_children: Vec<Visited>,
cid: Option<Cid>, cid: Option<Cid>,
total_size: u64,
// from TreeOptions // from TreeOptions
wrap_in_directory: bool, wrap_in_directory: bool,
} }
@ -411,9 +413,9 @@ impl<'a> PostOrderIterator<'a> {
}) })
} }
fn next_borrowed<'b>( pub fn next_borrowed<'b>(
&'b mut self, &'b mut self,
) -> Option<Result<(&'b str, &'b Cid, &'b [u8]), TreeConstructionFailed>> { ) -> Option<Result<(&'b str, &'b Cid, u64, &'b [u8]), TreeConstructionFailed>> {
while let Some(visited) = self.pending.pop() { while let Some(visited) = self.pending.pop() {
let (name, depth) = match &visited { let (name, depth) = match &visited {
Visited::Descent { name, depth, .. } => (name.as_deref(), *depth), Visited::Descent { name, depth, .. } => (name.as_deref(), *depth),
@ -494,6 +496,7 @@ impl<'a> PostOrderIterator<'a> {
}; };
self.cid = Some(leaf.link.clone()); self.cid = Some(leaf.link.clone());
self.total_size = leaf.total_size;
// this reuse strategy is probably good enough // this reuse strategy is probably good enough
collected.clear(); collected.clear();
@ -525,6 +528,7 @@ impl<'a> PostOrderIterator<'a> {
return Some(Ok(( return Some(Ok((
self.full_path.as_str(), self.full_path.as_str(),
self.cid.as_ref().unwrap(), self.cid.as_ref().unwrap(),
self.total_size,
&self.block_buffer, &self.block_buffer,
))); )));
} }
@ -539,7 +543,9 @@ impl<'a> Iterator for PostOrderIterator<'a> {
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
self.next_borrowed().map(|res| { self.next_borrowed().map(|res| {
res.map(|(full_path, cid, block)| (full_path.to_string(), cid.to_owned(), block.into())) res.map(|(full_path, cid, _, block)| {
(full_path.to_string(), cid.to_owned(), block.into())
})
}) })
} }
} }