Merge pull request #189 from eqlabs/feat_initial_get

feat: ipfs-unixfs get or "walk over anything"
Joonas Koivunen 2020-06-17 17:33:58 +03:00 committed by GitHub
commit f5bad262ec
25 changed files with 2780 additions and 244 deletions


@ -80,8 +80,8 @@ jobs:
if: matrix.platform.cross == false
run: cargo build --workspace
- name: npm install
run: npm install
- name: Setup conformance tests
run: ./setup.sh
working-directory: ./conformance
- name: Symlink executable

Cargo.lock generated

@ -895,6 +895,18 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "filetime"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "affc17579b132fc2461adf7c575cc6e8b134ebca52c51f5411388965227dc695"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"winapi 0.3.8",
]
[[package]]
name = "fixedbitset"
version = "0.2.0"
@ -1401,9 +1413,11 @@ version = "0.1.0"
dependencies = [
"async-stream",
"base64 0.12.0",
"bytes 0.5.4",
"env_logger",
"futures 0.3.4",
"hex",
"hex-literal",
"ipfs",
"libipld 0.1.0 (git+https://github.com/ipfs-rust/rust-ipld?rev=b2286c53c13f3eeec2a3766387f2926838e8e4c9)",
"log 0.4.8",
@ -1417,6 +1431,7 @@ dependencies = [
"serde",
"serde_json",
"structopt",
"tar",
"thiserror",
"tokio 0.2.16",
"url",
@ -1429,11 +1444,14 @@ name = "ipfs-unixfs"
version = "0.1.0"
dependencies = [
"cid",
"either",
"filetime",
"hex-literal",
"multibase",
"multihash 0.10.1",
"quick-protobuf",
"sha2",
"tar",
]
[[package]]
@ -3421,6 +3439,17 @@ dependencies = [
"unicode-xid 0.2.0",
]
[[package]]
name = "tar"
version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c058ad0bd6ccb84faa24cc44d4fc99bee8a5d7ba9ff33aa4d993122d1aeeac2"
dependencies = [
"filetime",
"libc",
"redox_syscall",
]
[[package]]
name = "tempfile"
version = "3.1.0"


@ -31,6 +31,9 @@ warp = "0.2.2"
async-stream = "0.2.1"
pin-project = "0.4.8"
url = "2.1.1"
tar = { version = "0.4.28", default-features = false }
bytes = "0.5.4"
[dev-dependencies]
hex = "0.4.2"
hex-literal = "0.2.1"


@ -70,8 +70,8 @@ pub fn routes<T: IpfsTypes>(
dag::put(ipfs),
dag::resolve(ipfs),
warp::path!("dht" / ..).and_then(not_implemented),
warp::path!("get").and_then(not_implemented),
root_files::cat(ipfs),
root_files::get(ipfs),
warp::path!("key" / ..).and_then(not_implemented),
warp::path!("name" / ..).and_then(not_implemented),
warp::path!("object" / ..).and_then(not_implemented),


@ -51,12 +51,18 @@ async fn refs_inner<T: IpfsTypes>(
formatter
);
let paths = opts
let mut paths = opts
.arg
.iter()
.map(|s| IpfsPath::try_from(s.as_str()).map_err(StringError::from))
.collect::<Result<Vec<_>, _>>()?;
for path in paths.iter_mut() {
// this is needed because the paths should not error when matching the final Data segment,
// it just becomes projected as `Loaded::Raw(_)`, however such items can have no links.
path.set_follow_dagpb_data(true);
}
let st = refs_paths(ipfs, paths, max_depth, opts.unique)
.await
.map_err(|e| {
@ -241,7 +247,7 @@ pub async fn walk_path<T: IpfsTypes>(
ipfs: &Ipfs<T>,
mut path: IpfsPath,
) -> Result<(Cid, Loaded, Vec<String>), WalkError> {
use ipfs::unixfs::ll::MaybeResolved;
use ipfs::unixfs::ll::{MaybeResolved, ResolveError};
let mut current = path.take_root().unwrap();
@ -280,6 +286,13 @@ pub async fn walk_path<T: IpfsTypes>(
Ok(MaybeResolved::NotFound) => {
return handle_dagpb_not_found(current, &data, needle, &path)
}
Err(ResolveError::UnexpectedType(_)) => {
// the conformance tests use a path which would end up going through a file
// and the returned error string is tested against listed alternatives.
// unexpected type is not one of them.
let e = WalkFailed::from(path::WalkFailed::UnmatchedNamedLink(needle));
return Err(WalkError::from((e, current)));
}
Err(e) => return Err(WalkError::from((WalkFailed::from(e), current))),
};
@ -360,10 +373,12 @@ fn handle_dagpb_not_found(
needle: String,
path: &IpfsPath,
) -> Result<(Cid, Loaded, Vec<String>), WalkError> {
if needle == "Data" && path.len() == 0 {
// /dag/resolve needs to "resolve through" a dag-pb node down to the "just
// data" even though we do not need to extract it ... however this might be
// good to just filter with refs, as no refs of such path exist
use ipfs::unixfs::ll::dagpb::node_data;
if needle == "Data" && path.len() == 0 && path.follow_dagpb_data() {
// /dag/resolve needs to "resolve through" a dag-pb node down to the "just data" even
// though we do not need to extract it ... however this might be good to just filter with
// refs, as no refs of such path can exist as the links are in the outer structure.
//
// testing with go-ipfs 0.5 reveals that dag resolve only follows links
// which are actually present in the dag-pb, not numeric links like Links/5
@ -372,7 +387,7 @@ fn handle_dagpb_not_found(
// comment on this special casing: there cannot be any other such
// special case as the Links do not work like Data so while this is not
// pretty, it's not terrible.
let data = ipfs::unixfs::ll::dagpb::node_data(&data)
let data = node_data(&data)
.expect("already parsed once, second time cannot fail")
.unwrap_or_default();
Ok((at, Loaded::Ipld(Ipld::Bytes(data.to_vec())), vec![needle]))


@ -34,6 +34,9 @@ pub struct IpfsPath {
/// Option to support moving the cid
root: Option<Cid>,
path: std::vec::IntoIter<String>,
/// True by default, to allow "finding" `Data` under dag-pb node
/// TODO: document why this matters
follow_dagpb_data: bool,
}
impl From<Cid> for IpfsPath {
@ -43,6 +46,7 @@ impl From<Cid> for IpfsPath {
IpfsPath {
root: Some(root),
path: Vec::new().into_iter(),
follow_dagpb_data: true,
}
}
}
@ -90,7 +94,13 @@ impl TryFrom<&str> for IpfsPath {
let root = Some(Cid::try_from(root).map_err(PathError::InvalidCid)?);
Ok(IpfsPath { root, path })
let follow_dagpb_data = true;
Ok(IpfsPath {
root,
path,
follow_dagpb_data,
})
}
}
@ -99,6 +109,14 @@ impl IpfsPath {
self.root.take()
}
pub fn set_follow_dagpb_data(&mut self, follow: bool) {
self.follow_dagpb_data = follow;
}
pub fn follow_dagpb_data(&self) -> bool {
self.follow_dagpb_data
}
pub fn resolve(&mut self, ipld: Ipld) -> Result<WalkSuccess, WalkFailed> {
let key = match self.next() {
Some(key) => key,


@ -1,11 +1,23 @@
use crate::v0::refs::{walk_path, IpfsPath};
use crate::v0::support::unshared::Unshared;
use crate::v0::support::{with_ipfs, StreamResponse, StringError};
use async_stream::try_stream;
use bytes::Bytes;
use futures::stream::TryStream;
use ipfs::unixfs::ll::walk::{self, ContinuedWalk, Walker};
use ipfs::unixfs::{ll::file::FileReadFailed, TraversalFailed};
use ipfs::Block;
use ipfs::{Ipfs, IpfsTypes};
use libipld::cid::Codec;
use libipld::cid::{Cid, Codec};
use serde::Deserialize;
use std::convert::TryFrom;
use std::fmt;
use std::path::Path;
use warp::{path, query, Filter, Rejection, Reply};
mod tar_helper;
use tar_helper::TarHelper;
#[derive(Debug, Deserialize)]
pub struct CatArgs {
// this could be an ipfs path
@ -25,10 +37,8 @@ pub fn cat<T: IpfsTypes>(
}
async fn cat_inner<T: IpfsTypes>(ipfs: Ipfs<T>, args: CatArgs) -> Result<impl Reply, Rejection> {
use crate::v0::refs::{walk_path, IpfsPath};
use ipfs::unixfs::{ll::file::FileReadFailed, TraversalFailed};
let path = IpfsPath::try_from(args.arg.as_str()).map_err(StringError::from)?;
let mut path = IpfsPath::try_from(args.arg.as_str()).map_err(StringError::from)?;
path.set_follow_dagpb_data(false);
let range = match (args.offset, args.length) {
(Some(start), Some(len)) => Some(start..(start + len)),
@ -58,3 +68,373 @@ async fn cat_inner<T: IpfsTypes>(ipfs: Ipfs<T>, args: CatArgs) -> Result<impl Re
Ok(StreamResponse(Unshared::new(stream)))
}
#[derive(Deserialize)]
struct GetArgs {
// this could be an ipfs path again
arg: String,
}
pub fn get<T: IpfsTypes>(
ipfs: &Ipfs<T>,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
path!("get")
.and(with_ipfs(ipfs))
.and(query::<GetArgs>())
.and_then(get_inner)
}
async fn get_inner<T: IpfsTypes>(ipfs: Ipfs<T>, args: GetArgs) -> Result<impl Reply, Rejection> {
use futures::stream::TryStreamExt;
let mut path = IpfsPath::try_from(args.arg.as_str()).map_err(StringError::from)?;
path.set_follow_dagpb_data(false);
// FIXME: this is here until we have IpfsPath back at ipfs
let (cid, _, _) = walk_path(&ipfs, path).await.map_err(StringError::from)?;
if cid.codec() != Codec::DagProtobuf {
return Err(StringError::from("unknown node type").into());
}
Ok(StreamResponse(Unshared::new(walk(ipfs, cid).into_stream())))
}
fn walk<Types: IpfsTypes>(
ipfs: Ipfs<Types>,
root: Cid,
) -> impl TryStream<Ok = Bytes, Error = GetError> + 'static {
let mut cache = None;
let mut tar_helper = TarHelper::with_capacity(16 * 1024);
// the HTTP api uses the final Cid name as the root name in the generated tar
// archive.
let name = root.to_string();
let mut visit: Option<Walker> = Some(Walker::new(root, name));
try_stream! {
while let Some(walker) = visit {
let (next, _) = walker.pending_links();
let Block { data, .. } = ipfs.get_block(next).await?;
visit = match walker.continue_walk(&data, &mut cache)? {
ContinuedWalk::File(segment, item) => {
let total_size = item.as_entry()
.total_file_size()
.expect("files do have total_size");
if segment.is_first() {
let path = item.as_entry().path();
let metadata = item
.as_entry()
.metadata()
.expect("files must have metadata");
for mut bytes in tar_helper.apply_file(path, metadata, total_size)?.iter_mut() {
if let Some(bytes) = bytes.take() {
yield bytes;
}
}
}
// even though the largest files can have 256 kB blocks with about as much content,
// consume them in small parts so the buffers do not grow too much.
let mut n = 0usize;
let slice = segment.as_ref();
let total = slice.len();
while n < total {
let next = tar_helper.buffer_file_contents(&slice[n..]);
n += next.len();
yield next;
}
if segment.is_last() {
if let Some(zeroes) = tar_helper.pad(total_size) {
yield zeroes;
}
}
item.into_inner()
},
ContinuedWalk::Directory(item) => {
// only first instances of directories will have the metadata
if let Some(metadata) = item.as_entry().metadata() {
let path = item.as_entry().path();
for mut bytes in tar_helper.apply_directory(path, metadata)?.iter_mut() {
if let Some(bytes) = bytes.take() {
yield bytes;
}
}
}
item.into_inner()
},
ContinuedWalk::Symlink(bytes, item) => {
// converting a symlink is the most tricky part
let path = item.as_entry().path();
let target = std::str::from_utf8(bytes).map_err(|_| GetError::NonUtf8Symlink)?;
let target = Path::new(target);
let metadata = item.as_entry().metadata().expect("symlink must have metadata");
for mut bytes in tar_helper.apply_symlink(path, target, metadata)?.iter_mut() {
if let Some(bytes) = bytes.take() {
yield bytes;
}
}
item.into_inner()
},
};
}
}
}
#[derive(Debug)]
enum GetError {
NonUtf8Symlink,
InvalidFileName(Vec<u8>),
InvalidLinkName(Vec<u8>),
Walk(walk::Error),
Loading(ipfs::Error),
}
impl From<ipfs::Error> for GetError {
fn from(e: ipfs::Error) -> Self {
GetError::Loading(e)
}
}
impl From<walk::Error> for GetError {
fn from(e: walk::Error) -> Self {
GetError::Walk(e)
}
}
impl fmt::Display for GetError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use GetError::*;
match self {
NonUtf8Symlink => write!(fmt, "symlink target could not be converted to utf-8"),
Walk(e) => write!(fmt, "{}", e),
Loading(e) => write!(fmt, "loading failed: {}", e),
InvalidFileName(x) => write!(fmt, "filename cannot be put inside tar: {:?}", x),
InvalidLinkName(x) => write!(fmt, "symlink name cannot be put inside tar: {:?}", x),
}
}
}
impl std::error::Error for GetError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
GetError::Walk(e) => Some(e),
_ => None,
}
}
}
#[cfg(test)]
mod tests {
use futures::stream::{FuturesOrdered, TryStreamExt};
use hex_literal::hex;
use ipfs::{Block, Ipfs, IpfsTypes};
use libipld::cid::Cid;
use multihash::Sha2_256;
use std::convert::TryFrom;
use std::path::PathBuf;
// Entry we'll use in expectations
#[derive(Debug, PartialEq, Eq)]
enum Entry {
Dir(PathBuf),
File(PathBuf, u64, Vec<u8>),
Symlink(PathBuf, PathBuf),
}
impl<'a, R: std::io::Read> TryFrom<tar::Entry<'a, R>> for Entry {
type Error = std::io::Error;
fn try_from(mut entry: tar::Entry<'a, R>) -> Result<Self, Self::Error> {
let header = entry.header();
let entry = match header.entry_type() {
tar::EntryType::Directory => Entry::Dir(entry.path()?.into()),
tar::EntryType::Regular => {
let path = entry.path()?.into();
let size = header.size()?;
// writing to file is the only supported way to get the contents
let mut temp_file = std::env::temp_dir();
temp_file.push("temporary_file_for_testing.txt");
entry.unpack(&temp_file)?;
let bytes = std::fs::read(&temp_file);
// regardless of read success let's prefer deleting the file
std::fs::remove_file(&temp_file)?;
// and only later check if the read succeeded
Entry::File(path, size, bytes?)
}
tar::EntryType::Symlink => Entry::Symlink(
entry.path()?.into(),
entry.link_name()?.as_deref().unwrap().into(),
),
x => unreachable!("{:?}", x),
};
Ok(entry)
}
}
#[tokio::test]
async fn very_long_file_and_symlink_names() {
let options = ipfs::IpfsOptions::inmemory_with_generated_keys(false);
let (ipfs, _) = ipfs::UninitializedIpfs::new(options)
.await
.start()
.await
.unwrap();
let blocks: &[&[u8]] = &[
// the root, QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD
&hex!("122f0a22122043217b633fbc72e90938d6dc43a34fcf8fe64e5c0d4f5d4b92a691e5a010c40912063337736f6d6518d90912aa020a2212208b346f875acc01ee2d277d2eadc00d42420a7a441a84247d5eb00e76e43e1fb8128102394634326133353338313761366265386231646561303130666138353663326664646236653131366338346561376132393766346637353163613462336461333637646435633638356566313931376232366334333961663765333032626538396163326432663936313132663363386631303262663766373531613836323738316630396536306536313831646333353565343466373664356162366662346463363265346438373532336538356663613765323965386463386333646438646336376133366239623761336134373031366138323761646336353839386635663930666161623632313861643139396234383161363031623433393261355f18280a1d0805121480000000000000000000000000800000000000002822308002"),
&hex!("12310a221220e2f1caec3161a8950e02ebcdfb3a9879a132041c48fc76fd003b7bcde1f68f08120845436e657374656418fd080a270805121e1000000000000000000000000000000000000000000000000000000000002822308002"),
&hex!("122e0a221220c710935f96b0b2fb670af9792570c560152ce4a60e7e6995605ebaccb84427371205324164697218bc080a0f080512060400000000002822308002"),
&hex!("12340a22122098e02ca8786af6b4895b66cfe5af6755b0ab0374c9c44b6ae62a6441cd399254120b324668696572617263687918f5070a0f080512068000000000002822308002"),
&hex!("12310a2212203a25dbf4a767b451a4c2601807f623f5e86e3758ba26a465c0a8675885537c55120833466c6f6e67657218af070a110805120880000000000000002822308002"),
&hex!("122e0a221220c16baf1b090a996a990e49136a205aad4e86da6279b20a7fabaf8b087f209f4812054634616e6418d5060a280805121f100000000000000000000000000000000000000000000000000000000000002822308002"),
&hex!("12310a2212206c28a669b54bb96b80a821162d1050e6c883f72020214c3a2b80afec9a27861e120833466c6f6e676572188f060a110805120880000000000000002822308002"),
&hex!("122e0a221220f4819507773b17e79ad51a0a56a1e2bfb58b880d8b81fc440fd932c29a6fce8e12054634616e6418b5050a280805121f100000000000000000000000000000000000000000000000000000000000002822308002"),
&hex!("12310a22122049ead3b060f85b80149d3bee5210b705618e13c40e4884f5a3153bf2f0aee535120833466c6f6e67657218ef040a110805120880000000000000002822308002"),
&hex!("12ab020a22122068080292b22f2ed1958079a9bcfdfa9affac9a092c141bda94d496af4b1712f8128102394634326133353338313761366265386231646561303130666138353663326664646236653131366338346561376132393766346637353163613462336461333637646435633638356566313931376232366334333961663765333032626538396163326432663936313132663363386631303262663766373531613836323738316630396536306536313831646333353565343466373664356162366662346463363265346438373532336538356663613765323965386463386333646438646336376133366239623761336134373031366138323761646336353839386635663930666161623632313861643139396234383161363031623433393261355f18a2020a1d0805121480000000000000000000000000000000000000002822308002"),
&hex!("0a9f020804129a022e2e2f2e2e2f2e2e2f2e2e2f2e2e2f2e2e2f2e2e2f2e2e2f2e2e2f34326133353338313761366265386231646561303130666138353663326664646236653131366338346561376132393766346637353163613462336461333637646435633638356566313931376232366334333961663765333032626538396163326432663936313132663363386631303262663766373531613836323738316630396536306536313831646333353565343466373664356162366662346463363265346438373532336538356663613765323965386463386333646438646336376133366239623761336134373031366138323761646336353839386635663930666161623632313861643139396234383161363031623433393261355f"),
&hex!("0a260802122077656c6c2068656c6c6f207468657265206c6f6e672066696c656e616d65730a1820"),
];
drop(put_all_blocks(&ipfs, &blocks).await.unwrap());
let filter = super::get(&ipfs);
let response = warp::test::request()
.method("POST")
.path("/get?arg=QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD")
.reply(&filter)
.await;
assert_eq!(response.status(), 200);
let found = get_archive_entries(response.body());
let long_filename = "42a353817a6be8b1dea010fa856c2fddb6e116c84ea7a297f4f751ca4b3da367dd5c685ef1917b26c439af7e302be89ac2d2f96112f3c8f102bf7f751a862781f09e60e6181dc355e44f76d5ab6fb4dc62e4d87523e85fca7e29e8dc8c3dd8dc67a36b9b7a3a47016a827adc65898f5f90faab6218ad199b481a601b4392a5_";
let expected = vec![
Entry::Dir("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD".into()),
Entry::Dir("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD/some".into()),
Entry::Dir("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD/some/nested".into()),
Entry::Dir("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD/some/nested/dir".into()),
Entry::Dir("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD/some/nested/dir/hierarchy".into()),
Entry::Dir("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD/some/nested/dir/hierarchy/longer".into()),
Entry::Dir("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD/some/nested/dir/hierarchy/longer/and".into()),
Entry::Dir("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD/some/nested/dir/hierarchy/longer/and/longer".into()),
Entry::Dir("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD/some/nested/dir/hierarchy/longer/and/longer/and".into()),
Entry::Dir("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD/some/nested/dir/hierarchy/longer/and/longer/and/longer".into()),
Entry::Symlink(
format!("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD/some/nested/dir/hierarchy/longer/and/longer/and/longer/{}", long_filename).into(),
format!("../../../../../../../../../{}", long_filename).into()),
Entry::File(format!("QmdKuCuXDuVTsnGpzPgZEuJmiCEn6LZhGHHHwWPQH28DeD/{}", long_filename).into(), 32, b"well hello there long filenames\n".to_vec()),
];
assert_eq!(found, expected);
}
#[tokio::test]
async fn get_multiblock_file() {
let options = ipfs::IpfsOptions::inmemory_with_generated_keys(false);
let (ipfs, _) = ipfs::UninitializedIpfs::new(options)
.await
.start()
.await
.unwrap();
let blocks: &[&[u8]] = &[
// the root, QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6
&hex!("12280a221220fef9fe1804942b35e19e145a03f9c9d5ca9c997dda0a9416f3f515a52f1b3ce11200180a12280a221220dfb94b75acb208fd4873d84872af58bd65c731770a7d4c0deeb4088e87390bfe1200180a12280a221220054497ae4e89812c83276a48e3e679013a788b7c0eb02712df15095c02d6cd2c1200180a12280a221220cc332ceb37dea7d3d7c00d1393117638d3ed963575836c6d44a24951e444cf5d120018090a0c080218072002200220022001"),
// first bytes: fo
&hex!("0a0808021202666f1802"),
// ob
&hex!("0a08080212026f621802"),
// ar
&hex!("0a080802120261721802"),
// \n
&hex!("0a07080212010a1801"),
];
drop(put_all_blocks(&ipfs, &blocks).await.unwrap());
let filter = super::get(&ipfs);
let response = warp::test::request()
.method("POST")
.path("/get?arg=QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6")
.reply(&filter)
.await;
assert_eq!(response.status(), 200);
let found = get_archive_entries(response.body());
let expected = vec![Entry::File(
"QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6".into(),
7,
b"foobar\n".to_vec(),
)];
assert_eq!(found, expected);
}
fn get_archive_entries(bytes: impl AsRef<[u8]>) -> Vec<Entry> {
let mut cursor = std::io::Cursor::new(bytes.as_ref());
let mut archive = tar::Archive::new(&mut cursor);
archive
.entries()
.and_then(|entries| {
entries
.map(|res| res.and_then(Entry::try_from))
.collect::<Result<Vec<Entry>, _>>()
})
.unwrap()
}
fn put_all_blocks<'a, T: IpfsTypes>(
ipfs: &'a Ipfs<T>,
blocks: &'a [&'a [u8]],
) -> impl std::future::Future<Output = Result<Vec<Cid>, ipfs::Error>> + 'a {
let mut inorder = FuturesOrdered::new();
for block in blocks {
inorder.push(put_block(&ipfs, block));
}
inorder.try_collect::<Vec<_>>()
}
fn put_block<'a, T: IpfsTypes>(
ipfs: &'a Ipfs<T>,
block: &'a [u8],
) -> impl std::future::Future<Output = Result<Cid, ipfs::Error>> + 'a {
let cid = Cid::new_v0(Sha2_256::digest(block)).unwrap();
let block = Block {
cid,
data: block.into(),
};
ipfs.put_block(block)
}
}


@ -0,0 +1,332 @@
///! Tar helper is internal to the `/get` implementation. It uses some private parts of the `tar-rs`
///! crate to provide a `BytesMut` writing implementation instead of one using `std::io` interfaces.
///!
///! Code was originally taken and modified from the dependency version of `tar-rs`. The most
///! important copied parts are related to the long file name and long link name support. An issue
///! will be opened on `tar-rs` to discuss whether these could be made public, leaving us only the
///! `Bytes` (copying) code.
use super::GetError;
use bytes::{buf::BufMut, Bytes, BytesMut};
use ipfs::unixfs::ll::Metadata;
use std::borrow::Cow;
use std::path::{Path, PathBuf};
use tar::{EntryType, Header};
/// Tar helper is internal to `get` implementation. It uses some private parts of the `tar-rs`
/// crate to append the headers and the contents to a pair of `bytes::Bytes` operated in a
/// round-robin fashion.
pub(super) struct TarHelper {
bufsize: usize,
bytes: BytesMut,
header: Header,
long_filename_header: Header,
zeroes: Bytes,
}
impl TarHelper {
pub(super) fn with_capacity(n: usize) -> Self {
let bytes = BytesMut::with_capacity(n);
// these are 512 bytes apiece
let header = Self::new_default_header();
let long_filename_header = Self::new_long_filename_header();
let mut zeroes = BytesMut::with_capacity(512);
for _ in 0..(512 / 8) {
zeroes.put_u64(0);
}
assert_eq!(zeroes.len(), 512);
let zeroes = zeroes.freeze();
Self {
bufsize: n,
bytes,
header,
long_filename_header,
zeroes,
}
}
fn new_default_header() -> tar::Header {
let mut header = tar::Header::new_gnu();
header.set_mtime(0);
header.set_uid(0);
header.set_gid(0);
header
}
fn new_long_filename_header() -> tar::Header {
let mut long_filename_header = tar::Header::new_gnu();
long_filename_header.set_mode(0o644);
{
let name = b"././@LongLink";
let gnu_header = long_filename_header.as_gnu_mut().unwrap();
// since we are reusing the header, zero out all of the bytes
let written = name
.iter()
.copied()
.chain(std::iter::repeat(0))
.enumerate()
.take(gnu_header.name.len());
// FIXME: could revert to the slice copying code since we never change this
for (i, b) in written {
gnu_header.name[i] = b;
}
}
long_filename_header.set_mtime(0);
long_filename_header.set_uid(0);
long_filename_header.set_gid(0);
long_filename_header
}
pub(super) fn apply_file(
&mut self,
path: &Path,
metadata: &Metadata,
total_size: u64,
) -> Result<[Option<Bytes>; 3], GetError> {
let mut ret: [Option<Bytes>; 3] = Default::default();
if let Err(e) = self.header.set_path(path) {
let data =
prepare_long_header(&mut self.header, &mut self.long_filename_header, path, e)?;
self.bytes.put_slice(self.long_filename_header.as_bytes());
self.bytes.put_slice(data);
self.bytes.put_u8(0);
ret[0] = Some(self.bytes.split().freeze());
ret[1] = self.pad(data.len() as u64 + 1);
}
self.header.set_size(total_size);
self.header.set_entry_type(EntryType::Regular);
Self::set_metadata(&mut self.header, metadata, 0o0644);
self.header.set_cksum();
self.bytes.put_slice(self.header.as_bytes());
ret[2] = Some(self.bytes.split().freeze());
Ok(ret)
}
pub(super) fn buffer_file_contents(&mut self, contents: &[u8]) -> Bytes {
assert!(!contents.is_empty());
let remaining = contents.len();
let taken = self.bufsize.min(remaining);
// was initially thinking to check the capacity, but we are round-robining the buffers to
// get a lucky chance of either of them being empty at this point
self.bytes.put_slice(&contents[..taken]);
self.bytes.split().freeze()
}
pub(super) fn apply_directory(
&mut self,
path: &Path,
metadata: &Metadata,
) -> Result<[Option<Bytes>; 3], GetError> {
let mut ret: [Option<Bytes>; 3] = Default::default();
if let Err(e) = self.header.set_path(path) {
let data =
prepare_long_header(&mut self.header, &mut self.long_filename_header, path, e)?;
self.bytes.put_slice(self.long_filename_header.as_bytes());
self.bytes.put_slice(data);
self.bytes.put_u8(0);
ret[0] = Some(self.bytes.split().freeze());
ret[1] = self.pad(data.len() as u64 + 1);
}
self.header.set_size(0);
self.header.set_entry_type(EntryType::Directory);
Self::set_metadata(&mut self.header, metadata, 0o0755);
self.header.set_cksum();
self.bytes.put_slice(self.header.as_bytes());
ret[2] = Some(self.bytes.split().freeze());
Ok(ret)
}
pub(super) fn apply_symlink(
&mut self,
path: &Path,
target: &Path,
metadata: &Metadata,
) -> Result<[Option<Bytes>; 5], GetError> {
let mut ret: [Option<Bytes>; 5] = Default::default();
if let Err(e) = self.header.set_path(path) {
let data =
prepare_long_header(&mut self.header, &mut self.long_filename_header, path, e)?;
self.bytes.put_slice(self.long_filename_header.as_bytes());
self.bytes.put_slice(data);
self.bytes.put_u8(0);
ret[0] = Some(self.bytes.split().freeze());
ret[1] = self.pad(data.len() as u64 + 1);
}
if self.header.set_link_name(target).is_err() {
let data = path2bytes(target);
if data.len() < self.header.as_old().linkname.len() {
return Err(GetError::InvalidLinkName(data.to_vec()));
}
// this is another long header trick, but this time we have a different entry type and
// similarly the long link name is written as a separate entry with its own header.
self.long_filename_header.set_size(data.len() as u64 + 1);
self.long_filename_header
.set_entry_type(tar::EntryType::new(b'K'));
self.long_filename_header.set_cksum();
self.bytes.put_slice(self.long_filename_header.as_bytes());
self.bytes.put_slice(data);
self.bytes.put_u8(0);
ret[2] = Some(self.bytes.split().freeze());
ret[3] = self.pad(data.len() as u64 + 1);
}
Self::set_metadata(&mut self.header, metadata, 0o0644);
self.header.set_size(0);
self.header.set_entry_type(tar::EntryType::Symlink);
self.header.set_cksum();
self.bytes.put_slice(self.header.as_bytes());
ret[4] = Some(self.bytes.split().freeze());
Ok(ret)
}
/// Content in tar is padded to 512 byte sectors which might be configurable as well.
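/// For example, a 7 byte file is followed by 505 zero bytes to reach the next 512 byte
/// boundary, while a file of exactly 512 bytes needs no padding.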
pub(super) fn pad(&self, total_size: u64) -> Option<Bytes> {
let padding = 512 - (total_size % 512);
if padding < 512 {
Some(self.zeroes.slice(..padding as usize))
} else {
None
}
}
fn set_metadata(header: &mut tar::Header, metadata: &Metadata, default_mode: u32) {
header.set_mode(
metadata
.mode()
.map(|mode| mode & 0o7777)
.unwrap_or(default_mode),
);
header.set_mtime(
metadata
.mtime()
.and_then(|(seconds, _)| {
if seconds >= 0 {
Some(seconds as u64)
} else {
None
}
})
.unwrap_or(0),
);
}
}
/// Returns the raw bytes we need to write as a new entry into the tar.
fn prepare_long_header<'a>(
header: &mut tar::Header,
long_filename_header: &mut tar::Header,
path: &'a Path,
_error: std::io::Error,
) -> Result<&'a [u8], GetError> {
#[cfg(unix)]
/// On unix this operation can never fail.
pub(super) fn bytes2path(bytes: Cow<[u8]>) -> std::io::Result<Cow<Path>> {
use std::ffi::{OsStr, OsString};
use std::os::unix::prelude::*;
Ok(match bytes {
Cow::Borrowed(bytes) => Cow::Borrowed(Path::new(OsStr::from_bytes(bytes))),
Cow::Owned(bytes) => Cow::Owned(PathBuf::from(OsString::from_vec(bytes))),
})
}
#[cfg(windows)]
/// On windows we cannot accept non-Unicode bytes because it
/// is impossible to convert it to UTF-16.
pub(super) fn bytes2path(bytes: Cow<[u8]>) -> std::io::Result<Cow<Path>> {
match bytes {
Cow::Borrowed(bytes) => {
let s = std::str::from_utf8(bytes).map_err(|_| not_unicode(bytes))?;
Ok(Cow::Borrowed(Path::new(s)))
}
Cow::Owned(bytes) => {
let s = String::from_utf8(bytes).map_err(|uerr| not_unicode(&uerr.into_bytes()))?;
Ok(Cow::Owned(PathBuf::from(s)))
}
}
}
// Used with windows.
#[allow(dead_code)]
fn not_unicode(v: &[u8]) -> std::io::Error {
use std::io::{Error, ErrorKind};
Error::new(
ErrorKind::Other,
format!(
"only Unicode paths are supported on Windows: {}",
String::from_utf8_lossy(v)
),
)
}
// we **only** have utf8 paths as protobuf has already parsed this file
// name and all of the previous ones as utf8.
let data = path2bytes(path);
let max = header.as_old().name.len();
if data.len() < max {
return Err(GetError::InvalidFileName(data.to_vec()));
}
// the plus one is documented as compliance with GNU tar, probably the null byte
// termination?
long_filename_header.set_size(data.len() as u64 + 1);
long_filename_header.set_entry_type(tar::EntryType::new(b'L'));
long_filename_header.set_cksum();
// we still need to figure out the truncated path we put into the header
let path = bytes2path(Cow::Borrowed(&data[..max]))
.expect("quite certain we have no non-utf8 paths here");
header
.set_path(&path)
.expect("we already made sure the path is of fitting length");
Ok(data)
}
#[cfg(unix)]
fn path2bytes(p: &Path) -> &[u8] {
use std::os::unix::prelude::*;
p.as_os_str().as_bytes()
}
#[cfg(windows)]
fn path2bytes(p: &Path) -> &[u8] {
p.as_os_str()
.to_str()
.expect("we should only have unicode compatible bytes even on windows")
.as_bytes()
}


@ -39,10 +39,11 @@ where
}
};
let mut cache = None;
// Start the visit from the root block. We need to move both components as Options into the
// stream as we can't yet return them from this Future context.
let (visit, bytes) = match visit.start(&data) {
Ok((bytes, _, visit)) => {
Ok((bytes, _, _, visit)) => {
let bytes = if !bytes.is_empty() {
Some(bytes.to_vec())
} else {
@ -56,6 +57,9 @@ where
}
};
// FIXME: we could use the above file_size to set the content-length ... but calculating it
// with the ranges is not ... trivial?
// using async_stream here at least to get going faster; writing custom streams is not too easy,
// but this one might be easy enough to write by hand.
Ok(stream! {
@ -86,7 +90,7 @@ where
},
};
match visit.continue_walk(&data) {
match visit.continue_walk(&data, &mut cache) {
Ok((bytes, next_visit)) => {
if !bytes.is_empty() {
// TODO: manual implementation could allow returning just the slice


@ -16,6 +16,8 @@ pub use ipfs_unixfs as ll;
mod cat;
pub use cat::{cat, TraversalFailed};
// No get provided at least as of now.
pub struct File {
data: Vec<u8>,
}


@ -5,13 +5,19 @@ authors = ["Joonas Koivunen <joonas@equilibrium.co>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = [ "filetime" ]
[dependencies]
quick-protobuf = "0.7.0"
cid = "0.4.0"
filetime = { version = "0.2.10", optional = true }
either = ">=1.5.3"
[dev-dependencies]
multibase = "0.8.0"
multihash = "0.10.1"
sha2 = "0.8.1"
hex-literal = "0.2.1"
# we don't need the xattr feature and we didn't actually need filetime either
tar = { version = "0.4.28", default-features = false }


@ -9,5 +9,17 @@ Goals:
Status:
* first iteration of file reader has been implemented
* first iteration of resolving IpfsPath segments through directories has been implemented
* first iteration of resolving IpfsPath segments through directories has been
implemented
* as the HAMTShard structure is not fully understood, all buckets are
searched, however the API is expected to remain the same even if more
efficient lookup is implemented
* first iteration of `/get`-like tree walking implemented
* creation and alteration of dags has not been implemented
Usage:
* The main entry point to walking anything unixfs should be `ipfs_unixfs::walk::Walker`
* The main entry point to resolving links under dag-pb or unixfs should be `ipfs_unixfs::resolve`
* There is an `ipfs_unixfs::file::visit::FileVisit` utility but it should be
considered superseded by `ipfs_unixfs::walk::Walker`
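As a rough illustration (not part of the crate itself), the `Walker` loop looks roughly like the
following sketch, modeled on `unixfs/examples/get.rs` from this change; the block source is
abstracted behind a caller-provided `load_block` closure, which is an assumption of the example:

    use cid::Cid;
    use ipfs_unixfs::walk::{ContinuedWalk, Walker};

    fn walk_tree(
        root: Cid,
        load_block: impl Fn(&Cid) -> Vec<u8>,
    ) -> Result<(), ipfs_unixfs::walk::Error> {
        let mut cache = None;
        let mut visit = Some(Walker::new(root, String::new()));
        while let Some(walker) = visit {
            // load the raw block bytes for the next pending link out of band
            let (next, _) = walker.pending_links();
            let buf = load_block(next);
            visit = match walker.continue_walk(&buf, &mut cache)? {
                // file content arrives block by block through `segment`
                ContinuedWalk::File(segment, item) => {
                    let _bytes: &[u8] = segment.as_ref();
                    item.into_inner()
                }
                // directories and symlinks expose only path and metadata via `item.as_entry()`
                ContinuedWalk::Directory(item) => item.into_inner(),
                ContinuedWalk::Symlink(_target, item) => item.into_inner(),
            };
        }
        Ok(())
    }

Each `continue_walk` call consumes exactly one block, so the caller stays in control of block
loading and can prefetch the remaining `pending_links` if it wants to.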


@ -73,7 +73,7 @@ fn walk(blocks: ShardedBlockStore, start: &Cid) -> Result<(u64, u64), Error> {
read_bytes += blocks.as_file(&start.to_bytes())?.read_to_end(&mut buf)? as u64;
// First step of the walk can give content or continued visitation but not both.
let (content, _metadata, mut step) = IdleFileVisit::default().start(&buf)?;
let (content, _, _metadata, mut step) = IdleFileVisit::default().start(&buf)?;
stdout.write_all(content)?;
content_bytes += content.len() as u64;
@ -88,9 +88,9 @@ fn walk(blocks: ShardedBlockStore, start: &Cid) -> Result<(u64, u64), Error> {
read_bytes += blocks.as_file(&first.to_bytes())?.read_to_end(&mut buf)? as u64;
// Similar to first step, except we no longer get the file metadata. It is still accessible
// from the `visit` via `AsRef<ipfs_unixfs::file::FileMetadata>` but likely only needed in
// from the `visit` via `AsRef<ipfs_unixfs::file::Metadata>` but likely only needed in
// the first step.
let (content, next_step) = visit.continue_walk(&buf)?;
let (content, next_step) = visit.continue_walk(&buf, &mut None)?;
stdout.write_all(content)?;
content_bytes += content.len() as u64;

unixfs/examples/get.rs Normal file

@ -0,0 +1,193 @@
use cid::Cid;
use std::convert::TryFrom;
use std::fmt;
use std::io::{Error as IoError, Read};
use std::path::{Path, PathBuf};
fn main() {
let cid = match std::env::args().nth(1).map(Cid::try_from) {
Some(Ok(cid)) => cid,
Some(Err(e)) => {
eprintln!("Invalid cid given as argument: {}", e);
std::process::exit(1);
}
None => {
eprintln!("USAGE: {} CID\n", std::env::args().next().unwrap());
eprintln!(
"Will walk the unixfs file pointed out by the CID from default go-ipfs 0.5 \
configuration flatfs blockstore and write listing to stdout."
);
std::process::exit(0);
}
};
let ipfs_path = match std::env::var("IPFS_PATH") {
Ok(s) => s,
Err(e) => {
eprintln!("IPFS_PATH is not set or could not be read: {}", e);
std::process::exit(1);
}
};
let mut blocks = PathBuf::from(ipfs_path);
blocks.push("blocks");
let blockstore = ShardedBlockStore { root: blocks };
match walk(blockstore, &cid) {
Ok(()) => {}
Err(Error::OpeningFailed(e)) => {
eprintln!("{}\n", e);
eprintln!("This is likely caused by either:");
eprintln!(" - ipfs does not have the block");
eprintln!(" - ipfs is configured to use non-flatfs storage");
eprintln!(" - ipfs is configured to use flatfs with different sharding");
std::process::exit(1);
}
Err(e) => {
eprintln!("Failed to walk the merkle tree: {}", e);
std::process::exit(1);
}
}
}
fn walk(blocks: ShardedBlockStore, start: &Cid) -> Result<(), Error> {
use ipfs_unixfs::walk::{ContinuedWalk, Walker};
let mut buf = Vec::new();
let mut cache = None;
let mut visit = Some(Walker::new(start.to_owned(), String::new()));
while let Some(walker) = visit {
buf.clear();
// Note: if you bind the pending or the "prefetchable", it must be dropped before the next
// call to continue_walk.
let (next, _) = walker.pending_links();
blocks.as_file(&next.to_bytes())?.read_to_end(&mut buf)?;
// FIXME: unwraps required below come from the fact that we cannot provide a uniform
// interface for all of the nodes. we could start moving Metadata to the ContinuedWalk in
// the future to have it matched immediately, or have an "Item" for different types of
// items.
visit = match walker.continue_walk(&buf, &mut cache)? {
ContinuedWalk::File(segment, item) => {
let entry = item.as_entry();
let total_size = entry.total_file_size().expect("all files have total size");
// metadata is picked up from the root file and carried until the last block
let metadata = entry.metadata().expect("all files have metadata");
if segment.is_first() {
// this is set on the root block, no actual bytes are present for multiblock
// files
}
if segment.is_last() {
let path = entry.path();
let mode = metadata.mode().unwrap_or(0o0644) & 0o7777;
let (seconds, _) = metadata.mtime().unwrap_or((0, 0));
println!("f {:o} {:>12} {:>16} {:?}", mode, seconds, total_size, path);
}
// continue the walk
item.into_inner()
}
ContinuedWalk::Directory(item) => {
// presence of metadata can be used to determine if this is the first appearance of
// a directory: sibling hamt shard buckets do not have metadata.
if let Some(metadata) = item.as_entry().metadata() {
let path = item.as_entry().path();
let mode = metadata.mode().unwrap_or(0o0755) & 0o7777;
let (seconds, _) = metadata.mtime().unwrap_or((0, 0));
println!("d {:o} {:>12} {:>16} {:?}", mode, seconds, "-", path);
}
item.into_inner()
}
ContinuedWalk::Symlink(bytes, item) => {
let entry = item.as_entry();
let metadata = entry.metadata().expect("symlink must have metadata");
let path = entry.path();
let target = Path::new(std::str::from_utf8(bytes).unwrap());
let mode = metadata.mode().unwrap_or(0o0755) & 0o7777;
let (seconds, _) = metadata.mtime().unwrap_or((0, 0));
println!(
"s {:o} {:>12} {:>16} {:?} -> {:?}",
mode, seconds, "-", path, target
);
item.into_inner()
}
};
}
Ok(())
}
enum Error {
OpeningFailed(IoError),
Other(IoError),
Walk(ipfs_unixfs::walk::Error),
}
impl From<IoError> for Error {
fn from(e: IoError) -> Error {
Error::Other(e)
}
}
impl From<ipfs_unixfs::walk::Error> for Error {
fn from(e: ipfs_unixfs::walk::Error) -> Error {
Error::Walk(e)
}
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use Error::*;
match self {
OpeningFailed(e) => write!(fmt, "Failed to open file: {}", e),
Other(e) => write!(fmt, "A file-related IO error: {}", e),
Walk(e) => write!(fmt, "Walk failed, please report this as a bug: {}", e),
}
}
}
struct ShardedBlockStore {
root: PathBuf,
}
impl ShardedBlockStore {
fn as_path(&self, key: &[u8]) -> PathBuf {
// assume that we have a block store with second-to-last/2 sharding
// files in Base32Upper
let encoded = multibase::Base::Base32Upper.encode(key);
let len = encoded.len();
// this is safe because base32 is ascii
let dir = &encoded[(len - 3)..(len - 1)];
assert_eq!(dir.len(), 2);
let mut path = self.root.clone();
path.push(dir);
path.push(encoded);
path.set_extension("data");
path
}
fn as_file(&self, key: &[u8]) -> Result<std::fs::File, Error> {
let path = self.as_path(key);
std::fs::OpenOptions::new()
.read(true)
.open(path)
.map_err(Error::OpeningFailed)
}
}

unixfs/examples/resolve.rs Normal file

@ -0,0 +1,291 @@
use cid::Cid;
use ipfs_unixfs::dir::{resolve, LookupError, ResolveError};
use std::convert::TryFrom;
use std::fmt;
use std::io::{Error as IoError, Read};
use std::path::PathBuf;
fn main() {
let path = match std::env::args()
.nth(1)
.map(|s| IpfsPath::try_from(s.as_str()))
{
Some(Ok(path)) => path,
Some(Err(e)) => {
eprintln!("Invalid path given as argument: {}", e);
std::process::exit(1);
}
None => {
eprintln!("USAGE: {} IPFSPATH\n", std::env::args().next().unwrap());
eprintln!(
"Will resolve the given IPFSPATH to a CID through any UnixFS \
directories or HAMT shards from default go-ipfs 0.5 \
configuration flatfs blockstore and write the final CID into \
stdout"
);
std::process::exit(0);
}
};
let ipfs_path = match std::env::var("IPFS_PATH") {
Ok(s) => s,
Err(e) => {
eprintln!("IPFS_PATH is not set or could not be read: {}", e);
std::process::exit(1);
}
};
let mut blocks = PathBuf::from(ipfs_path);
blocks.push("blocks");
let blockstore = ShardedBlockStore { root: blocks };
match walk(blockstore, path) {
Ok(Some(cid)) => {
println!("{}", cid);
}
Ok(None) => {
eprintln!("not found");
}
Err(Error::OpeningFailed(e)) => {
eprintln!("{}\n", e);
eprintln!("This is likely caused by either:");
eprintln!(" - ipfs does not have the block");
eprintln!(" - ipfs is configured to use non-flatfs storage");
eprintln!(" - ipfs is configured to use flatfs with different sharding");
std::process::exit(1);
}
Err(e) => {
eprintln!("Failed to walk the merkle tree: {}", e);
std::process::exit(1);
}
}
}
#[derive(Debug)]
pub enum PathError {
InvalidCid(cid::Error),
InvalidPath,
}
impl fmt::Display for PathError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
PathError::InvalidCid(e) => write!(fmt, "{}", e),
PathError::InvalidPath => write!(fmt, "invalid path"),
}
}
}
impl std::error::Error for PathError {}
/// Ipfs path following https://github.com/ipfs/go-path/
#[derive(Debug)]
pub struct IpfsPath {
/// Option to support moving the cid
root: Option<Cid>,
path: std::vec::IntoIter<String>,
}
impl From<Cid> for IpfsPath {
/// Creates a new `IpfsPath` from just the `Cid`, which is the same as parsing from a string
/// representation of a `Cid`, but cannot fail.
fn from(root: Cid) -> IpfsPath {
IpfsPath {
root: Some(root),
path: Vec::new().into_iter(),
}
}
}
impl TryFrom<&str> for IpfsPath {
type Error = PathError;
fn try_from(path: &str) -> Result<Self, Self::Error> {
let mut split = path.splitn(2, "/ipfs/");
let first = split.next();
let (_root, path) = match first {
Some("") => {
/* started with /ipfs/ */
if let Some(x) = split.next() {
// was /ipfs/x
("ipfs", x)
} else {
// just the /ipfs/
return Err(PathError::InvalidPath);
}
}
Some(x) => {
/* maybe didn't start with /ipfs/, need to check second */
if split.next().is_some() {
// x/ipfs/_
return Err(PathError::InvalidPath);
}
("", x)
}
None => return Err(PathError::InvalidPath),
};
let mut split = path.splitn(2, '/');
let root = split
.next()
.expect("first value from splitn(2, _) must exist");
let path = split
.next()
.iter()
.flat_map(|s| s.split('/').filter(|s| !s.is_empty()).map(String::from))
.collect::<Vec<_>>()
.into_iter();
let root = Some(Cid::try_from(root).map_err(PathError::InvalidCid)?);
Ok(IpfsPath { root, path })
}
}
impl IpfsPath {
pub fn take_root(&mut self) -> Option<Cid> {
self.root.take()
}
}
fn walk(blocks: ShardedBlockStore, mut path: IpfsPath) -> Result<Option<Cid>, Error> {
use ipfs_unixfs::dir::MaybeResolved::*;
let mut buf = Vec::new();
let mut root = path.take_root().unwrap();
let mut cache = None;
for segment in path.path {
println!("cache {:?}", cache);
buf.clear();
eprintln!("reading {} to resolve {:?}", root, segment);
blocks.as_file(&root.to_bytes())?.read_to_end(&mut buf)?;
let mut walker = match resolve(&buf, segment.as_str(), &mut cache)? {
Found(cid) => {
// either root was a Directory or we got lucky with a HAMT directory.
// With HAMTDirectories the top level can contain a direct link to the target, but
// it's more likely it will be found under some bucket, which would be the third
// case in this match.
println!("got lucky: found {} for {:?}", cid, segment);
println!("cache {:?}", cache);
root = cid;
continue;
}
NotFound => return Ok(None),
// when we stumble upon a HAMT shard, we'll need to look up other blocks in order to
// find the final link. The current implementation cannot search for the directory by
// hashing the name and looking it up, but the implementation can be changed underneath
// without changes to the API.
//
// HAMTDirectories or HAMT shards are multi-block directories where the entries are
// bucketed by their hash value.
NeedToLoadMore(walker) => walker,
};
eprintln!("walking {} on {:?}", root, segment);
let mut other_blocks = 1;
loop {
let (first, _) = walker.pending_links();
buf.clear();
eprintln!(" -> reading {} while searching for {:?}", first, segment);
blocks.as_file(&first.to_bytes())?.read_to_end(&mut buf)?;
match walker.continue_walk(&buf, &mut cache)? {
NotFound => {
println!("cache {:?}", cache);
return Ok(None);
}
Found(cid) => {
eprintln!(
" resolved {} from {} after {} blocks to {}",
segment, root, other_blocks, cid
);
root = cid;
break;
}
NeedToLoadMore(next) => walker = next,
}
other_blocks += 1;
}
}
println!("cache {:?}", cache);
Ok(Some(root))
}
enum Error {
OpeningFailed(IoError),
Other(IoError),
Traversal(ResolveError),
}
impl From<IoError> for Error {
fn from(e: IoError) -> Error {
Error::Other(e)
}
}
impl From<ResolveError> for Error {
fn from(e: ResolveError) -> Error {
Error::Traversal(e)
}
}
impl From<LookupError> for Error {
fn from(e: LookupError) -> Error {
Error::Traversal(e.into())
}
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use Error::*;
match self {
OpeningFailed(e) => write!(fmt, "File opening failed: {}", e),
Other(e) => write!(fmt, "Other file related io error: {}", e),
Traversal(e) => write!(fmt, "Walking failed, please report this as a bug: {:?}", e),
}
}
}
struct ShardedBlockStore {
root: PathBuf,
}
impl ShardedBlockStore {
fn as_path(&self, key: &[u8]) -> PathBuf {
// assume that we have a block store with second-to-last/2 sharding
// files in Base32Upper
let encoded = multibase::Base::Base32Upper.encode(key);
let len = encoded.len();
// this is safe because base32 is ascii
let dir = &encoded[(len - 3)..(len - 1)];
assert_eq!(dir.len(), 2);
let mut path = self.root.clone();
path.push(dir);
path.push(encoded);
path.set_extension("data");
path
}
fn as_file(&self, key: &[u8]) -> Result<std::fs::File, Error> {
let path = self.as_path(key);
std::fs::OpenOptions::new()
.read(true)
.open(path)
.map_err(Error::OpeningFailed)
}
}


@ -1,9 +1,7 @@
use crate::pb::PBNode;
use crate::InvalidCidInLink;
use cid::Cid;
use std::borrow::Cow;
///! dag-pb support operations. Placing this module inside unixfs module is a bit unfortunate but
///! follows from the inseparability of dag-pb and UnixFS.
use crate::pb::PBNode;
use std::borrow::Cow;
use std::convert::TryFrom;
/// Extracts the PBNode::Data field from the block as it appears on the block.
@ -15,20 +13,3 @@ pub fn node_data(block: &[u8]) -> Result<Option<&[u8]>, quick_protobuf::Error> {
None => None,
})
}
/// Extracts the PBNode::Links as Cids usable for the ipfs `refs` operation.
pub fn nameless_links(
block: &[u8],
) -> Result<Result<Vec<Cid>, InvalidCidInLink>, quick_protobuf::Error> {
let doc = PBNode::try_from(block)?;
Ok(doc
.Links
.into_iter()
.enumerate()
.map(|(nth, link)| {
let hash = link.Hash.as_deref().unwrap_or_default();
Cid::try_from(hash).map_err(|e| InvalidCidInLink::from((nth, link, e)))
})
.collect::<Result<Vec<_>, _>>())
}


@ -7,6 +7,16 @@ use std::fmt;
mod sharded_lookup;
pub use sharded_lookup::{Cache, LookupError, ShardError, ShardedLookup};
mod directory;
pub(crate) use directory::{check_directory_supported, UnexpectedDirectoryProperties};
pub(crate) fn check_hamtshard_supported(
mut flat: FlatUnixFs<'_>,
) -> Result<FlatUnixFs<'_>, ShardError> {
ShardedLookup::check_supported(&mut flat)?;
Ok(flat)
}
/// Resolves a single path segment on `dag-pb` or UnixFS directories (normal, sharded).
///
/// The third parameter can always be substituted with a None but when repeatedly resolving over
@ -30,20 +40,7 @@ pub fn resolve<'needle>(
return Ok(ShardedLookup::lookup_or_start(hamt, needle, cache)?)
}
Ok(flat) if flat.data.Type == UnixFsType::Directory => {
if flat.data.filesize.is_some()
|| !flat.data.blocksizes.is_empty()
|| flat.data.hashType.is_some()
|| flat.data.fanout.is_some()
{
return Err(ResolveError::UnexpectedDirProperties {
filesize: flat.data.filesize,
blocksizes: flat.data.blocksizes,
hash_type: flat.data.hashType,
fanout: flat.data.fanout,
});
}
flat.links
check_directory_supported(flat)?.links
}
Err(ParsingFailed::InvalidUnixFs(_, PBNode { Links: links, .. }))
| Err(ParsingFailed::NoData(PBNode { Links: links, .. })) => links,
@ -103,16 +100,7 @@ pub enum ResolveError {
UnexpectedType(UnexpectedNodeType),
/// A directory had unsupported properties. These are not encountered during walking sharded
/// directories.
UnexpectedDirProperties {
/// filesize is a property of Files
filesize: Option<u64>,
/// blocksizes is a property of Files
blocksizes: Vec<u64>,
/// hash_type is a property of HAMT Shards
hash_type: Option<u64>,
/// fanout is a property of HAMT shards
fanout: Option<u64>,
},
UnexpectedDirProperties(UnexpectedDirectoryProperties),
/// Failed to read the block as a dag-pb node. Failure to read an inner UnixFS node is ignored
/// and links of the outer dag-pb are processed.
Read(quick_protobuf::Error),
@ -120,23 +108,18 @@ pub enum ResolveError {
Lookup(LookupError),
}
impl From<UnexpectedDirectoryProperties> for ResolveError {
fn from(e: UnexpectedDirectoryProperties) -> Self {
ResolveError::UnexpectedDirProperties(e)
}
}
impl fmt::Display for ResolveError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use ResolveError::*;
match self {
UnexpectedType(ut) => write!(
fmt,
"unexpected type for UnixFs: {:?}",
ut
),
UnexpectedDirProperties { filesize, blocksizes, hash_type, fanout } => write!(
fmt,
"unexpected directory properties: filesize={:?}, {} blocksizes, hash_type={:?}, fanout={:?}",
filesize,
blocksizes.len(),
hash_type,
fanout
),
UnexpectedType(ut) => write!(fmt, "unexpected type for UnixFs: {:?}", ut),
UnexpectedDirProperties(udp) => write!(fmt, "unexpected directory properties: {}", udp),
Read(e) => write!(fmt, "parsing failed: {}", e),
Lookup(e) => write!(fmt, "{}", e),
}
@ -228,6 +211,7 @@ impl MultipleMatchingLinks {
mod tests {
use super::{resolve, MaybeResolved};
use crate::test_support::FakeBlockstore;
use cid::Cid;
use hex_literal::hex;
use std::convert::TryFrom;
@ -283,4 +267,31 @@ mod tests {
// that we don't know how to resolve through this
resolve(&payload[..], "anything", &mut None).unwrap_err();
}
#[test]
fn sharded_directory_linking_to_non_sharded() {
// created this test case out of doubt that we could fail a traversal as ShardedLookup
// expects the linked cids to be hamt shards. However that cannot happen as we only resolve
// a single step.
let blocks = FakeBlockstore::with_fixtures();
let block = blocks.get_by_str("QmQXUANxYGpkwMTWQUdZBPx9jqfFP7acNgL4FHRWkndKCe");
let next = match resolve(&block[..], "non_sharded_dir", &mut None).unwrap() {
MaybeResolved::Found(cid) => cid,
x => unreachable!("{:?}", x),
};
let block = blocks.get_by_cid(&next);
let next = match resolve(&block[..], "foobar", &mut None).unwrap() {
MaybeResolved::Found(cid) => cid,
x => unreachable!("{:?}", x),
};
assert_eq!(
&next.to_string(),
"QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL"
);
}
}


@ -0,0 +1,60 @@
use crate::pb::FlatUnixFs;
use std::fmt;
/// Ensures the directory looks like something we actually support.
pub(crate) fn check_directory_supported(
flat: FlatUnixFs<'_>,
) -> Result<FlatUnixFs<'_>, UnexpectedDirectoryProperties> {
let data = flat.data.Data.as_deref();
if flat.data.filesize.is_some()
|| !flat.data.blocksizes.is_empty()
|| flat.data.hashType.is_some()
|| flat.data.fanout.is_some()
|| !data.unwrap_or_default().is_empty()
{
let data = data.map(|s| s.to_vec());
Err(UnexpectedDirectoryProperties {
filesize: flat.data.filesize,
blocksizes: flat.data.blocksizes,
hash_type: flat.data.hashType,
fanout: flat.data.fanout,
data,
})
} else {
Ok(flat)
}
}
/// Error case for checking if we support this directory.
#[derive(Debug)]
pub struct UnexpectedDirectoryProperties {
/// filesize is a property of Files
filesize: Option<u64>,
/// blocksizes is a property of Files
blocksizes: Vec<u64>,
/// hash_type is a property of HAMT Shards
hash_type: Option<u64>,
/// fanout is a property of HAMT shards
fanout: Option<u64>,
/// directories should have no Data
data: Option<Vec<u8>>,
}
impl fmt::Display for UnexpectedDirectoryProperties {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
fmt,
"filesize={:?}, {} blocksizes, hash_type={:?}, fanout={:?}, data=[",
self.filesize,
self.blocksizes.len(),
self.hash_type,
self.fanout,
)?;
for b in self.data.as_deref().unwrap_or_default() {
write!(fmt, "{:02x}", b)?;
}
write!(fmt, "]")
}
}


@ -137,7 +137,7 @@ impl<'needle> ShardedLookup<'needle> {
/// Takes the validated object as mutable reference to move data out of it in case of error.
///
/// Returns an error if we don't support the properties on the HAMTShard-typed node
fn check_supported(hamt: &mut FlatUnixFs<'_>) -> Result<(), ShardError> {
pub(crate) fn check_supported(hamt: &mut FlatUnixFs<'_>) -> Result<(), ShardError> {
assert_eq!(hamt.data.Type, UnixFsType::HAMTShard);
if hamt.data.fanout != Some(256) || hamt.data.hashType != Some(34) {


@ -2,8 +2,8 @@
///!
///! Most usable for walking UnixFS file trees provided by the `visit::IdleFileVisit` and
///! `visit::FileVisit` types.
use crate::pb::{ParsingFailed, UnixFs};
use crate::{InvalidCidInLink, UnexpectedNodeType};
use crate::pb::ParsingFailed;
use crate::{InvalidCidInLink, Metadata, UnexpectedNodeType};
use std::borrow::Cow;
use std::fmt;
@ -13,49 +13,6 @@ pub mod reader;
/// Higher level API for visiting the file tree.
pub mod visit;
/// Container for the unixfs metadata, which can be present at the root of the file trees.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct FileMetadata {
mode: Option<u32>,
mtime: Option<(i64, u32)>,
}
// TODO: add way to get std::fs::Permissions out of this, or maybe some libc like type?
impl FileMetadata {
/// Returns the full file mode, if one has been specified.
///
/// The full file mode is originally read through `st_mode` field of `stat` struct defined in
/// `sys/stat.h` and it's defining OpenGroup standard. Lowest 3 bytes will correspond to read,
/// write, and execute rights per user, group, and other and 4th byte determines sticky bits,
/// set user id or set group id. Following two bytes correspond to the different file types, as
/// defined by the same OpenGroup standard:
/// https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/sys_stat.h.html
pub fn mode(&self) -> Option<u32> {
self.mode
}
/// Returns the raw timestamp of last modification time, if specified.
///
/// The timestamp is `(seconds, nanos)` similar to `std::time::Duration` with the exception of
/// allowing seconds to be negative. The seconds are calculated from `1970-01-01 00:00:00` or
/// the common "unix epoch".
pub fn mtime(&self) -> Option<(i64, u32)> {
self.mtime
}
}
impl<'a> From<&'a UnixFs<'_>> for FileMetadata {
fn from(data: &'a UnixFs<'_>) -> Self {
let mode = data.mode;
let mtime = data
.mtime
.clone()
.map(|ut| (ut.Seconds, ut.FractionalNanoseconds.unwrap_or(0)));
FileMetadata { mode, mtime }
}
}
/// Describes the errors which can happen during a visit or lower level block-by-block walking of
/// the DAG.
#[derive(Debug)]
@ -117,7 +74,7 @@ pub enum FileError {
/// Errored when the filesize is non-zero.
NoLinksNoContent,
/// Unsupported: non-root block defines metadata.
NonRootDefinesMetadata(FileMetadata),
NonRootDefinesMetadata(Metadata),
/// A non-leaf node in the tree has no filesize value which is used to determine the file range
/// for this tree.
IntermediateNodeWithoutFileSize,
@ -220,12 +177,9 @@ impl<'a> UnwrapBorrowedExt<'a> for Option<Cow<'a, [u8]>> {
}
#[cfg(test)]
mod tests {
use cid::Cid;
use std::collections::HashMap;
use std::convert::TryFrom;
pub(crate) mod tests {
use super::{reader::*, visit::*, UnwrapBorrowedExt};
use crate::test_support::FakeBlockstore;
use hex_literal::hex;
const CONTENT_FILE: &[u8] = &hex!("0a0d08021207636f6e74656e741807");
@ -244,7 +198,7 @@ mod tests {
#[test]
fn visiting_just_content() {
let res = IdleFileVisit::default().start(CONTENT_FILE);
assert!(matches!(res, Ok((b"content", _, None))), "{:?}", res);
assert!(matches!(res, Ok((b"content", _, _, None))), "{:?}", res);
}
#[test]
@ -253,7 +207,7 @@ mod tests {
.with_target_range(500_000..600_000)
.start(CONTENT_FILE);
assert!(matches!(res, Ok((b"", _, None))), "{:?}", res);
assert!(matches!(res, Ok((b"", _, _, None))), "{:?}", res);
}
#[test]
@ -264,79 +218,6 @@ mod tests {
assert!(matches!(content, FileContent::Bytes(b"")), "{:?}", content);
}
#[derive(Default)]
struct FakeBlockstore {
blocks: HashMap<Cid, Vec<u8>>,
}
impl FakeBlockstore {
fn get_by_cid<'a>(&'a self, cid: &Cid) -> &'a [u8] {
self.blocks.get(cid).unwrap()
}
fn get_by_raw<'a>(&'a self, key: &[u8]) -> &'a [u8] {
self.get_by_cid(&Cid::try_from(key).unwrap())
}
fn get_by_str<'a>(&'a self, key: &str) -> &'a [u8] {
self.get_by_cid(&Cid::try_from(key).unwrap())
}
fn insert_v0(&mut self, block: &[u8]) -> Cid {
use sha2::Digest;
let mut sha = sha2::Sha256::new();
sha.input(block);
let result = sha.result();
let mh = multihash::wrap(multihash::Code::Sha2_256, &result[..]);
let cid = Cid::new_v0(mh).unwrap();
assert!(
self.blocks.insert(cid.clone(), block.to_vec()).is_none(),
"duplicate cid {}",
cid
);
cid
}
fn with_fixtures() -> Self {
let mut this = Self::default();
let foobar_blocks: &[&[u8]] = &[
// root for "foobar\n" from go-ipfs 0.5 add -s size-2
// root
// |
// ----+-----
// | | | |
// fo ob ar \n
// QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6
&hex!("12280a221220fef9fe1804942b35e19e145a03f9c9d5ca9c997dda0a9416f3f515a52f1b3ce11200180a12280a221220dfb94b75acb208fd4873d84872af58bd65c731770a7d4c0deeb4088e87390bfe1200180a12280a221220054497ae4e89812c83276a48e3e679013a788b7c0eb02712df15095c02d6cd2c1200180a12280a221220cc332ceb37dea7d3d7c00d1393117638d3ed963575836c6d44a24951e444cf5d120018090a0c080218072002200220022001"),
// first bytes: fo
&hex!("0a0808021202666f1802"),
// ob
&hex!("0a08080212026f621802"),
// ar
&hex!("0a080802120261721802"),
// \n
&hex!("0a07080212010a1801"),
// same "foobar\n" but with go-ipfs 0.5 add --trickle -s size-2
&hex!("12280a2212200f20a024ce0152161bc23e7234573374dfc3999143deaebf9b07b9c67318f9bd1200180a12280a221220b424253c25b5a7345fc7945732e363a12a790341b7c2d758516bbad5bbaab4461200180a12280a221220b7ab6350c604a885be9bd72d833f026b1915d11abe7e8dda5d0bca689342b7411200180a12280a221220a8a826652c2a3e93a751456e71139df086a1fedfd3bd9f232ad52ea1d813720e120018090a0c080218072002200220022001"),
// the blocks have type raw instead of file, for some unknown reason
&hex!("0a0808001202666f1802"),
&hex!("0a08080012026f621802"),
&hex!("0a080800120261721802"),
&hex!("0a07080012010a1801"),
];
for block in foobar_blocks {
this.insert_v0(block);
}
this
}
}
#[test]
fn balanced_traversal() {
let target = "QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6";
@ -379,14 +260,14 @@ mod tests {
fn collect_bytes(blocks: &FakeBlockstore, visit: IdleFileVisit, start: &str) -> Vec<u8> {
let mut ret = Vec::new();
let (content, _, mut step) = visit.start(blocks.get_by_str(start)).unwrap();
let (content, _, _, mut step) = visit.start(blocks.get_by_str(start)).unwrap();
ret.extend(content);
while let Some(visit) = step {
let (first, _) = visit.pending_links();
let block = blocks.get_by_cid(first);
let (content, next_step) = visit.continue_walk(block).unwrap();
let (content, next_step) = visit.continue_walk(block, &mut None).unwrap();
ret.extend(content);
step = next_step;
}

View File

@ -3,7 +3,7 @@ use std::convert::TryFrom;
use std::fmt;
use std::ops::Range;
use crate::file::{FileError, FileMetadata, FileReadFailed, UnwrapBorrowedExt};
use crate::file::{FileError, FileReadFailed, Metadata, UnwrapBorrowedExt};
/// Navigates the UnixFs files, which are either:
/// - single block files which have everything needed to read all of the contents
@ -22,11 +22,12 @@ pub struct FileReader<'a> {
links: Vec<PBLink<'a>>,
data: &'a [u8],
blocksizes: Vec<u64>,
metadata: FileMetadata,
metadata: Metadata,
file_size: u64,
}
impl AsRef<FileMetadata> for FileReader<'_> {
fn as_ref(&self) -> &FileMetadata {
impl AsRef<Metadata> for FileReader<'_> {
fn as_ref(&self) -> &Metadata {
&self.metadata
}
}
@ -74,7 +75,12 @@ impl<'a> FileReader<'a> {
/// Method for starting the file traversal. `data` is the raw data from unixfs block.
pub fn from_block(data: &'a [u8]) -> Result<Self, FileReadFailed> {
let inner = FlatUnixFs::try_from(data)?;
let metadata = FileMetadata::from(&inner.data);
let metadata = Metadata::from(&inner.data);
Self::from_parts(inner, 0, metadata)
}
pub(crate) fn from_parsed(inner: FlatUnixFs<'a>) -> Result<Self, FileReadFailed> {
let metadata = Metadata::from(&inner.data);
Self::from_parts(inner, 0, metadata)
}
@ -87,7 +93,7 @@ impl<'a> FileReader<'a> {
let inner = FlatUnixFs::try_from(data)?;
if inner.data.mode.is_some() || inner.data.mtime.is_some() {
let metadata = FileMetadata::from(&inner.data);
let metadata = Metadata::from(&inner.data);
return Err(FileError::NonRootDefinesMetadata(metadata).into());
}
@ -97,7 +103,7 @@ impl<'a> FileReader<'a> {
fn from_parts(
inner: FlatUnixFs<'a>,
offset: u64,
metadata: FileMetadata,
metadata: Metadata,
) -> Result<Self, FileReadFailed> {
let empty_or_no_content = inner
.data
@ -143,6 +149,7 @@ impl<'a> FileReader<'a> {
data,
blocksizes: inner.data.blocksizes,
metadata,
file_size: inner.data.filesize.unwrap(),
})
}
}
@ -160,6 +167,7 @@ impl<'a> FileReader<'a> {
last_offset: self.offset,
metadata: self.metadata,
file_size: self.file_size,
};
let fc = if self.links.is_empty() {
@ -174,6 +182,11 @@ impl<'a> FileReader<'a> {
(fc, traversal)
}
/// Returns the total size of the file.
pub fn file_size(&self) -> u64 {
self.file_size
}
}
/// Carrier of validation data used between blocks during a walk on the merkle tree.
@ -181,8 +194,9 @@ impl<'a> FileReader<'a> {
pub struct Traversal {
last_ending: Ending,
last_offset: u64,
file_size: u64,
metadata: FileMetadata,
metadata: Metadata,
}
impl Traversal {
@ -202,10 +216,15 @@ impl Traversal {
.check_is_suitable_next(self.last_offset, tree_range)?;
FileReader::from_continued(self, tree_range.start, next_block)
}
/// Returns the total size of the file.
pub fn file_size(&self) -> u64 {
self.file_size
}
}
impl AsRef<FileMetadata> for Traversal {
fn as_ref(&self) -> &FileMetadata {
impl AsRef<Metadata> for Traversal {
fn as_ref(&self) -> &Metadata {
&self.metadata
}
}
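
As a rough sketch of the reader-level API in reader.rs above: a single block can be inspected with `FileReader::from_block` and `content()`. The `inspect` helper below is hypothetical, and it assumes the `ipfs_unixfs::file::reader` module is exported the way the imports in visit.rs suggest; real callers should prefer the higher level `Walker`.

    use ipfs_unixfs::file::reader::{FileContent, FileReader};
    use ipfs_unixfs::file::FileReadFailed;
    use ipfs_unixfs::Metadata;

    // `block` is the raw dag-pb encoded root block of a file.
    fn inspect(block: &[u8]) -> Result<(), FileReadFailed> {
        let reader = FileReader::from_block(block)?;
        let metadata: &Metadata = reader.as_ref();
        println!("mode: {:?}, mtime: {:?}", metadata.mode(), metadata.mtime());
        println!("total size: {} bytes", reader.file_size());
        // content() consumes the reader and yields either the bytes or the links of this block
        match reader.content().0 {
            FileContent::Bytes(bytes) => println!("single block file, {} bytes inline", bytes.len()),
            FileContent::Links(links) => println!("multi block file, {} child links", links.count()),
        }
        Ok(())
    }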

View File

@ -3,12 +3,15 @@ use std::convert::TryFrom;
use std::ops::Range;
use crate::file::reader::{FileContent, FileReader, Traversal};
use crate::file::{FileMetadata, FileReadFailed};
use crate::pb::merkledag::PBLink;
use crate::file::{FileReadFailed, Metadata};
use crate::pb::{merkledag::PBLink, FlatUnixFs};
use crate::InvalidCidInLink;
/// IdleFileVisit represents a prepared file visit over a tree. The user has to know the CID and be
/// able to get the block for the visit.
///
/// **Note**: For an easier to use interface, you should consider using `ipfs_unixfs::walk::Walker`.
/// It uses `IdleFileVisit` and `FileVisit` internally but has a better API.
#[derive(Default, Debug)]
pub struct IdleFileVisit {
range: Option<Range<u64>>,
@ -22,14 +25,33 @@ impl IdleFileVisit {
/// Begins the visitation by processing the first block to be visited.
///
/// Returns on success a tuple of file bytes, any metadata associated, and optionally a
/// `FileVisit` to continue the walk.
/// Returns (on success) a tuple of file bytes, total file size, any metadata associated, and
/// optionally a `FileVisit` to continue the walk.
#[allow(clippy::type_complexity)]
pub fn start(
self,
block: &[u8],
) -> Result<(&[u8], FileMetadata, Option<FileVisit>), FileReadFailed> {
) -> Result<(&[u8], u64, Metadata, Option<FileVisit>), FileReadFailed> {
let fr = FileReader::from_block(block)?;
self.start_from_reader(fr, &mut None)
}
#[allow(clippy::type_complexity)]
pub(crate) fn start_from_parsed<'a>(
self,
block: FlatUnixFs<'a>,
cache: &'_ mut Option<Cache>,
) -> Result<(&'a [u8], u64, Metadata, Option<FileVisit>), FileReadFailed> {
let fr = FileReader::from_parsed(block)?;
self.start_from_reader(fr, cache)
}
#[allow(clippy::type_complexity)]
fn start_from_reader<'a>(
self,
fr: FileReader<'a>,
cache: &'_ mut Option<Cache>,
) -> Result<(&'a [u8], u64, Metadata, Option<FileVisit>), FileReadFailed> {
let metadata = fr.as_ref().to_owned();
let (content, traversal) = fr.content();
@ -38,32 +60,37 @@ impl IdleFileVisit {
FileContent::Bytes(content) => {
let block = 0..content.len() as u64;
let content = maybe_target_slice(content, &block, self.range.as_ref());
Ok((content, metadata, None))
Ok((content, traversal.file_size(), metadata, None))
}
FileContent::Links(iter) => {
// we need to select suitable here
let mut pending = iter
.enumerate()
.filter_map(|(i, (link, range))| {
if !block_is_in_target_range(&range, self.range.as_ref()) {
return None;
}
let mut links = cache.take().unwrap_or_default().inner;
Some(to_pending(i, link, range))
})
.collect::<Result<Vec<(Cid, Range<u64>)>, _>>()?;
let pending = iter.enumerate().filter_map(|(i, (link, range))| {
if !block_is_in_target_range(&range, self.range.as_ref()) {
return None;
}
Some(to_pending(i, link, range))
});
for item in pending {
links.push(item?);
}
// order is reversed to consume them in the depth first order
pending.reverse();
links.reverse();
if pending.is_empty() {
Ok((&[][..], metadata, None))
if links.is_empty() {
*cache = Some(links.into());
Ok((&[][..], traversal.file_size(), metadata, None))
} else {
Ok((
&[][..],
traversal.file_size(),
metadata,
Some(FileVisit {
pending,
pending: links,
state: traversal,
range: self.range,
}),
@ -74,10 +101,27 @@ impl IdleFileVisit {
}
}
/// Optional cache for data structures which can be re-used without re-allocation between walks of
/// different files.
#[derive(Default)]
pub struct Cache {
inner: Vec<(Cid, Range<u64>)>,
}
impl From<Vec<(Cid, Range<u64>)>> for Cache {
fn from(mut inner: Vec<(Cid, Range<u64>)>) -> Self {
inner.clear();
Cache { inner }
}
}
/// FileVisit represents an ongoing visitation over an UnixFs File tree.
///
/// The file visitor does **not** implement size validation of merkledag links at the moment. This
/// could be implemented with generational storage and it would require a u64 per link.
///
/// **Note**: For an easier to use interface, you should consider using `ipfs_unixfs::walk::Walker`.
/// It uses `IdleFileVisit` and `FileVisit` internally but has a better API.
#[derive(Debug)]
pub struct FileVisit {
/// The internal cache for pending work. Order is such that the next is always the last item,
@ -109,7 +153,11 @@ impl FileVisit {
///
/// Returns on success a tuple of bytes and a new version of `FileVisit` to continue the visit,
/// when there is something more to visit.
pub fn continue_walk(mut self, next: &[u8]) -> Result<(&[u8], Option<Self>), FileReadFailed> {
pub fn continue_walk<'a>(
mut self,
next: &'a [u8],
cache: &mut Option<Cache>,
) -> Result<(&'a [u8], Option<Self>), FileReadFailed> {
let traversal = self.state;
let (_, range) = self
.pending
@ -127,6 +175,7 @@ impl FileVisit {
self.state = traversal;
Ok((content, Some(self)))
} else {
*cache = Some(self.pending.into());
Ok((content, None))
}
}
@ -149,10 +198,15 @@ impl FileVisit {
}
}
}
/// Returns the total size of the file in bytes.
pub fn file_size(&self) -> u64 {
self.state.file_size()
}
}
impl AsRef<FileMetadata> for FileVisit {
fn as_ref(&self) -> &FileMetadata {
impl AsRef<Metadata> for FileVisit {
fn as_ref(&self) -> &Metadata {
self.state.as_ref()
}
}
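
Tying `IdleFileVisit`, `FileVisit` and `Cache` together, a full depth first read of one file could look roughly like the sketch below. The `load_block` closure is a stand-in for whatever blockstore lookup the caller has, and the module path is an assumption; the `Walker` mentioned in the notes above remains the recommended interface.

    use cid::Cid;
    use ipfs_unixfs::file::visit::{Cache, IdleFileVisit};
    use ipfs_unixfs::file::FileReadFailed;

    // Collects the whole file into memory by following pending links depth first.
    fn read_file(
        root: &[u8],
        load_block: impl Fn(&Cid) -> Vec<u8>,
    ) -> Result<Vec<u8>, FileReadFailed> {
        let mut cache: Option<Cache> = None;
        let (content, _file_size, _metadata, mut step) = IdleFileVisit::default().start(root)?;
        let mut out = content.to_vec();
        while let Some(visit) = step {
            // the next block to fetch is always the first pending link
            let (next, _) = visit.pending_links();
            let block = load_block(next);
            let (content, next_step) = visit.continue_walk(&block, &mut cache)?;
            out.extend_from_slice(content);
            step = next_step;
        }
        // once the walk ends, `cache` holds the re-usable allocation
        Ok(out)
    }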

View File

@ -1,5 +1,12 @@
#![warn(rust_2018_idioms, missing_docs)]
//! ipfs-unixfs
//! ipfs-unixfs: UnixFs tree support in Rust.
//!
//! The crate aims to provide a UnixFs implementation that is independent of the blockstore
//! implementation by working on slices and not doing any IO operations.
//!
//! The main entry point for extracting information and/or data out of UnixFs trees is
//! `ipfs_unixfs::walk::Walker`. To resolve `IpfsPath` segments over dag-pb nodes,
//! `ipfs_unixfs::resolve` should be used.
use std::borrow::Cow;
use std::fmt;
@ -7,17 +14,23 @@ use std::fmt;
/// UnixFS file support.
pub mod file;
/// UnixFS directory support.
/// UnixFS directory support, currently only the resolving functionality, which is re-exported at the crate root level.
pub mod dir;
pub use dir::{resolve, LookupError, MaybeResolved, ResolveError};
mod pb;
use crate::pb::UnixFsType;
use pb::{UnixFs, UnixFsType};
/// Support operations for the dag-pb, the outer shell of UnixFS.
pub mod dagpb;
/// Support for walking over all UnixFs trees.
pub mod walk;
#[cfg(test)]
pub(crate) mod test_support;
/// A link could not be transformed into a Cid.
#[derive(Debug)]
pub struct InvalidCidInLink {
@ -113,3 +126,52 @@ impl UnexpectedNodeType {
}
}
}
/// A container for the UnixFs metadata, which can be present at the root of the file, directory, or symlink trees.
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct Metadata {
mode: Option<u32>,
mtime: Option<(i64, u32)>,
}
impl Metadata {
/// Returns the full file mode, if one has been specified.
///
/// The full file mode is originally read through the `st_mode` field of the `stat` struct defined in
/// `sys/stat.h` and its defining OpenGroup standard. The lowest three octal digits correspond to read,
/// write, and execute rights per user, group, and other, the fourth octal digit holds the sticky,
/// set-user-id and set-group-id bits, and the remaining bits encode the file type, as defined by
/// the same OpenGroup standard:
/// https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/sys_stat.h.html
pub fn mode(&self) -> Option<u32> {
self.mode
}
/// Returns the raw timestamp of last modification time, if specified.
///
/// The timestamp is `(seconds, nanos)` - similar to `std::time::Duration`, with the exception of
/// allowing seconds to be negative. The seconds are calculated from `1970-01-01 00:00:00` or
/// the common "unix epoch".
pub fn mtime(&self) -> Option<(i64, u32)> {
self.mtime
}
/// Returns the mtime metadata as a `FileTime`. Available only when the `filetime` feature is enabled.
#[cfg(feature = "filetime")]
pub fn mtime_as_filetime(&self) -> Option<filetime::FileTime> {
self.mtime()
.map(|(seconds, nanos)| filetime::FileTime::from_unix_time(seconds, nanos))
}
}
impl<'a> From<&'a UnixFs<'_>> for Metadata {
fn from(data: &'a UnixFs<'_>) -> Self {
let mode = data.mode;
let mtime = data
.mtime
.clone()
.map(|ut| (ut.Seconds, ut.FractionalNanoseconds.unwrap_or(0)));
Metadata { mode, mtime }
}
}
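
On the consumer side the getters above can be interpreted with plain bit masking and the usual unix timestamp handling; a minimal hypothetical sketch:

    // `meta` would typically be obtained through AsRef<Metadata> on a reader, visit or walker.
    fn print_metadata(meta: &ipfs_unixfs::Metadata) {
        if let Some(mode) = meta.mode() {
            // keep the permission, sticky, set-user-id and set-group-id bits and print in octal
            println!("mode: {:04o}", mode & 0o7777);
        }
        if let Some((secs, nanos)) = meta.mtime() {
            println!("mtime: {}.{:09} since the unix epoch", secs, nanos);
        }
    }

With the `filetime` feature enabled, `mtime_as_filetime()` performs the same conversion into a `filetime::FileTime` directly.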

116
unixfs/src/test_support.rs Normal file
View File

@ -0,0 +1,116 @@
use cid::Cid;
use hex_literal::hex;
use std::collections::HashMap;
use std::convert::TryFrom;
#[derive(Default)]
pub struct FakeBlockstore {
blocks: HashMap<Cid, Vec<u8>>,
}
impl FakeBlockstore {
pub fn get_by_cid<'a>(&'a self, cid: &Cid) -> &'a [u8] {
self.blocks
.get(cid)
.unwrap_or_else(|| panic!("cid not found: {}", cid))
}
pub fn get_by_raw<'a>(&'a self, key: &[u8]) -> &'a [u8] {
self.get_by_cid(&Cid::try_from(key).unwrap())
}
pub fn get_by_str<'a>(&'a self, key: &str) -> &'a [u8] {
self.get_by_cid(&Cid::try_from(key).unwrap())
}
/// Inserts the block and returns its CIDv0, which is the sha2-256 multihash of the raw block bytes.
pub fn insert_v0(&mut self, block: &[u8]) -> Cid {
use sha2::Digest;
let mut sha = sha2::Sha256::new();
sha.input(block);
let result = sha.result();
let mh = multihash::wrap(multihash::Code::Sha2_256, &result[..]);
let cid = Cid::new_v0(mh).unwrap();
assert!(
self.blocks.insert(cid.clone(), block.to_vec()).is_none(),
"duplicate cid {}",
cid
);
cid
}
pub fn with_fixtures() -> Self {
let mut this = Self::default();
let foobar_blocks: &[&[u8]] = &[
// root for "foobar\n" from go-ipfs 0.5 add -s size-2
// root
// |
// ----+-----
// | | | |
// fo ob ar \n
// QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6
&hex!("12280a221220fef9fe1804942b35e19e145a03f9c9d5ca9c997dda0a9416f3f515a52f1b3ce11200180a12280a221220dfb94b75acb208fd4873d84872af58bd65c731770a7d4c0deeb4088e87390bfe1200180a12280a221220054497ae4e89812c83276a48e3e679013a788b7c0eb02712df15095c02d6cd2c1200180a12280a221220cc332ceb37dea7d3d7c00d1393117638d3ed963575836c6d44a24951e444cf5d120018090a0c080218072002200220022001"),
// first bytes: fo
&hex!("0a0808021202666f1802"),
// ob
&hex!("0a08080212026f621802"),
// ar
&hex!("0a080802120261721802"),
// \n
&hex!("0a07080212010a1801"),
// same "foobar\n" but with go-ipfs 0.5 add --trickle -s size-2
// QmWfQ48ChJUj4vWKFsUDe4646xCBmXgdmNfhjz9T7crywd
&hex!("12280a2212200f20a024ce0152161bc23e7234573374dfc3999143deaebf9b07b9c67318f9bd1200180a12280a221220b424253c25b5a7345fc7945732e363a12a790341b7c2d758516bbad5bbaab4461200180a12280a221220b7ab6350c604a885be9bd72d833f026b1915d11abe7e8dda5d0bca689342b7411200180a12280a221220a8a826652c2a3e93a751456e71139df086a1fedfd3bd9f232ad52ea1d813720e120018090a0c080218072002200220022001"),
// the blocks have type raw instead of file, for some unknown reason
&hex!("0a0808001202666f1802"),
&hex!("0a08080012026f621802"),
&hex!("0a080800120261721802"),
&hex!("0a07080012010a1801"),
// directory of the above two: QmVkvLsSEm2uJx1h5Fqukje8mMPYg393o5C2kMCkF2bBTA
&hex!("12380a2212202bf7f75b76e336f34a04abd86af423b5063628ffd91e5392444078851dc31655120f666f6f6261722e62616c616e63656418dd0112370a2212207baaf5e250ba1352f97eddc95840705890dc5d3fc37084a4c1aa052abcf4ac58120e666f6f6261722e747269636b6c6518dd010a020801"),
// a directory with the above directory of two: QmPTotyhVnnfCu9R4qwR4cdhpi5ENaiP8ZJfdqsm8Dw2jB
&hex!("12570a2212206e396cd762f0ab55cc48e10b3c9d5a8428fc2888f4ccda86b72d6aa9fc020cb5122e516d566b764c7353456d32754a783168354671756b6a65386d4d5059673339336f3543326b4d436b46326242544118b1040a020801"),
// sharded directory where all are collisions with small number of links, each
// bucket has two links: QmZbFPTnDBMWbQ6iBxQAhuhLz8Nu9XptYS96e7cuf5wvbk
&hex!("122b0a221220904e1485d68b56a71f79d44cd306d536ee52adb6a90c29b6a1fa95a504a038f71202303718b501122b0a221220772026a2c0e021710f8d0d8f72080255d5133556d3ae881e3405e673692f79a81202313318b001122b0a22122075e9df118a625120006c63b75c8f25f1e28397555ccf8c107029332d5e9b648a1202353418aa01122b0a221220db916fd000e12decdf0724965cbf419233a187ae415d59fbafea2c3851e584ad1202353618b101122b0a2212209adc67f730bd8b2f7eff8f2910ec8391814da9d7ae08d076165a9832bce99f921202383218af01122b0a221220bb48edba8f029483a6983ba70aef2cd86d14aa633f33007ce175680105da8d811202433118af01122b0a22122047b1f317152eb425d878e5e3577dd7c40af4bc2b005083c4bc9ec19157a8605c1202443418a601122b0a2212207b7e161cf9246d7fca2e2986aac98bbf2fef4f13f6fea497fc8f43d8899e0de51202463118a6010a280805121f020000001000020000000000000004000000000050000000000000000800802822308002"),
// file linked to by all names (empty): QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH
&hex!("0a0408021800"),
&hex!("123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121534386c6f6e672d6e616d65642d66696c652d3031361806123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121546426c6f6e672d6e616d65642d66696c652d30333718060a290805122008000000000000000000000000000000000000000000010000000000000000002822308002"),
&hex!("123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121538376c6f6e672d6e616d65642d66696c652d3035381806123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121544446c6f6e672d6e616d65642d66696c652d30303918060a250805121c200000000000000000000080000000000000000000000000000000002822308002"),
&hex!("123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121536396c6f6e672d6e616d65642d66696c652d3033381806123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121544356c6f6e672d6e616d65642d66696c652d30353018060a240805121b2000000000000000000000000002000000000000000000000000002822308002"),
&hex!("123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121532416c6f6e672d6e616d65642d66696c652d3034391806123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121543436c6f6e672d6e616d65642d66696c652d30303418060a230805121a10000000000000000000000000000000000000000400000000002822308002"),
&hex!("123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121543346c6f6e672d6e616d65642d66696c652d3032351806123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121543416c6f6e672d6e616d65642d66696c652d30333418060a230805121a04100000000000000000000000000000000000000000000000002822308002"),
&hex!("123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121534356c6f6e672d6e616d65642d66696c652d3034311806123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121541366c6f6e672d6e616d65642d66696c652d30333318060a1e080512154000000000000000000000002000000000000000002822308002"),
&hex!("123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121536396c6f6e672d6e616d65642d66696c652d3031371806123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121538306c6f6e672d6e616d65642d66696c652d30343018060a1a0805121101000002000000000000000000000000002822308002"),
&hex!("123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121536376c6f6e672d6e616d65642d66696c652d3030331806123d0a221220bfccda787baba32b59c78450ac3d20b633360b43992c77289f9ed46d843561e6121538356c6f6e672d6e616d65642d66696c652d30343818060a1a0805121120000000800000000000000000000000002822308002"),
// symlink linking to "foobar": QmNgQEdXVdLw79nH2bnxLMxnyWMaXrijfqMTiDVat3iyuz
&hex!("0a0a08041206666f6f626172"),
// sharded directory with single link to a non-sharded directory
// QmQXUANxYGpkwMTWQUdZBPx9jqfFP7acNgL4FHRWkndKCe
&hex!("12390a2212209b04586b8bdc01a7e0db04b8358a3717954572720f6b6803af5eec781cf73801121146416e6f6e5f736861726465645f64697218430a290805122004000000000000000000000000000000000000000000000000000000000000002822308002"),
// the non-sharded directory linked by the above sharded directory
// QmYmmkD3dGZjuozuqSzDYjU4ZyhAgc4T4P4SUgY6qjzBi8
&hex!("122e0a22122031c3d57080d8463a3c63b2923df5a1d40ad7a73eae5a14af584213e5f504ac331206666f6f626172180f0a020801"),
// single block version of "foobar\n" linked to by above non-sharded directory
&hex!("0a0d08021207666f6f6261720a1807"),
];
for block in foobar_blocks {
this.insert_v0(block);
}
this
}
}

1067
unixfs/src/walk.rs Normal file

File diff suppressed because it is too large