332: refactor: ipfs.dag().resolve(..) from ipfs-http r=koivunej a=koivunej

This PR continues refactoring/rewriting the ipfs-http IpfsPath resolving so that we can move refs into ipfs, which in turn lets us implement pinning and unpinning. While doing this I realized that the pinning API perhaps operates just on Cids, so this may have been unnecessary; however, refs works on IpfsPaths, so I'm not really sure yet.

The result of `IpfsDag::resolve` may be an already loaded block; in fact it is currently always a loaded block. Made some changes to ipfs::unixfs::cat to accept either a `Block` or a `Cid`. There may have been similar changes elsewhere as well.

Step forward on #238 and towards #11. More lines are added than removed, probably due to more refined return types and the new support for local resolving, i.e. not following links (the test doesn't pass yet, but the concept, or implementation hint, makes sense). The previous version did not support local resolving at all, nor could it have been added there in any way.

Co-authored-by: Joonas Koivunen <joonas@equilibrium.co>
This commit is contained in:
bors[bot] 2020-08-25 11:08:11 +00:00 committed by GitHub
commit c9acf4ca8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1327 additions and 750 deletions

View File

@ -51,9 +51,8 @@ tests.swarm(factory)
// Phase 1.2
// this is ignored because the js ipfs-http-client doesn't expose the
// localResolve parameter, and the later versions no longer send Content-Length
// header, which our implementation requires.
// ignored as the test doesn't pass at all through ipfs-http-client even
// against jsipfs. we do at least return the same value.
tests.dag.get(factory, { skip: ['should get only a CID, due to resolving locally only'] })
tests.dag.put(factory)

View File

@ -1,10 +1,6 @@
#![recursion_limit = "512"]
use cid::Cid;
use futures::pin_mut;
use futures::stream::StreamExt; // needed for StreamExt::next
use ipfs::{Ipfs, TestTypes, UninitializedIpfs};
use std::convert::TryFrom;
use ipfs::{Ipfs, IpfsPath, TestTypes, UninitializedIpfs};
use std::env;
use std::process::exit;
use tokio::io::AsyncWriteExt;
@ -17,18 +13,18 @@ async fn main() {
// expected to be used by connecting another ipfs peer to it and providing the blocks from that
// peer.
let cid = match env::args().nth(1).map(Cid::try_from) {
let path = match env::args().nth(1).map(|s| s.parse::<IpfsPath>()) {
Some(Ok(cid)) => cid,
Some(Err(e)) => {
eprintln!(
"Failed to parse {} as Cid: {}",
"Failed to parse {} as IpfsPath: {}",
env::args().nth(1).unwrap(),
e
);
exit(1);
}
None => {
eprintln!("Usage: fetch_and_cat CID");
eprintln!("Usage: fetch_and_cat <IPFS_PATH | CID>");
eprintln!(
"Example will accept connections and print all bytes of the unixfs file to \
stdout."
@ -37,25 +33,31 @@ async fn main() {
}
};
if path.root().cid().is_none() {
eprintln!(
"Unsupported path: ipns resolution is not available yet: {}",
path
);
exit(1);
}
// Start daemon and initialize repo
let (ipfs, fut): (Ipfs<TestTypes>, _) =
UninitializedIpfs::default().await.start().await.unwrap();
tokio::task::spawn(fut);
let (public_key, addresses) = ipfs.identity().await.unwrap();
let (_, addresses) = ipfs.identity().await.unwrap();
assert!(!addresses.is_empty(), "Zero listening addresses");
eprintln!("Please connect an ipfs node having {} to:\n", cid);
let peer_id = public_key.into_peer_id().to_string();
eprintln!("Please connect an ipfs node having {} to:\n", path);
for address in addresses {
eprintln!(" - {}/p2p/{}", address, peer_id);
eprintln!(" - {}", address);
}
eprintln!();
let stream = ipfs.cat_unixfs(cid, None).await.unwrap_or_else(|e| {
let stream = ipfs.cat_unixfs(path, None).await.unwrap_or_else(|e| {
eprintln!("Error: {}", e);
exit(1);
});
@ -63,11 +65,20 @@ async fn main() {
pin_mut!(stream);
let mut stdout = tokio::io::stdout();
let mut total = 0;
loop {
// This could be made more performant by polling the stream while writing to stdout.
match stream.next().await {
Some(Ok(bytes)) => {
total += bytes.len();
stdout.write_all(&bytes).await.unwrap();
eprintln!(
"Received: {:>12} bytes, Total: {:>12} bytes",
bytes.len(),
total
);
}
Some(Err(e)) => {
eprintln!("Error: {}", e);
@ -76,4 +87,6 @@ async fn main() {
None => break,
}
}
eprintln!("Total received: {} bytes", total);
}

View File

@ -115,44 +115,35 @@ pub fn resolve<T: IpfsTypes>(
struct ResolveOptions {
arg: String,
timeout: Option<StringSerialized<humantime::Duration>>,
#[serde(rename = "local-resolve", default)]
local_resolve: bool,
}
async fn inner_resolve<T: IpfsTypes>(
ipfs: Ipfs<T>,
opts: ResolveOptions,
) -> Result<impl Reply, Rejection> {
use crate::v0::refs::{walk_path, IpfsPath, WalkOptions};
use ipfs::IpfsPath;
use std::convert::TryFrom;
let path = IpfsPath::try_from(opts.arg.as_str()).map_err(StringError::from)?;
let walk_opts = WalkOptions {
follow_dagpb_data: true,
};
// I think the naming of local_resolve is quite confusing. when following links we "resolve
// globally" and when not following links we are "resolving locally", or in single document.
let follow_links = !opts.local_resolve;
let (current, _, remaining) = walk_path(&ipfs, &walk_opts, path)
let (resolved, remaining) = ipfs
.dag()
.resolve(path, follow_links)
.maybe_timeout(opts.timeout.map(StringSerialized::into_inner))
.await
.map_err(StringError::from)?
.map_err(StringError::from)?;
let remaining = {
let slashes = remaining.len();
let mut buf =
String::with_capacity(remaining.iter().map(|s| s.len()).sum::<usize>() + slashes);
for piece in remaining.into_iter().rev() {
if !buf.is_empty() {
buf.push('/');
}
buf.push_str(&piece);
}
buf
};
let current = resolved.source();
Ok(reply::json(&json!({
"Cid": { "/": current.to_string() },
"RemPath": remaining,
"RemPath": StringSerialized(remaining),
})))
}

View File

@ -1,10 +1,9 @@
use crate::v0::support::{with_ipfs, MaybeTimeoutExt, StringError};
use cid::{self, Cid};
use futures::stream;
use futures::stream::Stream;
use futures::future::ready;
use futures::stream::{self, FuturesOrdered, Stream, StreamExt, TryStreamExt};
use ipfs::ipld::{decode_ipld, Ipld};
use ipfs::{Block, Error};
use ipfs::{Ipfs, IpfsTypes};
use ipfs::{Block, Ipfs, IpfsTypes};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::collections::VecDeque;
@ -18,10 +17,8 @@ use options::RefsOptions;
mod format;
use format::EdgeFormatter;
pub(crate) mod path;
use ipfs::dag::ResolveError;
pub use ipfs::path::IpfsPath;
use path::resolve_segment;
pub use path::WalkSuccess;
use crate::v0::support::{HandledErr, StreamResponse};
@ -36,8 +33,6 @@ async fn refs_inner<T: IpfsTypes>(
ipfs: Ipfs<T>,
opts: RefsOptions,
) -> Result<impl Reply, Rejection> {
use futures::stream::StreamExt;
let max_depth = opts.max_depth();
let formatter = EdgeFormatter::from_options(opts.edges, opts.format.as_deref())
.map_err(StringError::from)?;
@ -72,6 +67,8 @@ async fn refs_inner<T: IpfsTypes>(
// expects the headers to be blocked before the timeout expires.
let st = st.map(move |res| {
// FIXME: strings are allocated for nothing, could just use a single BytesMut for the
// rendering
let res = match res {
Ok((source, dest, link_name)) => {
let ok = formatter.format(source, dest, link_name);
@ -116,7 +113,7 @@ fn refs_options() -> impl Filter<Extract = (RefsOptions,), Error = Rejection> +
.map_err(StringError::from)
.map_err(warp::reject::custom);
futures::future::ready(res)
ready(res)
})
}
@ -129,28 +126,46 @@ async fn refs_paths<T: IpfsTypes>(
paths: Vec<IpfsPath>,
max_depth: Option<u64>,
unique: bool,
) -> Result<impl Stream<Item = Result<(Cid, Cid, Option<String>), String>> + Send + 'static, Error>
{
use futures::stream::FuturesOrdered;
use futures::stream::TryStreamExt;
) -> Result<
impl Stream<Item = Result<(Cid, Cid, Option<String>), String>> + Send + 'static,
ResolveError,
> {
use ipfs::dag::ResolvedNode;
let opts = WalkOptions {
follow_dagpb_data: true,
};
let dag = ipfs.dag();
// added braces to spell it out for borrowck that opts does not outlive this fn
// added braces to spell it out for borrowck that dag does not outlive this fn
let iplds = {
// the assumption is that futuresordered will poll the first N items until the first completes,
// buffering the others. it might not be 100% parallel but it's probably enough.
let mut walks = FuturesOrdered::new();
for path in paths {
walks.push(walk_path(&ipfs, &opts, path));
walks.push(dag.resolve(path, true));
}
// strip out the path inside last document, we don't need it
walks
.map_ok(|(cid, maybe_ipld, _)| (cid, maybe_ipld))
// strip out the path inside last document, we don't need it
.try_filter_map(|(resolved, _)| {
ready(match resolved {
// filter out anything scoped to /Data on a dag-pb node; those cannot contain
// links as all links for a dag-pb are under /Links
ResolvedNode::DagPbData(_, _) => Ok(None),
ResolvedNode::Link(..) => unreachable!("followed links"),
// decode and hope for the best; this of course does a lot of wasted effort;
// hopefully one day we can do "projectioned decoding", like here we'd only
// need all of the links of the block
ResolvedNode::Block(b) => match decode_ipld(b.cid(), b.data()) {
Ok(ipld) => Ok(Some((b.cid, ipld))),
Err(e) => Err(ResolveError::UnsupportedDocument(b.cid, e.into())),
},
// the most straight-forward variant with pre-projected document
ResolvedNode::Projection(cid, ipld) => Ok(Some((cid, ipld))),
})
})
// TODO: collecting here is actually a quite unnecessary, if only we could make this into a
// stream.. however there may have been the case that all paths need to resolve before
// http status code is determined so perhaps this is the only way.
.try_collect()
.await?
};
@ -158,275 +173,6 @@ async fn refs_paths<T: IpfsTypes>(
Ok(iplds_refs(ipfs, iplds, max_depth, unique))
}
/// Error returned by `walk_path`: the reason the walk failed together with the `Cid` the failure
/// is reported against.
#[derive(Debug)]
pub struct WalkError {
/// The `Cid` named in the rendered error message; `walk_path` passes the block it was loading
/// or resolving through when the failure happened.
pub(crate) last_cid: Cid,
/// What went wrong at `last_cid`.
pub(crate) reason: WalkFailed,
}
/// The different reasons why `walk_path` can fail; wrapped with the offending `Cid` in
/// `WalkError`.
#[derive(Debug)]
pub enum WalkFailed {
/// Getting the block from `ipfs.get_block` failed.
Loading(Error),
/// The block could not be decoded as IPLD.
Parsing(ipfs::ipld::BlockError),
/// Resolving a segment over a dag-pb node failed.
DagPb(ipfs::unixfs::ll::ResolveError),
/// Walking a segment inside an IPLD document failed, or a named link was not matched.
IpldWalking(path::WalkFailed),
}
use std::fmt;
impl fmt::Display for WalkError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use WalkFailed::*;
match &self.reason {
Loading(e) => write!(fmt, "loading of {} failed: {}", self.last_cid, e),
Parsing(e) => write!(fmt, "failed to parse {} as IPLD: {}", self.last_cid, e),
DagPb(e) => write!(
fmt,
"failed to resolve {} over dag-pb: {}",
self.last_cid, e
),
// this is asserted in the conformance tests and I don't really want to change the
// tests for this
IpldWalking(e) => write!(fmt, "{} under {}", e, self.last_cid),
}
}
}
impl std::error::Error for WalkError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use WalkFailed::*;
match &self.reason {
Loading(_) => None, // TODO: anyhow
Parsing(e) => Some(e),
DagPb(e) => Some(e),
IpldWalking(e) => Some(e),
}
}
}
impl From<Error> for WalkFailed {
fn from(e: Error) -> Self {
WalkFailed::Loading(e)
}
}
impl From<ipfs::ipld::BlockError> for WalkFailed {
fn from(e: ipfs::ipld::BlockError) -> Self {
WalkFailed::Parsing(e)
}
}
impl From<ipfs::unixfs::ll::ResolveError> for WalkFailed {
fn from(e: ipfs::unixfs::ll::ResolveError) -> Self {
WalkFailed::DagPb(e)
}
}
impl From<path::WalkFailed> for WalkFailed {
fn from(e: path::WalkFailed) -> Self {
WalkFailed::IpldWalking(e)
}
}
impl From<(WalkFailed, Cid)> for WalkError {
fn from((reason, last_cid): (WalkFailed, Cid)) -> Self {
WalkError { last_cid, reason }
}
}
/// The IpfsPath walk can end with the target block either loaded as raw bytes, or parsed and
/// optionally projected as an Ipld.
#[derive(Debug)]
pub enum Loaded {
/// The raw block data from `ipfs.get_block`; returned by `walk_path` when the path ends exactly
/// at a block.
Raw(Box<[u8]>),
/// Possibly projected IPLD value.
Ipld(Ipld),
}
/// Options for `walk_path`.
#[derive(Default, Debug)]
pub struct WalkOptions {
/// When `true`, a last path segment `Data` which does not match a link on a dag-pb node
/// resolves to the data bytes of that node instead of failing (see `handle_dagpb_not_found`).
pub follow_dagpb_data: bool,
}
/// Walks the `path` while loading the links.
///
/// Returns the Cid where we ended up, and an optional Ipld structure if one was projected, and the
/// path inside the last document we walked.
///
/// dag-pb blocks are resolved with `ipfs::unixfs::ll::resolve`; any other block is decoded with
/// `decode_ipld` and walked one segment at a time with `resolve_segment`.
///
/// # Errors
///
/// Returns a `WalkError` carrying the `Cid` being processed when a block cannot be loaded or
/// decoded, or when a segment cannot be matched.
///
/// # Panics
///
/// Panics if the root of `path` is not a `Cid`.
pub async fn walk_path<T: IpfsTypes>(
ipfs: &Ipfs<T>,
opts: &WalkOptions,
path: IpfsPath,
) -> Result<(Cid, Loaded, Vec<String>), WalkError> {
use ipfs::unixfs::ll::{MaybeResolved, ResolveError};
let mut current = path
.root()
.cid()
.expect("unsupported: need to add an error variant for this! or design around it")
.to_owned();
let mut iter = path.path().iter();
// cache for any datastructure used in repeated hamt lookups
let mut cache = None;
// the path_inside_last applies only in the IPLD projection case and its main consumer is the
// `/dag/resolve` API where the response is the returned cid and the "remaining path".
let mut path_inside_last = Vec::new();
// important: on the `/refs` path we need to fetch the first block to fail deterministically so we
// need to load it either way here; if the response gets processed to the stream phase, it'll
// always fire up a response and the test 'should print nothing for non-existent hashes' fails.
// Not sure how correct that is, but that is the test.
'outer: loop {
let Block { data, .. } = match ipfs.get_block(&current).await {
Ok(block) => block,
Err(e) => return Err(WalkError::from((WalkFailed::from(e), current))),
};
// needs to be mutable because the Ipld walk will overwrite it to project down in the
// document
let mut needle = if let Some(needle) = iter.next() {
needle
} else {
// no segments left: the walk ends at this block, which is returned undecoded
return Ok((current, Loaded::Raw(data), Vec::new()));
};
if current.codec() == cid::Codec::DagProtobuf {
let mut lookup = match ipfs::unixfs::ll::resolve(&data, &needle, &mut cache) {
Ok(MaybeResolved::NeedToLoadMore(lookup)) => lookup,
Ok(MaybeResolved::Found(cid)) => {
current = cid;
continue;
}
Ok(MaybeResolved::NotFound) => {
return handle_dagpb_not_found(
current,
&data,
needle.to_owned(),
iter.next().is_none(),
opts,
)
}
Err(ResolveError::UnexpectedType(_)) => {
// the conformance tests use a path which would end up going through a file
// and the returned error string is tested against listed alternatives.
// unexpected type is not one of them.
let e =
WalkFailed::from(path::WalkFailed::UnmatchedNamedLink(needle.to_owned()));
return Err(WalkError::from((e, current)));
}
Err(e) => return Err(WalkError::from((WalkFailed::from(e), current))),
};
// the lookup could not be answered from the first block alone; keep loading the blocks it
// asks for until the segment is either found or known to be missing
loop {
let (next, _) = lookup.pending_links();
// need to take ownership in order to enrich the error, next is invalidated on
// lookup.continue_walk.
let next = next.to_owned();
let Block { data, .. } = match ipfs.get_block(&next).await {
Ok(block) => block,
Err(e) => return Err(WalkError::from((WalkFailed::from(e), next))),
};
match lookup.continue_walk(&data, &mut cache) {
Ok(MaybeResolved::NeedToLoadMore(next)) => lookup = next,
Ok(MaybeResolved::Found(cid)) => {
current = cid;
break;
}
Ok(MaybeResolved::NotFound) => {
return handle_dagpb_not_found(
next,
&data,
needle.to_owned(),
iter.next().is_none(),
opts,
)
}
Err(e) => {
return Err(WalkError::from((
WalkFailed::from(e.into_resolve_error()),
next,
)))
}
}
}
} else {
// a new document starts here, so the segments matched inside the previous one no longer
// apply
path_inside_last.clear();
let mut ipld = match decode_ipld(&current, &data) {
Ok(ipld) => ipld,
Err(e) => return Err(WalkError::from((WalkFailed::from(e), current))),
};
loop {
// this needs to be stored at least temporarily to recover the path_inside_last or
// the "remaining path"
let tmp = needle.clone();
ipld = match resolve_segment(&needle, ipld) {
Ok(WalkSuccess::AtDestination(ipld)) => {
path_inside_last.push(tmp);
ipld
}
Ok(WalkSuccess::Link(_, next_cid)) => {
// the segment pointed to another block: load it and continue from there
current = next_cid;
continue 'outer;
}
Err(e) => return Err(WalkError::from((WalkFailed::from(e), current))),
};
// we might resolve multiple segments inside a single document
needle = match iter.next() {
Some(needle) => needle,
None => break,
};
}
// reached only after `iter.next()` returned `None` in the loop above
if iter.len() == 0 {
// when done with the remaining IpfsPath we should be set with the projected Ipld
// document
path_inside_last.shrink_to_fit();
return Ok((current, Loaded::Ipld(ipld), path_inside_last));
}
}
}
}
/// Decides the outcome when a dag-pb node has no link named `needle`.
///
/// If `opts.follow_dagpb_data` is set and `needle` is the last segment and equals `"Data"`, the
/// walk resolves to the data bytes of the node at `at`, with `needle` as the path inside the last
/// document. In every other case the walk fails with an unmatched named link error under `at`.
///
/// `data` must be the bytes of the dag-pb node which was already parsed during the lookup.
fn handle_dagpb_not_found(
    at: Cid,
    data: &[u8],
    needle: String,
    last_segment: bool,
    opts: &WalkOptions,
) -> Result<(Cid, Loaded, Vec<String>), WalkError> {
    use ipfs::unixfs::ll::dagpb::node_data;

    let resolves_to_node_data = opts.follow_dagpb_data && last_segment && needle == "Data";

    if !resolves_to_node_data {
        let reason = WalkFailed::from(path::WalkFailed::UnmatchedNamedLink(needle));
        return Err(WalkError::from((reason, at)));
    }

    // /dag/resolve needs to "resolve through" a dag-pb node down to the "just data" even
    // though we do not need to extract it ... however this might be good to just filter with
    // refs, as no refs of such path can exist as the links are in the outer structure.
    //
    // testing with go-ipfs 0.5 reveals that dag resolve only follows links
    // which are actually present in the dag-pb, not numeric links like Links/5
    // or links/5, even if such are present in the `dag get` output.
    //
    // comment on this special casing: there cannot be any other such
    // special case as the Links do not work like Data so while this is not
    // pretty, it's not terrible.
    let bytes = node_data(&data)
        .expect("already parsed once, second time cannot fail")
        .unwrap_or_default()
        .to_vec();

    Ok((at, Loaded::Ipld(Ipld::Bytes(bytes)), vec![needle]))
}
/// Gather links as edges between two documents from all of the `iplds` which represent the
/// document and it's original `Cid`, as the `Ipld` can be a subtree of the document.
///
@ -443,7 +189,7 @@ fn handle_dagpb_not_found(
/// If there are dag-pb nodes and the libipld has changed it's dag-pb tree structure.
fn iplds_refs<T: IpfsTypes>(
ipfs: Ipfs<T>,
iplds: Vec<(Cid, Loaded)>,
iplds: Vec<(Cid, Ipld)>,
max_depth: Option<u64>,
unique: bool,
) -> impl Stream<Item = Result<(Cid, Cid, Option<String>), String>> + Send + 'static {
@ -459,15 +205,7 @@ fn iplds_refs<T: IpfsTypes>(
let mut visited = HashSet::new();
let mut work = VecDeque::new();
for (origin, maybe_ipld) in iplds {
let ipld = match maybe_ipld {
Loaded::Ipld(ipld) => ipld,
Loaded::Raw(data) => {
decode_ipld(&origin, &data).map_err(|e| e.to_string())?
}
};
for (origin, ipld) in iplds {
for (link_name, next_cid) in ipld_links(&origin, ipld) {
work.push_back((0, next_cid, origin.clone(), link_name));
}

View File

@ -1,268 +0,0 @@
//! Ipfs path handling following https://github.com/ipfs/go-path/. This rests under ipfs-http for
//! now until it's ready to be moved to `rust-ipfs`. There is a competing implementation under
//! `libipld` which might do almost the same things, but with different dependencies. This should
//! be moving over to `ipfs` once we have seen that this works for `api/v0/dag/get` as well.
//!
//! Does not allow the root to be anything else than `/ipfs/` or missing at the moment.
use cid::{self, Cid};
use ipfs::ipld::Ipld;
use std::collections::BTreeMap;
use std::fmt;
/// Resolves a single path segment `key` against `ipld`, consuming the document.
///
/// Maps are indexed by property name and lists by a numeric index. If the selected value is a
/// link, [`WalkSuccess::Link`] is returned with the segment and the target `Cid`; otherwise the
/// selected value is returned as [`WalkSuccess::AtDestination`]. A document which is itself a
/// link can only be walked with the segment `"."`.
///
/// On failure the unmatched document is handed back inside the [`WalkFailed`] variant.
pub fn resolve_segment(key: &str, ipld: Ipld) -> Result<WalkSuccess, WalkFailed> {
    let selected = match ipld {
        // go-ipfs: allows this to be skipped. let's require the dot for now.
        // FIXME: this would require the iterator to be peekable in addition.
        Ipld::Link(cid) if key == "." => return Ok(WalkSuccess::Link(key.to_owned(), cid)),
        Ipld::Map(mut properties) => match properties.remove(key) {
            Some(value) => value,
            None => return Err(WalkFailed::UnmatchedMapProperty(properties, key.to_owned())),
        },
        Ipld::List(mut elements) => match key.parse::<usize>() {
            // the rest of the list is dropped, so the reordering done by swap_remove is not
            // observable
            Ok(index) if index < elements.len() => elements.swap_remove(index),
            Ok(index) => return Err(WalkFailed::ListIndexOutOfRange(elements, index)),
            Err(_) => return Err(WalkFailed::UnparseableListIndex(elements, key.to_owned())),
        },
        other => return Err(WalkFailed::UnmatchableSegment(other, key.to_owned())),
    };

    Ok(match selected {
        Ipld::Link(next_cid) => WalkSuccess::Link(key.to_owned(), next_cid),
        value => WalkSuccess::AtDestination(value),
    })
}
/// The successful outcomes of resolving a path segment with `resolve_segment`.
#[derive(Debug, PartialEq)]
pub enum WalkSuccess {
/// The segment selected a non-link value inside the current document; carries the selected
/// (projected) `Ipld`.
AtDestination(Ipld),
/// The segment led to a link which needs to be loaded to continue the walk; carries the
/// segment which matched and the `Cid` of the link target.
Link(String, Cid),
}
/// These errors correspond to ones given out by go-ipfs 0.4.23 if the walk cannot be completed.
/// go-ipfs reports these as 500 Internal Errors.
///
/// The variants produced by `resolve_segment` carry the document which could not be walked
/// together with the offending segment.
#[derive(Debug, PartialEq)]
pub enum WalkFailed {
/// Map key was not found; carries the map and the key
UnmatchedMapProperty(BTreeMap<String, Ipld>, String),
/// Segment could not be parsed as index; carries the list and the segment
UnparseableListIndex(Vec<Ipld>, String),
/// Segment was out of range for the list; carries the list and the parsed index
ListIndexOutOfRange(Vec<Ipld>, usize),
/// Catch-all failure for example when walking a segment on integer
UnmatchableSegment(Ipld, String),
/// Non-ipld walk failure on dag-pb; carries the name of the link which was not found
UnmatchedNamedLink(String),
/// The walk was attempted over a dag-pb document converted to IPLD, which is not supported
UnsupportedWalkOnDagPbIpld,
}
impl fmt::Display for WalkFailed {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
// go-ipfs: no such link found or in cat: file does not exist
// js-ipfs: no link named "$key" under $cid
WalkFailed::UnmatchedMapProperty(_, ref key)
| WalkFailed::UnmatchedNamedLink(ref key) => {
write!(fmt, "no link named \"{}\"", key)
},
// go-ipfs: strconv.Atoi: parsing {:?}: invalid syntax
WalkFailed::UnparseableListIndex(_, ref segment) => {
write!(fmt, "Invalid list index: {:?}", segment)
}
// go-ipfs: array index out of range
WalkFailed::ListIndexOutOfRange(ref list, index) => write!(
fmt,
"List index out of range: the length is {} but the index is {}",
list.len(),
index
),
// go-ipfs: tried to resolve through object that had no links
WalkFailed::UnmatchableSegment(_, _) => {
write!(fmt, "Tried to resolve through object that had no links")
},
WalkFailed::UnsupportedWalkOnDagPbIpld => {
write!(fmt, "Tried to walk over dag-pb after converting to IPLD, use ipfs::unixfs::ll or similar directly.")
}
}
}
}
impl std::error::Error for WalkFailed {}
// Tests for `resolve_segment`, driven through the local `walk` helper which applies the segments
// of an `IpfsPath` to a single in-memory document.
#[cfg(test)]
mod tests {
use super::WalkFailed;
use super::{resolve_segment, WalkSuccess};
use cid::Cid;
use ipfs::{ipld::Ipld, make_ipld, IpfsPath};
use std::convert::TryFrom;
// good_paths, good_but_unsupported, bad_paths from https://github.com/ipfs/go-path/blob/master/path_test.go
/// Builds a nested example document and returns it together with the `Cid` which is linked
/// from `nested/even/2/or`.
fn example_doc_and_a_cid() -> (Ipld, Cid) {
let cid = Cid::try_from("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").unwrap();
let doc = make_ipld!({
"nested": {
"even": [
{
"more": 5
},
{
"or": "this",
},
{
"or": cid.clone(),
},
{
"5": "or",
}
],
}
});
(doc, cid)
}
/// Paths which stay inside the document project down to the expected value.
#[test]
fn good_walks_on_ipld() {
let (example_doc, _) = example_doc_and_a_cid();
let good_examples = [
(
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/0/more",
Ipld::Integer(5),
),
(
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/1/or",
Ipld::from("this"),
),
(
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/3/5",
Ipld::from("or"),
),
];
for (path, expected) in &good_examples {
let p = IpfsPath::try_from(*path).unwrap();
// not really the document cid but it doesn't matter; it just needs to be !dag-pb
//let doc_cid = p.take_root().unwrap();
// projection
assert_eq!(
walk(example_doc.clone(), &p).map(|r| r.0),
Ok(WalkSuccess::AtDestination(expected.clone()))
);
}
}
/// A document which is only a link can be walked through with the `.` segment.
#[test]
fn walk_link_with_dot() {
let cid = Cid::try_from("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").unwrap();
let doc = make_ipld!(cid.clone());
let path = "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/./foobar";
let p = IpfsPath::try_from(path).unwrap();
assert_eq!(
walk(doc, &p).map(|r| r.0),
Ok(WalkSuccess::Link(".".into(), cid))
);
}
/// Without the `.` segment, walking a document which is only a link fails.
#[test]
fn walk_link_without_dot_is_unsupported() {
let cid = Cid::try_from("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").unwrap();
let doc = make_ipld!(cid);
let path = "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/foobar";
let p = IpfsPath::try_from(path).unwrap();
// go-ipfs would walk over the link even without a dot, this will probably come up with
// dag/get
walk(doc, &p).unwrap_err();
}
/// Walking stops at the first link and leaves the remaining segments unconsumed.
#[test]
fn good_walk_to_link() {
let (example_doc, cid) = example_doc_and_a_cid();
let path = "bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/2/or/something_on_the_next_block";
let p = IpfsPath::try_from(path).unwrap();
let (success, mut remaining) = walk(example_doc, &p).unwrap();
assert_eq!(success, WalkSuccess::Link("or".into(), cid));
assert_eq!(
remaining.next().map(|s| s.as_str()),
Some("something_on_the_next_block")
);
}
/// A list indexed with a property name, a negative index, or an out of range index must fail.
#[test]
fn walk_mismatches() {
let (example_doc, _) = example_doc_and_a_cid();
let mismatches = [
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/0/more",
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/-1",
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/1000",
];
for path in &mismatches {
let p = IpfsPath::try_from(*path).unwrap();
// let doc_cid = p.take_root().unwrap();
// using just unwrap_err as the context would be quite troublesome to write
walk(example_doc.clone(), &p).unwrap_err();
}
}
/// Applies the segments of `path` to `doc` one at a time with `resolve_segment`.
///
/// Returns the outcome together with the iterator over the segments which were not consumed;
/// stops at the first link. Fails right away if the root of the path is a dag-pb `Cid`.
fn walk(
mut doc: Ipld,
path: &'_ IpfsPath,
) -> Result<
(
WalkSuccess,
impl Iterator<Item = &'_ String> + std::fmt::Debug,
),
WalkFailed,
> {
if path.path().is_empty() {
unreachable!("empty path");
}
let current = path.root().cid().unwrap();
if current.codec() == cid::Codec::DagProtobuf {
return Err(WalkFailed::UnsupportedWalkOnDagPbIpld);
}
let mut iter = path.path().iter();
loop {
let needle = if let Some(needle) = iter.next() {
needle
} else {
// all segments consumed without meeting a link
return Ok((WalkSuccess::AtDestination(doc), iter));
};
doc = match resolve_segment(needle, doc)? {
WalkSuccess::AtDestination(ipld) => ipld,
ret @ WalkSuccess::Link(_, _) => return Ok((ret, iter)),
};
}
}
}

View File

@ -1,17 +1,13 @@
use crate::v0::refs::{walk_path, IpfsPath};
use crate::v0::support::{
with_ipfs, MaybeTimeoutExt, StreamResponse, StringError, StringSerialized,
};
use async_stream::try_stream;
use bytes::Bytes;
use cid::{Cid, Codec};
use futures::stream::TryStream;
use ipfs::unixfs::ll::walk::{self, ContinuedWalk, Walker};
use ipfs::unixfs::{ll::file::FileReadFailed, TraversalFailed};
use ipfs::Block;
use ipfs::{Ipfs, IpfsTypes};
use ipfs::{dag::ResolveError, Block, Ipfs, IpfsPath, IpfsTypes};
use serde::Deserialize;
use std::convert::TryFrom;
use std::fmt;
use std::path::Path;
use warp::{query, Filter, Rejection, Reply};
@ -46,8 +42,7 @@ pub fn add<T: IpfsTypes>(
#[derive(Debug, Deserialize)]
pub struct CatArgs {
// this could be an ipfs path
arg: String,
arg: StringSerialized<IpfsPath>,
offset: Option<u64>,
length: Option<u64>,
timeout: Option<StringSerialized<humantime::Duration>>,
@ -60,36 +55,27 @@ pub fn cat<T: IpfsTypes>(
}
async fn cat_inner<T: IpfsTypes>(ipfs: Ipfs<T>, args: CatArgs) -> Result<impl Reply, Rejection> {
let path = IpfsPath::try_from(args.arg.as_str()).map_err(StringError::from)?;
let path = args.arg.into_inner();
let range = match (args.offset, args.length) {
(Some(start), Some(len)) => Some(start..(start + len)),
(Some(_start), None) => todo!("need to abstract over the range"),
(Some(_start), None) => return Err(crate::v0::support::NotImplemented.into()),
(None, Some(len)) => Some(0..len),
(None, None) => None,
};
// FIXME: this is here until we have IpfsPath back at ipfs
// FIXME: this timeout here is ... not great; the end user could be waiting for 2*timeout
let (cid, _, _) = walk_path(&ipfs, &Default::default(), path)
.maybe_timeout(args.timeout.clone().map(StringSerialized::into_inner))
.await
.map_err(StringError::from)?
.map_err(StringError::from)?;
if cid.codec() != Codec::DagProtobuf {
return Err(StringError::from("unknown node type").into());
}
// TODO: timeout for the whole stream!
let ret = ipfs::unixfs::cat(ipfs, cid, range)
let ret = ipfs::unixfs::cat(ipfs, path, range)
.maybe_timeout(args.timeout.map(StringSerialized::into_inner))
.await
.map_err(StringError::from)?;
let stream = match ret {
Ok(stream) => stream,
Err(TraversalFailed::Resolving(err @ ResolveError::NotFound(..))) => {
// this is checked in the tests
return Err(StringError::from(err).into());
}
Err(TraversalFailed::Walking(_, FileReadFailed::UnexpectedType(ut)))
if ut.is_directory() =>
{
@ -103,8 +89,7 @@ async fn cat_inner<T: IpfsTypes>(ipfs: Ipfs<T>, args: CatArgs) -> Result<impl Re
#[derive(Deserialize)]
struct GetArgs {
// this could be an ipfs path again
arg: String,
arg: StringSerialized<IpfsPath>,
timeout: Option<StringSerialized<humantime::Duration>>,
}
@ -117,26 +102,34 @@ pub fn get<T: IpfsTypes>(
async fn get_inner<T: IpfsTypes>(ipfs: Ipfs<T>, args: GetArgs) -> Result<impl Reply, Rejection> {
use futures::stream::TryStreamExt;
let path = IpfsPath::try_from(args.arg.as_str()).map_err(StringError::from)?;
let path = args.arg.into_inner();
// FIXME: this is here until we have IpfsPath back at ipfs
// FIXME: this timeout is only for the first step, should be for the whole walk!
let (cid, _, _) = walk_path(&ipfs, &Default::default(), path)
let block = resolve_dagpb(&ipfs, path)
.maybe_timeout(args.timeout.map(StringSerialized::into_inner))
.await
.map_err(StringError::from)?
.map_err(StringError::from)?;
if cid.codec() != Codec::DagProtobuf {
return Err(StringError::from("unknown node type").into());
}
Ok(StreamResponse(walk(ipfs, block).into_stream()))
}
Ok(StreamResponse(walk(ipfs, cid).into_stream()))
async fn resolve_dagpb<T: IpfsTypes>(ipfs: &Ipfs<T>, path: IpfsPath) -> Result<Block, StringError> {
let (resolved, _) = ipfs
.dag()
.resolve(path, true)
.await
.map_err(StringError::from)?;
resolved.into_unixfs_block().map_err(StringError::from)
}
fn walk<Types: IpfsTypes>(
ipfs: Ipfs<Types>,
root: Cid,
Block {
cid: root,
data: first_block_data,
}: Block,
) -> impl TryStream<Ok = Bytes, Error = GetError> + 'static {
let mut cache = None;
let mut tar_helper = TarHelper::with_capacity(16 * 1024);
@ -146,10 +139,18 @@ fn walk<Types: IpfsTypes>(
let name = root.to_string();
let mut walker = Walker::new(root, name);
let mut buffer = Some(first_block_data);
try_stream! {
while walker.should_continue() {
let (next, _) = walker.pending_links();
let Block { data, .. } = ipfs.get_block(next).await?;
let data = match buffer.take() {
Some(first) => first,
None => {
let (next, _) = walker.pending_links();
let Block { data, .. } = ipfs.get_block(next).await?;
data
}
};
match walker.next(&data, &mut cache)? {
ContinuedWalk::Bucket(..) => {}

View File

@ -1,11 +1,155 @@
use crate::error::Error;
use crate::ipld::{decode_ipld, encode_ipld, Ipld};
use crate::path::IpfsPath;
use crate::path::{IpfsPath, SlashedPath};
use crate::repo::RepoTypes;
use crate::Ipfs;
use bitswap::Block;
use cid::{Cid, Codec, Version};
use ipfs_unixfs::{
dagpb::{wrap_node_data, NodeData},
dir::{Cache, ShardedLookup},
resolve, MaybeResolved,
};
use std::convert::TryFrom;
use std::error::Error as StdError;
use std::iter::Peekable;
use thiserror::Error;
/// The public error type for failed path resolving; the internal `RawResolveLocalError` is
/// converted into this with the help of the original `IpfsPath`.
#[derive(Debug, Error)]
pub enum ResolveError {
/// Loading of the block on the path failed
// presumably the `Cid` is the block which failed to load; note it is not part of the message
#[error("block loading failed")]
Loading(Cid, #[source] crate::Error),
/// The document is unsupported; this can be a UnixFs directory structure which has unsupported
/// options, or IPLD parsing failed.
#[error("unsupported document")]
UnsupportedDocument(Cid, #[source] Box<dyn StdError + Send + Sync + 'static>),
/// Path contained an index which was out of range for the given [`Ipld::List`].
#[error("list index out of range 0..{elements}: {index}")]
ListIndexOutOfRange {
/// The document with the mismatched index
document: Cid,
/// The path up until the mismatched index
path: SlashedPath,
/// The index in original path
index: usize,
/// Total number of elements found
elements: usize,
},
/// Path attempted to resolve through e.g. a string or an integer.
#[error("tried to resolve through an object that had no links")]
NoLinks(Cid, SlashedPath),
/// Path attempted to resolve through a property, index or link which did not exist.
// NOTE(review): formatting this variant unwraps the last segment of the path, so it panics if
// the `SlashedPath` is empty; presumably the truncated path always has at least the failed
// segment, confirm against the constructors.
#[error("no link named {:?} under {0}", .1.iter().last().unwrap())]
NotFound(Cid, SlashedPath),
}
/// Errors for a path which did resolve, but to something other than what the caller needed.
#[derive(Debug, Error)]
pub enum UnexpectedResolved {
/// The resolved node had a codec the caller did not expect.
// NOTE(review): the "or" in the message below looks like it was meant to be "on", as in the
// `NonBlock` message; confirm before changing as the text may be asserted in tests.
#[error("path resolved to unexpected type of document: {:?} or {}", .0, .1.source())]
UnexpectedCodec(cid::Codec, ResolvedNode),
/// The path resolved to something which is not a whole block.
#[error("path did not resolve to a block on {}", .0.source())]
NonBlock(ResolvedNode),
}
/// Used internally before translating to ResolveError at the top level by using the IpfsPath.
///
/// Instead of a path, the variants carry a `segment_index`: the index of the failed segment,
/// which starts out local to a single document and is later adjusted to be relative to the whole
/// path.
#[derive(Debug)]
enum RawResolveLocalError {
/// Becomes `ResolveError::Loading`.
Loading(Cid, crate::Error),
/// Becomes `ResolveError::UnsupportedDocument`.
UnsupportedDocument(Cid, Box<dyn StdError + Send + Sync + 'static>),
/// Becomes `ResolveError::ListIndexOutOfRange`.
ListIndexOutOfRange {
document: Cid,
segment_index: usize,
index: usize,
elements: usize,
},
// presumably a segment which could not be used as a list index; confirm against the resolving
// functions
InvalidIndex {
document: Cid,
segment_index: usize,
},
/// Becomes `ResolveError::NoLinks`.
NoLinks {
document: Cid,
segment_index: usize,
},
// presumably becomes `ResolveError::NotFound`; the conversion is not fully visible here
NotFound {
document: Cid,
segment_index: usize,
},
}
impl RawResolveLocalError {
/// When resolving through multiple documents the local resolving functions `resolve_local_ipld`
/// and `resolve_local_dagpb` return local document indices; need to bump the indices with the
/// number of the already matched segments in the previous documents for the path.
fn add_starting_point_in_path(&mut self, start: usize) {
use RawResolveLocalError::*;
match self {
ListIndexOutOfRange {
ref mut segment_index,
..
}
| InvalidIndex {
ref mut segment_index,
..
}
| NoLinks {
ref mut segment_index,
..
}
| NotFound {
ref mut segment_index,
..
} => {
// NOTE: this is the **index** compared to the number of segments matched, i.e. **count**
// from `resolve_local`'s Ok return value.
*segment_index += start;
}
_ => {}
}
}
/// Use the given [`IpfsPath`] to create the truncated [`SlashedPath`] and convert into
/// [`ResolveError`]. The path is truncated so that the last segment is the one which failed to
/// match. No reason it couldn't also be signified with just an index.
fn with_path(self, path: IpfsPath) -> ResolveError {
use RawResolveLocalError::*;
match self {
// FIXME: I'd like to use Result<Result<_, ResolveError>, crate::Error> instead
Loading(cid, e) => ResolveError::Loading(cid, e),
UnsupportedDocument(cid, e) => ResolveError::UnsupportedDocument(cid, e),
ListIndexOutOfRange {
document,
segment_index,
index,
elements,
} => ResolveError::ListIndexOutOfRange {
document,
path: path.into_truncated(segment_index + 1),
index,
elements,
},
NoLinks {
document,
segment_index,
} => ResolveError::NoLinks(document, path.into_truncated(segment_index + 1)),
InvalidIndex {
document,
segment_index,
}
| NotFound {
document,
segment_index,
} => ResolveError::NotFound(document, path.into_truncated(segment_index + 1)),
}
}
}
/// `ipfs.dag` interface providing wrapper around Ipfs.
#[derive(Clone, Debug)]
pub struct IpldDag<Types: RepoTypes> {
ipfs: Ipfs<Types>,
@ -30,43 +174,400 @@ impl<Types: RepoTypes> IpldDag<Types> {
Ok(cid)
}
pub async fn get(&self, path: IpfsPath) -> Result<Ipld, Error> {
/// Resolves a `Cid`-rooted path to a document "node."
///
/// Returns the resolved node as `Ipld`.
pub async fn get(&self, path: IpfsPath) -> Result<Ipld, ResolveError> {
// FIXME: do ipns resolve first
let cid = match path.root().cid() {
Some(cid) => cid,
None => return Err(anyhow::anyhow!("expected cid")),
None => panic!("Ipns resolution not implemented; expected a Cid-based path"),
};
let mut ipld = decode_ipld(&cid, self.ipfs.repo.get_block(&cid).await?.data())?;
for sub_path in path.iter() {
ipld = try_resolve(ipld, sub_path)?;
ipld = match ipld {
Ipld::Link(cid) => decode_ipld(&cid, self.ipfs.repo.get_block(&cid).await?.data())?,
ipld => ipld,
let mut iter = path.iter().peekable();
let (node, _) = match self.resolve0(cid, &mut iter, true).await {
Ok(t) => t,
Err(e) => {
drop(iter);
return Err(e.with_path(path));
}
};
Ipld::try_from(node)
}
/// Resolves a `Cid`-rooted path to a document "node."
///
/// The meaning of the returned path depends on `follow_links`:
///
/// - when following links, it is the path inside the last document
/// - when not following links, it is the unmatched or "remaining" path
///
/// Regardless of the `follow_links` option, HAMT-sharded directories will be resolved through
/// as a "single step" in the given IpfsPath.
///
/// Returns a node and the remaining path or the path inside the last document.
///
/// # Panics
///
/// Panics if the path is not rooted at a `Cid`, as Ipns resolution is not implemented.
pub async fn resolve(
    &self,
    path: IpfsPath,
    follow_links: bool,
) -> Result<(ResolvedNode, SlashedPath), ResolveError> {
    // FIXME: do ipns resolve first
    let cid = if let Some(cid) = path.root().cid() {
        cid
    } else {
        panic!("Ipns resolution not implemented; expected a Cid-based path")
    };

    let mut segments = path.iter().peekable();
    let outcome = self.resolve0(cid, &mut segments, follow_links).await;
    // the iterator borrows `path`, which is consumed in both of the branches below
    drop(segments);

    match outcome {
        // we only care about returning the remaining path with segments up until the last
        // document but it can and should contain all of the following segments (if any). there
        // could be more segments when `!follow_links`.
        Ok((node, matched_segments)) => Ok((node, path.into_shifted(matched_segments))),
        Err(e) => Err(e.with_path(path)),
    }
}
/// Return the node where the resolving ended, and the **count** of segments matched.
///
/// Starting from `cid`, loads a block, resolves as many of the `segments` inside it as
/// possible and, when ending up on a link, either returns the link (`!follow_links`) or
/// continues from the link target (`follow_links`).
///
/// The returned count is:
///
/// - the number of segments matched *before* the returned document, when the resolving ended
///   inside a document (block, projection or dag-pb `Data`)
/// - the number of all matched segments, when a link is returned because of `!follow_links`
///
/// A segment resolved through a HAMT-sharded directory counts as a single matched segment,
/// same as a segment resolved through a plain directory.
///
/// On failure the segment index of the returned error is relative to the whole path.
async fn resolve0<'a>(
    &self,
    cid: &Cid,
    segments: &mut Peekable<impl Iterator<Item = &'a str>>,
    follow_links: bool,
) -> Result<(ResolvedNode, usize), RawResolveLocalError> {
    use LocallyResolved::*;

    let mut current = cid.to_owned();
    let mut total = 0;

    // shared between all of the dag-pb lookups during this resolve
    let mut cache = None;

    loop {
        let block = match self.ipfs.repo.get_block(&current).await {
            Ok(block) => block,
            Err(e) => return Err(RawResolveLocalError::Loading(current, e)),
        };

        // number of segments matched before entering this document
        let start = total;

        let (resolution, matched) = match resolve_local(block, segments, &mut cache) {
            Ok(t) => t,
            Err(mut e) => {
                // the error only knows the index inside this document
                e.add_starting_point_in_path(start);
                return Err(e);
            }
        };

        total += matched;

        let (src, dest) = match resolution {
            Complete(ResolvedNode::Link(src, dest)) => (src, dest),
            Incomplete(src, lookup) => match self.resolve_hamt(lookup, &mut cache).await {
                Ok(dest) => {
                    // the local resolving reports zero matched segments for an unfinished
                    // sharded lookup even though it consumed the segment; count the segment
                    // now that the lookup has found the link, otherwise the count would be
                    // one short compared to resolving through a plain directory.
                    total += 1;
                    (src, dest)
                }
                Err(e) => return Err(RawResolveLocalError::UnsupportedDocument(src, e.into())),
            },
            Complete(other) => {
                // when following links we return the total of links matched before the
                // returned document.
                return Ok((other, start));
            }
        };

        if !follow_links {
            // when not following links we return the total of links matched
            return Ok((ResolvedNode::Link(src, dest), total));
        }

        current = dest;
    }
}
/// To resolve a segment through a HAMT-sharded directory we need to load more blocks, which is
/// why this is a method and not a free `fn` like the other resolving activities.
async fn resolve_hamt(
&self,
mut lookup: ShardedLookup<'_>,
cache: &mut Option<Cache>,
) -> Result<Cid, Error> {
use MaybeResolved::*;
loop {
let (next, _) = lookup.pending_links();
let block = self.ipfs.repo.get_block(next).await?;
match lookup.continue_walk(block.data(), cache)? {
NeedToLoadMore(next) => lookup = next,
Found(cid) => return Ok(cid),
NotFound => return Err(anyhow::anyhow!("key not found: ???")),
}
}
Ok(ipld)
}
}
fn try_resolve(ipld: Ipld, segment: &str) -> Result<Ipld, Error> {
match ipld {
Ipld::Map(mut map) => map
.remove(segment)
.ok_or_else(|| anyhow::anyhow!("no such segment: {:?}", segment)),
Ipld::List(mut vec) => match segment.parse::<usize>() {
Ok(index) if index < vec.len() => Ok(vec.swap_remove(index)),
Ok(_) => Err(anyhow::anyhow!(
"index out of range 0..{}: {:?}",
vec.len(),
segment
)),
Err(_) => Err(anyhow::anyhow!("invalid list index: {:?}", segment)),
},
link @ Ipld::Link(_) if segment == "." => Ok(link),
other => Err(anyhow::anyhow!(
"cannot resolve {:?} through: {:?}",
/// The outcome of resolving a `Cid`-based `IpfsPath`: either a whole block, or a projection of
/// the document in which the path ended.
///
/// The first `Cid` of the non-block variants is the source document, see
/// `ResolvedNode::source`. Values can be converted to Ipld using `Ipld::try_from`.
#[derive(Debug, PartialEq)]
pub enum ResolvedNode {
/// Block which was loaded at the end of the path.
Block(Block),
/// Path ended in `Data` at a dag-pb node. This is usually not interesting and should be
/// treated as a "Not found" error since dag-pb node did not have a *link* called `Data`. The variant
/// exists as there are interface-ipfs-http tests which require this behaviour.
DagPbData(Cid, NodeData<Box<[u8]>>),
/// Path ended on a !dag-pb document which was projected.
Projection(Cid, Ipld),
/// Local resolving ended with a link; the fields are the source document and the link target.
Link(Cid, Cid),
}
impl ResolvedNode {
    /// Returns the `Cid` of the **source** document, that is the document which either is the
    /// block or which the data, projection or link was found in.
    pub fn source(&self) -> &Cid {
        match self {
            ResolvedNode::Block(Block { cid, .. }) => cid,
            ResolvedNode::DagPbData(cid, ..) => cid,
            ResolvedNode::Projection(cid, ..) => cid,
            ResolvedNode::Link(cid, ..) => cid,
        }
    }

    /// Unwraps the dagpb block variant and turns others into UnexpectedResolved.
    /// This is useful wherever unixfs operations are continued after resolving an IpfsPath.
    ///
    /// The codec of the source document is checked first, so a non-block node from a non-dag-pb
    /// document is reported as `UnexpectedCodec` rather than `NonBlock`.
    pub fn into_unixfs_block(self) -> Result<Block, UnexpectedResolved> {
        if self.source().codec() != cid::Codec::DagProtobuf {
            return Err(UnexpectedResolved::UnexpectedCodec(
                cid::Codec::DagProtobuf,
                self,
            ));
        }

        match self {
            ResolvedNode::Block(block) => Ok(block),
            other => Err(UnexpectedResolved::NonBlock(other)),
        }
    }
}
impl TryFrom<ResolvedNode> for Ipld {
type Error = ResolveError;
fn try_from(r: ResolvedNode) -> Result<Ipld, Self::Error> {
use ResolvedNode::*;
match r {
Block(block) => Ok(decode_ipld(block.cid(), block.data())
.map_err(move |e| ResolveError::UnsupportedDocument(block.cid, e.into()))?),
DagPbData(_, node_data) => Ok(Ipld::Bytes(node_data.node_data().to_vec())),
Projection(_, ipld) => Ok(ipld),
Link(_, cid) => Ok(Ipld::Link(cid)),
}
}
}
/// Success variants for the `resolve_local` operation on an `Ipld` document.
///
/// The lifetime is the lifetime of the path segment being looked up, which an unfinished
/// `ShardedLookup` holds on to.
#[derive(Debug)]
enum LocallyResolved<'a> {
/// Resolution completed.
Complete(ResolvedNode),
/// Resolving was attempted on a block which is a HAMT-sharded bucket, and needs to be
/// continued by loading other buckets. The `Cid` is the document where the lookup started.
Incomplete(Cid, ShardedLookup<'a>),
}
#[cfg(test)]
impl LocallyResolved<'_> {
/// Test helper: returns the node of a `Complete` resolution, panics on `Incomplete`.
fn unwrap_complete(self) -> ResolvedNode {
match self {
LocallyResolved::Complete(rn) => rn,
x => unreachable!("{:?}", x),
}
}
}
/// Any `ResolvedNode` is a completed local resolution; as no `ShardedLookup` is involved the
/// result does not borrow anything.
impl From<ResolvedNode> for LocallyResolved<'static> {
fn from(r: ResolvedNode) -> LocallyResolved<'static> {
LocallyResolved::Complete(r)
}
}
/// Resolves the given path segments locally or inside the given document; in addition to
/// `resolve_local_ipld` this fn also handles normal dag-pb and unixfs HAMTs.
fn resolve_local<'a>(
block: Block,
segments: &mut Peekable<impl Iterator<Item = &'a str>>,
cache: &mut Option<Cache>,
) -> Result<(LocallyResolved<'a>, usize), RawResolveLocalError> {
if segments.peek().is_none() {
return Ok((LocallyResolved::Complete(ResolvedNode::Block(block)), 0));
}
let Block { cid, data } = block;
if cid.codec() == cid::Codec::DagProtobuf {
// special-case the dagpb since we need to do the HAMT lookup and going through the
// BTreeMaps of ipld for this is quite tiresome. if you are looking for that code for
// simple directories, you can find one in the history of ipfs-http.
// advancing is required here in order for us to determine if this was the last element.
// This should be ok as the only way we can continue resolving deeper is the case of Link
// being matched, and not the error or the DagPbData case.
let segment = segments.next().unwrap();
Ok(resolve_local_dagpb(
cid,
data,
segment,
other
)),
segments.peek().is_none(),
cache,
)?)
} else {
let ipld = match decode_ipld(&cid, &data) {
Ok(ipld) => ipld,
Err(e) => return Err(RawResolveLocalError::UnsupportedDocument(cid, e.into())),
};
resolve_local_ipld(cid, ipld, segments)
}
}
/// Resolves a single path `segment` inside the dag-pb document `cid` with the bytes `data`.
///
/// This is basically just mapping from [`MaybeResolved`] to the return value, with the exception
/// that a path ending in "Data" (`is_last`) which does not match any link is returned as
/// `ResolvedNode::DagPbData`.
///
/// Returns the resolution and the number of segments matched: one for a found link or the
/// `Data`, zero for a sharded lookup which needs more blocks to be loaded. The segment index of
/// the errors is always zero as only a single segment is processed.
fn resolve_local_dagpb<'a>(
    cid: Cid,
    data: Box<[u8]>,
    segment: &'a str,
    is_last: bool,
    cache: &mut Option<Cache>,
) -> Result<(LocallyResolved<'a>, usize), RawResolveLocalError> {
    // first sort out the failures of the unixfs level resolving
    let outcome = match resolve(&data, segment, cache) {
        Ok(outcome) => outcome,
        Err(ipfs_unixfs::ResolveError::UnexpectedType(ut)) if ut.is_file() => {
            // this might even be correct: files we know are not supported, however not sure if
            // symlinks are, let alone custom unxifs types should such exist
            return Err(RawResolveLocalError::NotFound {
                document: cid,
                segment_index: 0,
            });
        }
        Err(e) => return Err(RawResolveLocalError::UnsupportedDocument(cid, e.into())),
    };

    match outcome {
        MaybeResolved::NeedToLoadMore(lookup) => {
            Ok((LocallyResolved::Incomplete(cid, lookup), 0))
        }
        MaybeResolved::Found(dest) => {
            let link = ResolvedNode::Link(cid, dest);
            Ok((LocallyResolved::Complete(link), 1))
        }
        MaybeResolved::NotFound if segment == "Data" && is_last => {
            let wrapped = wrap_node_data(data).expect("already deserialized once");
            let node = ResolvedNode::DagPbData(cid, wrapped);
            Ok((LocallyResolved::Complete(node), 1))
        }
        MaybeResolved::NotFound => Err(RawResolveLocalError::NotFound {
            document: cid,
            segment_index: 0,
        }),
    }
}
/// Resolves the given path segments locally or inside the given document. Resolving is terminated
/// upon reaching a link or exhausting the path.
///
/// Maps are resolved by key and lists by index. A single `"."` segment following a link is
/// consumed and counted as matched; any other segment following a link is left in the iterator.
///
/// Returns the resolution and the number of path segments matched. On failure the segment which
/// failed to match has already been consumed from the iterator, and the returned segment index
/// is relative to the segments given to this function.
///
/// # Limitations
///
/// Does not support dag-pb as segments are resolved differently on dag-pb than the general Ipld.
fn resolve_local_ipld<'a>(
document: Cid,
mut ipld: Ipld,
segments: &mut Peekable<impl Iterator<Item = &'a str>>,
) -> Result<(LocallyResolved<'a>, usize), RawResolveLocalError> {
let mut matched_count = 0;
loop {
// links need to be checked before advancing the iterator, as the segment following a link
// must be left unconsumed unless it is a dot.
ipld = match ipld {
Ipld::Link(cid) => {
if segments.peek() != Some(&".") {
// the next segment (if any) is something other than a dot: stop at the link
// without consuming the segment, which is to be matched in the link target.
return Ok((ResolvedNode::Link(document, cid).into(), matched_count));
} else {
Ipld::Link(cid)
}
}
ipld => ipld,
};
ipld = match (ipld, segments.next()) {
(Ipld::Link(cid), Some(".")) => {
// the dot is counted as a matched segment
return Ok((ResolvedNode::Link(document, cid).into(), matched_count + 1));
}
(Ipld::Link(_), Some(_)) => {
unreachable!("case already handled above before advancing the iterator")
}
(Ipld::Map(mut map), Some(segment)) => {
let found = match map.remove(segment) {
Some(f) => f,
None => {
return Err(RawResolveLocalError::NotFound {
document,
segment_index: matched_count,
})
}
};
matched_count += 1;
found
}
(Ipld::List(mut vec), Some(segment)) => match segment.parse::<usize>() {
Ok(index) if index < vec.len() => {
matched_count += 1;
// the rest of the list is discarded so the reordering by swap_remove is harmless
vec.swap_remove(index)
}
Ok(index) => {
return Err(RawResolveLocalError::ListIndexOutOfRange {
document,
segment_index: matched_count,
index,
elements: vec.len(),
});
}
Err(_) => {
return Err(RawResolveLocalError::InvalidIndex {
document,
segment_index: matched_count,
})
}
},
// there are segments left but the value is not a map, list or a link
(_, Some(_)) => {
return Err(RawResolveLocalError::NoLinks {
document,
segment_index: matched_count,
});
}
// path has been consumed
(anything, None) => {
return Ok((
ResolvedNode::Projection(document, anything).into(),
matched_count,
))
}
};
}
}
@ -140,4 +641,335 @@ mod tests {
.unwrap();
assert_eq!(res, make_ipld!(1));
}
/// Returns an example ipld document with strings, ints, maps, lists, and a link. The link target is also
/// returned.
fn example_doc_and_cid() -> (Cid, Ipld, Cid) {
let cid = Cid::try_from("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").unwrap();
let doc = make_ipld!({
"nested": {
"even": [
{
"more": 5
},
{
"or": "this",
},
{
"or": cid.clone(),
},
{
"5": "or",
}
],
}
});
let root =
Cid::try_from("bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita").unwrap();
(root, doc, cid)
}
#[test]
fn resolve_cbor_locally_to_end() {
let (root, example_doc, _) = example_doc_and_cid();
let good_examples = [
(
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/0/more",
Ipld::Integer(5),
),
(
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/1/or",
Ipld::from("this"),
),
(
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/3/5",
Ipld::from("or"),
),
];
for (path, expected) in &good_examples {
let p = IpfsPath::try_from(*path).unwrap();
let (resolved, matched_segments) = super::resolve_local_ipld(
root.clone(),
example_doc.clone(),
&mut p.iter().peekable(),
)
.unwrap();
assert_eq!(matched_segments, 4);
match resolved.unwrap_complete() {
ResolvedNode::Projection(_, p) if &p == expected => {}
x => unreachable!("unexpected {:?}", x),
}
let remaining_path = p.iter().skip(matched_segments).collect::<Vec<&str>>();
assert!(remaining_path.is_empty(), "{:?}", remaining_path);
}
}
#[test]
fn resolve_cbor_locally_to_link() {
let (root, example_doc, target) = example_doc_and_cid();
let p = IpfsPath::try_from(
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/2/or/foobar/trailer"
// counts: 1 2 3 4
).unwrap();
let (resolved, matched_segments) =
super::resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap();
match resolved.unwrap_complete() {
ResolvedNode::Link(_, cid) if cid == target => {}
x => unreachable!("{:?}", x),
}
assert_eq!(matched_segments, 4);
let remaining_path = p.iter().skip(matched_segments).collect::<Vec<&str>>();
assert_eq!(remaining_path, &["foobar", "trailer"]);
}
#[test]
fn resolve_cbor_locally_to_link_with_dot() {
let (root, example_doc, cid) = example_doc_and_cid();
let p = IpfsPath::try_from(
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/2/or/./foobar/trailer",
// counts: 1 2 3 4 5
)
.unwrap();
let (resolved, matched_segments) =
super::resolve_local_ipld(root.clone(), example_doc, &mut p.iter().peekable()).unwrap();
assert_eq!(resolved.unwrap_complete(), ResolvedNode::Link(root, cid));
assert_eq!(matched_segments, 5);
let remaining_path = p.iter().skip(matched_segments).collect::<Vec<&str>>();
assert_eq!(remaining_path, &["foobar", "trailer"]);
}
#[test]
fn resolve_cbor_locally_not_found_map_key() {
let (root, example_doc, _) = example_doc_and_cid();
let p = IpfsPath::try_from(
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/foobar/trailer",
)
.unwrap();
let e = super::resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap_err();
assert!(
matches!(
e,
RawResolveLocalError::NotFound {
segment_index: 0,
..
}
),
"{:?}",
e
);
}
#[test]
fn resolve_cbor_locally_too_large_list_index() {
let (root, example_doc, _) = example_doc_and_cid();
let p = IpfsPath::try_from(
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/3000",
)
.unwrap();
let e = super::resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap_err();
assert!(
matches!(
e,
RawResolveLocalError::ListIndexOutOfRange {
segment_index: 2,
index: 3000,
elements: 4,
..
}
),
"{:?}",
e
);
}
/// A list index which cannot be parsed as `usize` fails with `InvalidIndex`.
#[test]
fn resolve_cbor_locally_non_usize_index() {
let (root, example_doc, _) = example_doc_and_cid();
let p = IpfsPath::try_from(
"bafyreielwgy762ox5ndmhx6kpi6go6il3gzahz3ngagb7xw3bj3aazeita/nested/even/-1",
)
.unwrap();
// "nested" and "even" are matched, the failing "-1" is the segment at index 2
let e = super::resolve_local_ipld(root, example_doc, &mut p.iter().peekable()).unwrap_err();
assert!(
matches!(
e,
RawResolveLocalError::InvalidIndex {
segment_index: 2,
..
}
),
"{:?}",
e
);
}
#[tokio::test(max_threads = 1)]
async fn resolve_through_link() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
let ipld = make_ipld!([1]);
let cid1 = dag.put(ipld, Codec::DagCBOR).await.unwrap();
let ipld = make_ipld!([cid1]);
let cid2 = dag.put(ipld, Codec::DagCBOR).await.unwrap();
let prefix = IpfsPath::from(cid2);
// the two should be equal, as dot can appear or not appear
// FIXME: validate that go-ipfs still does this
let equiv_paths = vec![
prefix.sub_path("0/0").unwrap(),
prefix.into_sub_path("0/./0").unwrap(),
];
for p in equiv_paths {
let cloned = p.clone();
match dag.resolve(p, true).await.unwrap() {
(ResolvedNode::Projection(_, Ipld::Integer(1)), remaining_path) => {
assert_eq!(remaining_path, ["0"][..], "{}", cloned);
}
x => unreachable!("{:?}", x),
}
}
}
/// Failing to match the first segment is reported against the root document.
#[tokio::test(max_threads = 1)]
async fn fail_resolving_first_segment() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
let ipld = make_ipld!([1]);
let cid1 = dag.put(ipld, Codec::DagCBOR).await.unwrap();
let ipld = make_ipld!({ "0": cid1 });
let cid2 = dag.put(ipld, Codec::DagCBOR).await.unwrap();
// the root document only has the key "0"
let path = IpfsPath::from(cid2.clone()).sub_path("1/a").unwrap();
let e = dag.resolve(path, true).await.unwrap_err();
assert_eq!(e.to_string(), format!("no link named \"1\" under {}", cid2));
}
/// Failing to match a segment after following a link is reported against the linked document.
#[tokio::test(max_threads = 1)]
async fn fail_resolving_last_segment() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let dag = IpldDag::new(ipfs);
let ipld = make_ipld!([1]);
let cid1 = dag.put(ipld, Codec::DagCBOR).await.unwrap();
let ipld = make_ipld!([cid1.clone()]);
let cid2 = dag.put(ipld, Codec::DagCBOR).await.unwrap();
// "0" resolves through the link to the first document, where "a" is not a list index
let path = IpfsPath::from(cid2).sub_path("0/a").unwrap();
let e = dag.resolve(path, true).await.unwrap_err();
assert_eq!(e.to_string(), format!("no link named \"a\" under {}", cid1));
}
#[tokio::test(max_threads = 1)]
async fn fail_resolving_through_file() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let mut adder = ipfs_unixfs::file::adder::FileAdder::default();
let (mut blocks, _) = adder.push(b"foobar\n");
assert_eq!(blocks.next(), None);
let mut blocks = adder.finish();
let (cid, data) = blocks.next().unwrap();
assert_eq!(blocks.next(), None);
ipfs.put_block(Block {
cid: cid.clone(),
data: data.into(),
})
.await
.unwrap();
let path = IpfsPath::from(cid.clone())
.sub_path("anything-here")
.unwrap();
let e = ipfs.dag().resolve(path, true).await.unwrap_err();
assert_eq!(
e.to_string(),
format!("no link named \"anything-here\" under {}", cid)
);
}
#[tokio::test(max_threads = 1)]
async fn fail_resolving_through_dir() {
let Node { ipfs, bg_task: _bt } = Node::new("test_node").await;
let mut adder = ipfs_unixfs::file::adder::FileAdder::default();
let (mut blocks, _) = adder.push(b"foobar\n");
assert_eq!(blocks.next(), None);
let mut blocks = adder.finish();
let (cid, data) = blocks.next().unwrap();
assert_eq!(blocks.next(), None);
let total_size = data.len();
ipfs.put_block(Block {
cid: cid.clone(),
data: data.into(),
})
.await
.unwrap();
let mut opts = ipfs_unixfs::dir::builder::TreeOptions::default();
opts.wrap_with_directory();
let mut tree = ipfs_unixfs::dir::builder::BufferingTreeBuilder::new(opts);
tree.put_link("something/best-file-in-the-world", cid, total_size as u64)
.unwrap();
let mut iter = tree.build();
let mut cids = Vec::new();
while let Some(node) = iter.next_borrowed() {
let node = node.unwrap();
let block = Block {
cid: node.cid.to_owned(),
data: node.block.into(),
};
ipfs.put_block(block).await.unwrap();
cids.push(node.cid.to_owned());
}
// reverse the cids because they now contain the root cid as the last.
cids.reverse();
let path = IpfsPath::from(cids[0].to_owned())
.sub_path("something/second-best-file")
.unwrap();
let e = ipfs.dag().resolve(path, true).await.unwrap_err();
assert_eq!(
e.to_string(),
format!("no link named \"second-best-file\" under {}", cids[1])
);
}
}

View File

@ -37,7 +37,7 @@ use std::sync::{atomic::Ordering, Arc};
use std::task::{Context, Poll};
mod config;
mod dag;
pub mod dag;
pub mod error;
#[macro_use]
pub mod ipld;
@ -360,7 +360,7 @@ impl<Types: IpfsTypes> std::ops::Deref for Ipfs<Types> {
}
impl<Types: IpfsTypes> Ipfs<Types> {
fn dag(&self) -> IpldDag<Types> {
pub fn dag(&self) -> IpldDag<Types> {
IpldDag::new(self.clone())
}
@ -419,7 +419,11 @@ impl<Types: IpfsTypes> Ipfs<Types> {
/// Gets an ipld dag node from the ipfs repo.
pub async fn get_dag(&self, path: IpfsPath) -> Result<Ipld, Error> {
self.dag().get(path).instrument(self.span.clone()).await
self.dag()
.get(path)
.instrument(self.span.clone())
.await
.map_err(Error::new)
}
/// Creates a stream which will yield the bytes of an UnixFS file from the root Cid, with the
@ -429,13 +433,15 @@ impl<Types: IpfsTypes> Ipfs<Types> {
/// To create an owned version of the stream, please use `ipfs::unixfs::cat` directly.
pub async fn cat_unixfs(
&self,
cid: Cid,
starting_point: impl Into<unixfs::StartingPoint>,
range: Option<Range<u64>>,
) -> Result<
impl Stream<Item = Result<Vec<u8>, unixfs::TraversalFailed>> + Send + '_,
unixfs::TraversalFailed,
> {
unixfs::cat(self, cid, range)
// convert early not to worry about the lifetime of parameter
let starting_point = starting_point.into();
unixfs::cat(self, starting_point, range)
.instrument(self.span.clone())
.await
}

View File

@ -7,10 +7,13 @@ use std::fmt;
use std::str::FromStr;
use thiserror::Error;
// TODO: it might be useful to split this into CidPath and IpnsPath, then have Ipns resolve through
// latter into CidPath (recursively) and have dag.rs support only CidPath. Keep IpfsPath as a
// common abstraction which can be either.
#[derive(Clone, Debug, PartialEq)]
pub struct IpfsPath {
root: PathRoot,
path: Vec<String>,
pub(crate) path: SlashedPath,
}
impl FromStr for IpfsPath {
@ -42,7 +45,8 @@ impl FromStr for IpfsPath {
};
let mut path = IpfsPath::new(root);
path.push_split(subpath)
path.path
.push_split(subpath)
.map_err(|_| IpfsPathError::InvalidPath(string.to_owned()))?;
Ok(path)
}
@ -52,7 +56,7 @@ impl IpfsPath {
pub fn new(root: PathRoot) -> Self {
IpfsPath {
root,
path: Vec::new(),
path: Default::default(),
}
}
@ -65,28 +69,7 @@ impl IpfsPath {
}
pub fn push_str(&mut self, string: &str) -> Result<(), Error> {
if string.is_empty() {
return Ok(());
}
self.push_split(string.split('/'))
.map_err(|_| IpfsPathError::InvalidPath(string.to_owned()).into())
}
fn push_split<'a>(&mut self, split: impl Iterator<Item = &'a str>) -> Result<(), ()> {
let mut split = split.peekable();
while let Some(sub_path) = split.next() {
if sub_path == "" {
return if split.peek().is_none() {
// trim trailing
Ok(())
} else {
// no empty segments in the middle
Err(())
};
}
self.path.push(sub_path.to_owned());
}
self.path.push_path(string)?;
Ok(())
}
@ -101,20 +84,35 @@ impl IpfsPath {
Ok(self)
}
pub fn iter(&self) -> impl Iterator<Item = &String> {
self.path.iter()
/// Returns an iterator over the path segments following the root
pub fn iter(&self) -> impl Iterator<Item = &str> {
self.path.iter().map(|s| s.as_str())
}
pub fn path(&self) -> &[String] {
&self.path
pub(crate) fn into_shifted(self, shifted: usize) -> SlashedPath {
assert!(shifted <= self.path.len());
let mut p = self.path;
p.shift(shifted);
p
}
pub(crate) fn into_truncated(self, len: usize) -> SlashedPath {
assert!(len <= self.path.len());
let mut p = self.path;
p.truncate(len);
p
}
}
impl fmt::Display for IpfsPath {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", self.root)?;
for sub_path in &self.path {
write!(fmt, "/{}", sub_path)?;
if !self.path.is_empty() {
// slash is not included in the <SlashedPath as fmt::Display>::fmt impl as we need to,
// serialize it later in json *without* one
write!(fmt, "/{}", self.path)?;
}
Ok(())
}
@ -134,6 +132,7 @@ impl<T: Into<PathRoot>> From<T> for IpfsPath {
}
}
// FIXME: get rid of this; it would mean that there must be a clone to retain the rest of the path.
impl TryInto<Cid> for IpfsPath {
type Error = Error;
@ -145,6 +144,7 @@ impl TryInto<Cid> for IpfsPath {
}
}
// FIXME: get rid of this; it would mean that there must be a clone to retain the rest of the path.
impl TryInto<PeerId> for IpfsPath {
type Error = Error;
@ -156,6 +156,91 @@ impl TryInto<PeerId> for IpfsPath {
}
}
/// SlashedPath is internal to IpfsPath variants, and basically holds a unixfs-compatible path
/// where segments do not contain slashes but can pretty much contain all other valid UTF-8.
///
/// The UTF-8 requirement likely originates from the UnixFS related protobuf descriptions, where
/// dag-pb links have UTF-8 names, which correspond to SlashedPath segments.
///
/// The `Display` implementation joins the segments with slashes, without a leading or a
/// trailing slash.
#[derive(Debug, PartialEq, Eq, Clone, Default)]
pub struct SlashedPath {
// the segments in order; none of them is empty or contains a slash when pushed through
// `push_split` with a `/`-split input
path: Vec<String>,
}
impl SlashedPath {
    /// Appends the segments of the slash separated `path`. An empty `path` is a no-op.
    ///
    /// Fails with `IpfsPathError::InvalidPath` on empty segments in the middle of the path.
    fn push_path(&mut self, path: &str) -> Result<(), IpfsPathError> {
        if path.is_empty() {
            return Ok(());
        }

        match self.push_split(path.split('/')) {
            Ok(()) => Ok(()),
            Err(()) => Err(IpfsPathError::InvalidPath(path.to_owned())),
        }
    }

    /// Appends the already split segments. A single trailing empty segment is ignored, while an
    /// empty segment followed by anything is an error.
    ///
    /// Note that the segments before an invalid empty segment will have been appended when the
    /// error is returned.
    pub(crate) fn push_split<'a>(
        &mut self,
        split: impl Iterator<Item = &'a str>,
    ) -> Result<(), ()> {
        let mut segments = split.peekable();

        while let Some(segment) = segments.next() {
            if segment.is_empty() {
                return match segments.peek() {
                    // trim trailing
                    None => Ok(()),
                    // no empty segments in the middle
                    Some(_) => Err(()),
                };
            }

            self.path.push(segment.to_owned());
        }

        Ok(())
    }

    /// Returns an iterator over the segments.
    pub fn iter(&self) -> impl Iterator<Item = &String> {
        self.path.iter()
    }

    /// Returns the number of segments.
    pub fn len(&self) -> usize {
        // intentionally try to hide the fact that this is based on Vec<String> right now
        self.path.len()
    }

    /// Returns `true` when there are no segments.
    pub fn is_empty(&self) -> bool {
        self.path.is_empty()
    }

    /// Removes the first `n` segments; panics if there are fewer than `n` segments.
    fn shift(&mut self, n: usize) {
        self.path.drain(..n);
    }

    /// Keeps only the first `len` segments; does nothing if there are fewer segments.
    fn truncate(&mut self, len: usize) {
        self.path.truncate(len);
    }
}
impl fmt::Display for SlashedPath {
    /// Writes the segments separated by slashes, without a leading or a trailing slash. An
    /// empty path writes nothing.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        for (position, segment) in self.path.iter().enumerate() {
            if position > 0 {
                write!(fmt, "/")?;
            }
            write!(fmt, "{}", segment)?;
        }
        Ok(())
    }
}
impl<'a> PartialEq<[&'a str]> for SlashedPath {
fn eq(&self, other: &[&'a str]) -> bool {
// FIXME: failed at writing a blanket partialeq over anything which would PartialEq<str> or
// String
self.path.iter().zip(other.iter()).all(|(a, b)| a == b)
}
}
#[derive(Clone, PartialEq)]
pub enum PathRoot {
Ipld(Cid),
@ -324,6 +409,39 @@ mod tests {
assert_eq!(path.to_string(), res);
}*/
#[test]
fn display() {
let input = [
(
"/ipld/QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n",
Some("/ipfs/QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n"),
),
("/ipfs/QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n", None),
(
"/ipfs/QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n/a",
None,
),
(
"/ipfs/QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n/a/",
Some("/ipfs/QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n/a"),
),
(
"QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n",
Some("/ipfs/QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n"),
),
("/ipns/foobar.com", None),
("/ipns/foobar.com/a", None),
("/ipns/foobar.com/a/", Some("/ipns/foobar.com/a")),
];
for (input, maybe_actual) in &input {
assert_eq!(
IpfsPath::try_from(*input).unwrap().to_string(),
maybe_actual.unwrap_or(input)
);
}
}
#[test]
fn good_paths() {
let good = [
@ -353,7 +471,7 @@ mod tests {
for &(good, len) in &good {
let p = IpfsPath::try_from(good).unwrap();
assert_eq!(p.path().len(), len);
assert_eq!(p.iter().count(), len);
}
}
@ -385,7 +503,7 @@ mod tests {
];
for &path in &paths {
let p = IpfsPath::try_from(path).unwrap();
assert_eq!(p.path().len(), 0, "{:?} from {:?}", p, path);
assert_eq!(p.iter().count(), 0, "{:?} from {:?}", p, path);
}
}
@ -394,4 +512,13 @@ mod tests {
// this used to be the behaviour in ipfs-http
IpfsPath::try_from("/ipfs/QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n///a").unwrap_err();
}
#[test]
fn shifting() {
let mut p = super::SlashedPath::default();
p.push_split(vec!["a", "b", "c"].into_iter()).unwrap();
p.shift(2);
assert_eq!(p.to_string(), "c");
}
}

View File

@ -1,11 +1,14 @@
use crate::{Error, Ipfs, IpfsTypes};
use crate::{
dag::{ResolveError, UnexpectedResolved},
Error, Ipfs, IpfsTypes,
};
use async_stream::stream;
use bitswap::Block;
use cid::Cid;
use core::borrow::Borrow;
use core::fmt;
use core::ops::Range;
use futures::stream::Stream;
use ipfs_unixfs::file::{visit::IdleFileVisit, FileReadFailed};
use std::borrow::Borrow;
use std::ops::Range;
/// IPFS cat operation, producing a stream of file bytes. This is generic over the different kinds
/// of ways to own an `Ipfs` value in order to support both operating with borrowed `Ipfs` value
@ -15,15 +18,13 @@ use ipfs_unixfs::file::{visit::IdleFileVisit, FileReadFailed};
/// Returns a stream of bytes on the file pointed with the Cid.
pub async fn cat<'a, Types, MaybeOwned>(
ipfs: MaybeOwned,
cid: Cid,
starting_point: impl Into<StartingPoint>,
range: Option<Range<u64>>,
) -> Result<impl Stream<Item = Result<Vec<u8>, TraversalFailed>> + Send + 'a, TraversalFailed>
where
Types: IpfsTypes,
MaybeOwned: Borrow<Ipfs<Types>> + Send + 'a,
{
use bitswap::Block;
let mut visit = IdleFileVisit::default();
if let Some(range) = range {
visit = visit.with_target_range(range);
@ -31,12 +32,19 @@ where
// Get the root block to start the traversal. The stream does not expose any of the file
// metadata. To get to it the user needs to create a Visitor over the first block.
let borrow = ipfs.borrow();
let Block { cid, data } = match borrow.get_block(&cid).await {
Ok(block) => block,
Err(e) => {
return Err(TraversalFailed::Loading(cid, e));
let Block { cid, data } = match starting_point.into() {
StartingPoint::Left(path) => {
let borrow = ipfs.borrow();
let dag = borrow.dag();
let (resolved, _) = dag
.resolve(path, true)
.await
.map_err(TraversalFailed::Resolving)?;
resolved
.into_unixfs_block()
.map_err(TraversalFailed::Path)?
}
StartingPoint::Right(block) => block,
};
let mut cache = None;
@ -111,35 +119,42 @@ where
})
}
/// The starting point for unixfs walks. Can be converted from IpfsPath and Blocks, and Cids can be
/// converted to IpfsPath.
///
/// `cat` accepts any `impl Into<StartingPoint>` and matches on the two variants to obtain the
/// root block of the walk.
pub enum StartingPoint {
/// A path which `cat` first resolves through `Ipfs::dag()`; the resolved value must be
/// convertible into a unixfs block or the walk fails before it starts.
Left(crate::IpfsPath),
/// An already loaded block, which `cat` uses as the root block without any resolving.
Right(Block),
}
impl<T: Into<crate::IpfsPath>> From<T> for StartingPoint {
fn from(a: T) -> Self {
Self::Left(a.into())
}
}
impl From<Block> for StartingPoint {
fn from(b: Block) -> Self {
Self::Right(b)
}
}
/// Types of failures which can occur while walking the UnixFS graph.
#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub enum TraversalFailed {
/// Failure to load the block
Loading(Cid, Error),
/// Failure to resolve the given path; does not happen when given a block.
#[error("path resolving failed")]
Resolving(#[source] ResolveError),
/// The given path was resolved to non dag-pb block, does not happen when starting the walk
/// from a block.
#[error("path resolved to unexpected")]
Path(#[source] UnexpectedResolved),
/// Loading of a block during walk failed
#[error("loading of {} failed", .0)]
Loading(Cid, #[source] Error),
/// Processing of the block failed
Walking(Cid, FileReadFailed),
}
impl fmt::Display for TraversalFailed {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use TraversalFailed::*;
match self {
Loading(cid, e) => write!(fmt, "loading of {} failed: {}", cid, e),
Walking(cid, e) => write!(fmt, "failed to walk {}: {}", cid, e),
}
}
}
impl std::error::Error for TraversalFailed {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use TraversalFailed::*;
match self {
Loading(_, _) => {
// FIXME: anyhow::Error cannot be given out as source.
None
}
Walking(_, e) => Some(e),
}
}
#[error("walk failed on {}", .0)]
Walking(Cid, #[source] FileReadFailed),
}

View File

@ -1,7 +1,7 @@
pub use ipfs_unixfs as ll;
mod cat;
pub use cat::{cat, TraversalFailed};
pub use cat::{cat, StartingPoint, TraversalFailed};
#[cfg(test)]
mod tests {

View File

@ -3,6 +3,8 @@
use crate::pb::PBNode;
use alloc::borrow::Cow;
use core::convert::TryFrom;
use core::fmt;
use core::ops::Range;
/// Extracts the PBNode::Data field from the block as it appears on the block.
pub fn node_data(block: &[u8]) -> Result<Option<&[u8]>, quick_protobuf::Error> {
@ -13,3 +15,125 @@ pub fn node_data(block: &[u8]) -> Result<Option<&[u8]>, quick_protobuf::Error> {
None => None,
})
}
/// Wraps the given block representation without consuming or copying its bytes, while giving
/// access to the dag-pb node Data field stored inside of it.
///
/// Only the position of the Data field inside the block is recorded; when the node has no Data
/// field no position is recorded. Fails if the block cannot be parsed as a dag-pb node.
pub fn wrap_node_data<T>(block: T) -> Result<NodeData<T>, quick_protobuf::Error>
where
    T: AsRef<[u8]>,
{
    let range = {
        // the borrow of `block` is confined to this scope so that `block` can be moved below
        let bytes = block.as_ref();
        let data = node_data(bytes)?;
        data.map(|data| subslice_to_range(bytes, data).expect("this has to be in range"))
    };

    Ok(NodeData {
        inner: block,
        range,
    })
}
/// Calculates the index range at which `sub` lies inside of `full` by comparing the addresses of
/// the two slices.
///
/// Returns `None` when `sub` does not lie completely inside of `full`, that is when it starts
/// before `full` or ends after the end of `full`. This also covers two slices of a contiguous
/// chunk, `[A|B]`: a non-empty subslice of `B` is never reported as a subslice of `A`, and a
/// slice starting in `A` but continuing into `B` is rejected instead of producing a range which
/// would be out of bounds for `A`.
///
/// An empty `sub` positioned right at the end of `full` is accepted as
/// `full.len()..full.len()`, which is a valid (empty) range to index `full` with.
fn subslice_to_range(full: &[u8], sub: &[u8]) -> Option<Range<usize>> {
    // note this doesn't work for all types, for example () or similar ZSTs.

    let max = full.len();
    let amt = sub.len();

    if max < amt {
        // if the latter slice is larger than the first one, surely it isn't a subslice.
        return None;
    }

    let full = full.as_ptr() as usize;
    let sub = sub.as_ptr() as usize;

    // the address difference is already the index as the element size is one byte; `None` here
    // means `sub` starts before `full`
    let start = sub.checked_sub(full)?;
    let end = start.checked_add(amt)?;

    // the whole of `sub` must fit inside of `full`, checking only the start is not enough
    if end <= max {
        Some(start..end)
    } else {
        None
    }
}
/// Wrapper produced by [`wrap_node_data`]: owns the block representation and remembers where the
/// dag-pb node's Data field is located inside of it.
#[derive(PartialEq)]
pub struct NodeData<T> {
    /// The wrapped block representation.
    inner: T,
    /// Position of the Data field inside of `inner`, `None` when there is no Data field.
    range: Option<Range<usize>>,
}

/// Debug output shows the wrapped type name, the length of the wrapped bytes and the recorded
/// range instead of dumping the bytes themselves.
impl<T: AsRef<[u8]>> fmt::Debug for NodeData<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let wrapped_type = std::any::type_name::<T>();
        let wrapped_len = self.inner.as_ref().len();

        write!(
            fmt,
            "NodeData<{}> {{ inner.len: {}, range: {:?} }}",
            wrapped_type, wrapped_len, self.range
        )
    }
}

impl<T: AsRef<[u8]>> NodeData<T> {
    /// Gives the bytes of the dag-pb node's Data field, or an empty slice when no range was
    /// recorded for it.
    pub fn node_data(&self) -> &[u8] {
        match &self.range {
            Some(range) => &self.inner.as_ref()[range.clone()],
            None => &[],
        }
    }

    /// Borrows the wrapped block representation.
    pub fn get_ref(&self) -> &T {
        &self.inner
    }

    /// Unwraps back into the block representation, consuming the wrapper.
    pub fn into_inner(self) -> T {
        self.inner
    }
}

/// Comparison against anything byte-like compares only the Data field bytes.
impl<T: AsRef<[u8]>, B: AsRef<[u8]>> PartialEq<B> for NodeData<T> {
    fn eq(&self, other: &B) -> bool {
        let theirs: &[u8] = other.as_ref();
        self.node_data() == theirs
    }
}
#[cfg(test)]
mod tests {
    use super::subslice_to_range;

    /// Every subslice starting inside of the full slice must map back to the range it was
    /// created with.
    #[test]
    fn subslice_ranges() {
        let full = &b"01234"[..];

        // `start` stays below `full.len()` while `end` goes up to and including it, so that the
        // subslices touching the last byte are exercised as well.
        for start in 0..full.len() {
            for end in start..=full.len() {
                let sub = &full[start..end];
                assert_eq!(subslice_to_range(full, sub), Some(start..end));
            }
        }
    }

    /// A subslice of the following neighbour in memory must not be reported as a subslice.
    #[test]
    fn not_in_following_subslice() {
        // this could be done with two distinct/disjoint 'static slices but there might not be any
        // guarantees it working in all rust released and unreleased versions, and with different
        // linkers.

        let full = &b"0123456789"[..];

        let a = &full[0..4];
        let b = &full[4..];

        let a_sub = &a[1..3];
        let b_sub = &b[0..2];

        assert_eq!(subslice_to_range(a, a_sub), Some(1..3));
        assert_eq!(subslice_to_range(b, b_sub), Some(0..2));

        assert_eq!(subslice_to_range(a, b_sub), None);
        assert_eq!(subslice_to_range(b, a_sub), None);
    }
}

View File

@ -98,8 +98,7 @@ pub enum MaybeResolved<'needle> {
/// cases, there can be unexpected directories.
#[derive(Debug)]
pub enum ResolveError {
/// The target block was not a directory, hamt shard, or was a dag-pb node whose data could not be
/// parsed as UnixFS.
/// The target block was a UnixFs node that doesn't support resolving, e.g. a file.
UnexpectedType(UnexpectedNodeType),
/// A directory had unsupported properties. These are not encountered during walking sharded
/// directories.