fix: load the block always on walk_paths
this was a curious bug: to pass the test on interface-ipfs-core we need to be still loading the first block and we must not be on the stream phase. apparently the http cancellation works differently depending on whether any response headers have been sent or not as the test fails if the headers and cancelled error is sent. the issue of returning either an loaded raw block or possibly projected Ipld value was solved by introducing yet another enum Loaded. I did consider `Either` from crate either, and `Result<Box<[u8]>, Ipld>` but the custom enum at least allows us to write a documentation story what the alternatives represent.
This commit is contained in:
parent
2d99d1aed2
commit
31b30cddf5
@ -223,6 +223,16 @@ impl From<(WalkFailed, Cid)> for WalkError {
|
||||
}
|
||||
}
|
||||
|
||||
/// The IpfsPath walk can end in with the target block loaded or parsed and optionally projected as
|
||||
/// an Ipld.
|
||||
#[derive(Debug)]
|
||||
pub enum Loaded {
|
||||
/// The raw block from `ipfs.get_block`
|
||||
Raw(Box<[u8]>),
|
||||
/// Possibly projected IPLD value.
|
||||
Ipld(Ipld),
|
||||
}
|
||||
|
||||
/// Walks the `path` while loading the links.
|
||||
///
|
||||
/// Returns the Cid where we ended up, and an optional Ipld structure if one was projected, and the
|
||||
@ -230,7 +240,7 @@ impl From<(WalkFailed, Cid)> for WalkError {
|
||||
pub async fn walk_path<T: IpfsTypes>(
|
||||
ipfs: &Ipfs<T>,
|
||||
mut path: IpfsPath,
|
||||
) -> Result<(Cid, Option<Ipld>, Vec<String>), WalkError> {
|
||||
) -> Result<(Cid, Loaded, Vec<String>), WalkError> {
|
||||
use ipfs::unixfs::ll::MaybeResolved;
|
||||
|
||||
let mut current = path.take_root().unwrap();
|
||||
@ -238,13 +248,27 @@ pub async fn walk_path<T: IpfsTypes>(
|
||||
|
||||
let mut path_inside_last = Vec::new();
|
||||
|
||||
'outer: while let Some(mut needle) = path.next() {
|
||||
// important: on the `/refs` path we need to fetch the first block to fail deterministic so we
|
||||
// need to load it either way here; if the response gets processed to the stream phase, it'll
|
||||
// always fire up a response and the test 'should print nothing for non-existent hashes' fails.
|
||||
// Not sure how correct that is, but that is the test.
|
||||
|
||||
'outer: loop {
|
||||
let Block { data, .. } = match ipfs.get_block(¤t).await {
|
||||
Ok(block) => block,
|
||||
Err(e) => return Err(WalkError::from((WalkFailed::from(e), current))),
|
||||
};
|
||||
|
||||
// needs to be mutable because the Ipld walk will overwrite it to project down in the
|
||||
// document
|
||||
let mut needle = if let Some(needle) = path.next() {
|
||||
needle
|
||||
} else {
|
||||
return Ok((current, Loaded::Raw(data), Vec::new()));
|
||||
};
|
||||
|
||||
if current.codec() == cid::Codec::DagProtobuf {
|
||||
|
||||
let mut lookup = match ipfs::unixfs::ll::resolve(&data, &needle, &mut cache) {
|
||||
Ok(MaybeResolved::NeedToLoadMore(lookup)) => lookup,
|
||||
Ok(MaybeResolved::Found(cid)) => {
|
||||
@ -267,7 +291,7 @@ pub async fn walk_path<T: IpfsTypes>(
|
||||
let data = ipfs::unixfs::ll::dagpb::node_data(&data)
|
||||
.expect("already parsed once, second time cannot fail")
|
||||
.unwrap_or_default();
|
||||
return Ok((current, Some(Ipld::Bytes(data.to_vec())), vec![needle]));
|
||||
return Ok((current, Loaded::Ipld(Ipld::Bytes(data.to_vec())), vec![needle]));
|
||||
}
|
||||
let e = WalkFailed::from(path::WalkFailed::UnmatchedNamedLink(needle));
|
||||
return Err(WalkError::from((e, current)));
|
||||
@ -299,7 +323,7 @@ pub async fn walk_path<T: IpfsTypes>(
|
||||
let data = ipfs::unixfs::ll::dagpb::node_data(&data)
|
||||
.expect("already parsed once, second time cannot fail")
|
||||
.unwrap_or_default();
|
||||
return Ok((current, Some(Ipld::Bytes(data.to_vec())), vec![needle]));
|
||||
return Ok((current, Loaded::Ipld(Ipld::Bytes(data.to_vec())), vec![needle]));
|
||||
}
|
||||
let e = WalkFailed::from(path::WalkFailed::UnmatchedNamedLink(needle));
|
||||
return Err(WalkError::from((e, next)));
|
||||
@ -339,17 +363,15 @@ pub async fn walk_path<T: IpfsTypes>(
|
||||
needle = match path.next() {
|
||||
Some(needle) => needle,
|
||||
None => break,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
if path.len() == 0 {
|
||||
path_inside_last.shrink_to_fit();
|
||||
return Ok((current, Some(ipld), path_inside_last));
|
||||
return Ok((current, Loaded::Ipld(ipld), path_inside_last));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((current, None, Vec::new()))
|
||||
}
|
||||
|
||||
/// Gather links as edges between two documents from all of the `iplds` which represent the
|
||||
@ -368,7 +390,7 @@ pub async fn walk_path<T: IpfsTypes>(
|
||||
/// If there are dag-pb nodes and the libipld has changed it's dag-pb tree structure.
|
||||
fn iplds_refs<T: IpfsTypes>(
|
||||
ipfs: Ipfs<T>,
|
||||
iplds: Vec<(Cid, Option<Ipld>)>,
|
||||
iplds: Vec<(Cid, Loaded)>,
|
||||
max_depth: Option<u64>,
|
||||
unique: bool,
|
||||
) -> impl Stream<Item = Result<(Cid, Cid, Option<String>), String>> + Send + 'static {
|
||||
@ -386,9 +408,8 @@ fn iplds_refs<T: IpfsTypes>(
|
||||
for (origin, maybe_ipld) in iplds {
|
||||
|
||||
let ipld = match maybe_ipld {
|
||||
Some(ipld) => ipld,
|
||||
None => {
|
||||
let Block { data, .. } = ipfs.get_block(&origin).await.map_err(|e| e.to_string())?;
|
||||
Loaded::Ipld(ipld) => ipld,
|
||||
Loaded::Raw(data) => {
|
||||
decode_ipld(&origin, &data).map_err(|e| e.to_string())?
|
||||
}
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user