refactor: move ipfs_unixfs::dir::walk to top level
This commit is contained in:
parent
d61b0424d8
commit
9b18427e16
@ -12,9 +12,9 @@ use bytes::{Bytes, BytesMut, buf::BufMut};
|
|||||||
use tar::{Header, EntryType};
|
use tar::{Header, EntryType};
|
||||||
use futures::stream::TryStream;
|
use futures::stream::TryStream;
|
||||||
use ipfs::unixfs::ll::file::FileMetadata;
|
use ipfs::unixfs::ll::file::FileMetadata;
|
||||||
use ipfs::unixfs::{ll::file::FileReadFailed, TraversalFailed, ll::file::visit::Cache};
|
use ipfs::unixfs::{ll::file::FileReadFailed, TraversalFailed};
|
||||||
use crate::v0::refs::{walk_path, IpfsPath};
|
use crate::v0::refs::{walk_path, IpfsPath};
|
||||||
use ipfs::unixfs::ll::dir::walk::{self, Walker, ContinuedWalk};
|
use ipfs::unixfs::ll::walk::{self, Walker, ContinuedWalk};
|
||||||
use ipfs::Block;
|
use ipfs::Block;
|
||||||
use async_stream::try_stream;
|
use async_stream::try_stream;
|
||||||
|
|
||||||
@ -104,7 +104,7 @@ async fn get_inner<T: IpfsTypes>(ipfs: Ipfs<T>, args: GetArgs) -> Result<impl Re
|
|||||||
fn walk<Types: IpfsTypes>(ipfs: Ipfs<Types>, root: Cid)
|
fn walk<Types: IpfsTypes>(ipfs: Ipfs<Types>, root: Cid)
|
||||||
-> impl TryStream<Ok = Bytes, Error = GetError> + 'static
|
-> impl TryStream<Ok = Bytes, Error = GetError> + 'static
|
||||||
{
|
{
|
||||||
let mut cache: Option<Cache> = None;
|
let mut cache = None;
|
||||||
let mut tar_helper = TarHelper::with_buffer_sizes(16 * 1024);
|
let mut tar_helper = TarHelper::with_buffer_sizes(16 * 1024);
|
||||||
|
|
||||||
let mut root = Some(root);
|
let mut root = Some(root);
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
use cid::Cid;
|
use cid::Cid;
|
||||||
use ipfs_unixfs::file::{FileReadFailed};
|
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io::{Error as IoError, Read};
|
use std::io::{Error as IoError, Read};
|
||||||
@ -57,7 +56,7 @@ fn main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn walk(blocks: ShardedBlockStore, start: &Cid) -> Result<(), Error> {
|
fn walk(blocks: ShardedBlockStore, start: &Cid) -> Result<(), Error> {
|
||||||
use ipfs_unixfs::dir::walk::{Walker, ContinuedWalk};
|
use ipfs_unixfs::walk::{Walker, ContinuedWalk};
|
||||||
use std::io::{stdout, Write};
|
use std::io::{stdout, Write};
|
||||||
|
|
||||||
let stdout = stdout();
|
let stdout = stdout();
|
||||||
@ -75,11 +74,11 @@ fn walk(blocks: ShardedBlockStore, start: &Cid) -> Result<(), Error> {
|
|||||||
|
|
||||||
let mut cache = None;
|
let mut cache = None;
|
||||||
|
|
||||||
let mut visit = match Walker::start(&buf, "", &mut cache).unwrap() {
|
let mut visit = match Walker::start(&buf, "", &mut cache)? {
|
||||||
ContinuedWalk::Directory(item) => {
|
ContinuedWalk::Directory(item) => {
|
||||||
item.into_inner()
|
item.into_inner()
|
||||||
},
|
},
|
||||||
x => unreachable!("{:?}", x),
|
x => todo!("Only root level directories are supported in this exporter, not: {:?}", x),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut header = tar::Header::new_gnu();
|
let mut header = tar::Header::new_gnu();
|
||||||
@ -114,7 +113,7 @@ fn walk(blocks: ShardedBlockStore, start: &Cid) -> Result<(), Error> {
|
|||||||
// call to continue_walk.
|
// call to continue_walk.
|
||||||
let (next, _) = walker.pending_links();
|
let (next, _) = walker.pending_links();
|
||||||
blocks.as_file(&next.to_bytes())?.read_to_end(&mut buf)?;
|
blocks.as_file(&next.to_bytes())?.read_to_end(&mut buf)?;
|
||||||
visit = match walker.continue_walk(&buf, &mut cache).unwrap() {
|
visit = match walker.continue_walk(&buf, &mut cache)? {
|
||||||
ContinuedWalk::File(segment, item) => {
|
ContinuedWalk::File(segment, item) => {
|
||||||
|
|
||||||
let total_size = item.as_entry().total_file_size().unwrap();
|
let total_size = item.as_entry().total_file_size().unwrap();
|
||||||
@ -372,7 +371,7 @@ fn path2bytes(p: &Path) -> &[u8] {
|
|||||||
enum Error {
|
enum Error {
|
||||||
OpeningFailed(IoError),
|
OpeningFailed(IoError),
|
||||||
Other(IoError),
|
Other(IoError),
|
||||||
Traversal(ipfs_unixfs::file::FileReadFailed),
|
Walk(ipfs_unixfs::walk::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<IoError> for Error {
|
impl From<IoError> for Error {
|
||||||
@ -381,9 +380,9 @@ impl From<IoError> for Error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<FileReadFailed> for Error {
|
impl From<ipfs_unixfs::walk::Error> for Error {
|
||||||
fn from(e: FileReadFailed) -> Error {
|
fn from(e: ipfs_unixfs::walk::Error) -> Error {
|
||||||
Error::Traversal(e)
|
Error::Walk(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -393,7 +392,7 @@ impl fmt::Display for Error {
|
|||||||
match self {
|
match self {
|
||||||
OpeningFailed(e) => write!(fmt, "File opening failed: {}", e),
|
OpeningFailed(e) => write!(fmt, "File opening failed: {}", e),
|
||||||
Other(e) => write!(fmt, "Other file related io error: {}", e),
|
Other(e) => write!(fmt, "Other file related io error: {}", e),
|
||||||
Traversal(e) => write!(fmt, "Traversal failed, please report this as a bug: {}", e),
|
Walk(e) => write!(fmt, "Walk failed, please report this as a bug: {}", e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
291
unixfs/examples/resolve.rs
Normal file
291
unixfs/examples/resolve.rs
Normal file
@ -0,0 +1,291 @@
|
|||||||
|
use cid::Cid;
|
||||||
|
use ipfs_unixfs::dir::{resolve, LookupError, ResolveError};
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
use std::fmt;
|
||||||
|
use std::io::{Error as IoError, Read};
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let path = match std::env::args()
|
||||||
|
.nth(1)
|
||||||
|
.map(|s| IpfsPath::try_from(s.as_str()))
|
||||||
|
{
|
||||||
|
Some(Ok(path)) => path,
|
||||||
|
Some(Err(e)) => {
|
||||||
|
eprintln!("Invalid path given as argument: {}", e);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
eprintln!("USAGE: {} IPFSPATH\n", std::env::args().next().unwrap());
|
||||||
|
eprintln!(
|
||||||
|
"Will resolve the given IPFSPATH to a CID through any UnixFS \
|
||||||
|
directories or HAMT shards from default go-ipfs 0.5 \
|
||||||
|
configuration flatfs blockstore and write the final CID into \
|
||||||
|
stdout"
|
||||||
|
);
|
||||||
|
std::process::exit(0);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let ipfs_path = match std::env::var("IPFS_PATH") {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("IPFS_PATH is not set or could not be read: {}", e);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut blocks = PathBuf::from(ipfs_path);
|
||||||
|
blocks.push("blocks");
|
||||||
|
|
||||||
|
let blockstore = ShardedBlockStore { root: blocks };
|
||||||
|
|
||||||
|
match walk(blockstore, path) {
|
||||||
|
Ok(Some(cid)) => {
|
||||||
|
println!("{}", cid);
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
eprintln!("not found");
|
||||||
|
}
|
||||||
|
Err(Error::OpeningFailed(e)) => {
|
||||||
|
eprintln!("{}\n", e);
|
||||||
|
eprintln!("This is likely caused by either:");
|
||||||
|
eprintln!(" - ipfs does not have the block");
|
||||||
|
eprintln!(" - ipfs is configured to use non-flatfs storage");
|
||||||
|
eprintln!(" - ipfs is configured to use flatfs with different sharding");
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Failed to walk the merkle tree: {}", e);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum PathError {
|
||||||
|
InvalidCid(cid::Error),
|
||||||
|
InvalidPath,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for PathError {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
match *self {
|
||||||
|
PathError::InvalidCid(e) => write!(fmt, "{}", e),
|
||||||
|
PathError::InvalidPath => write!(fmt, "invalid path"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for PathError {}
|
||||||
|
|
||||||
|
/// Ipfs path following https://github.com/ipfs/go-path/
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct IpfsPath {
|
||||||
|
/// Option to support moving the cid
|
||||||
|
root: Option<Cid>,
|
||||||
|
path: std::vec::IntoIter<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Cid> for IpfsPath {
|
||||||
|
/// Creates a new IpfsPath from just the Cid, which is the same as parsing from a string
|
||||||
|
/// representation of a Cid but cannot fail.
|
||||||
|
fn from(root: Cid) -> IpfsPath {
|
||||||
|
IpfsPath {
|
||||||
|
root: Some(root),
|
||||||
|
path: Vec::new().into_iter(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<&str> for IpfsPath {
|
||||||
|
type Error = PathError;
|
||||||
|
|
||||||
|
fn try_from(path: &str) -> Result<Self, Self::Error> {
|
||||||
|
let mut split = path.splitn(2, "/ipfs/");
|
||||||
|
let first = split.next();
|
||||||
|
let (_root, path) = match first {
|
||||||
|
Some("") => {
|
||||||
|
/* started with /ipfs/ */
|
||||||
|
if let Some(x) = split.next() {
|
||||||
|
// was /ipfs/x
|
||||||
|
("ipfs", x)
|
||||||
|
} else {
|
||||||
|
// just the /ipfs/
|
||||||
|
return Err(PathError::InvalidPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(x) => {
|
||||||
|
/* maybe didn't start with /ipfs/, need to check second */
|
||||||
|
if split.next().is_some() {
|
||||||
|
// x/ipfs/_
|
||||||
|
return Err(PathError::InvalidPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
("", x)
|
||||||
|
}
|
||||||
|
None => return Err(PathError::InvalidPath),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut split = path.splitn(2, '/');
|
||||||
|
let root = split
|
||||||
|
.next()
|
||||||
|
.expect("first value from splitn(2, _) must exist");
|
||||||
|
|
||||||
|
let path = split
|
||||||
|
.next()
|
||||||
|
.iter()
|
||||||
|
.flat_map(|s| s.split('/').filter(|s| !s.is_empty()).map(String::from))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.into_iter();
|
||||||
|
|
||||||
|
let root = Some(Cid::try_from(root).map_err(PathError::InvalidCid)?);
|
||||||
|
|
||||||
|
Ok(IpfsPath { root, path })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IpfsPath {
|
||||||
|
pub fn take_root(&mut self) -> Option<Cid> {
|
||||||
|
self.root.take()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn walk(blocks: ShardedBlockStore, mut path: IpfsPath) -> Result<Option<Cid>, Error> {
|
||||||
|
use ipfs_unixfs::dir::MaybeResolved::*;
|
||||||
|
|
||||||
|
let mut buf = Vec::new();
|
||||||
|
let mut root = path.take_root().unwrap();
|
||||||
|
|
||||||
|
let mut cache = None;
|
||||||
|
|
||||||
|
for segment in path.path {
|
||||||
|
println!("cache {:?}", cache);
|
||||||
|
buf.clear();
|
||||||
|
eprintln!("reading {} to resolve {:?}", root, segment);
|
||||||
|
blocks.as_file(&root.to_bytes())?.read_to_end(&mut buf)?;
|
||||||
|
|
||||||
|
let mut walker = match resolve(&buf, segment.as_str(), &mut cache)? {
|
||||||
|
Found(cid) => {
|
||||||
|
// either root was a Directory or we got lucky with a HAMT directory.
|
||||||
|
// With HAMTDirectories the top level can contain a direct link to the target, but
|
||||||
|
// it's more likely it will be found under some bucket, which would be the third
|
||||||
|
// case in this match.
|
||||||
|
println!("got lucky: found {} for {:?}", cid, segment);
|
||||||
|
println!("cache {:?}", cache);
|
||||||
|
root = cid;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
NotFound => return Ok(None),
|
||||||
|
|
||||||
|
// when we stumble upon a HAMT shard, we'll need to look up other blocks in order to
|
||||||
|
// find the final link. The current implementation cannot lookup the directory by
|
||||||
|
// hashing the name and looking it up, but the implementation can be changed underneath
|
||||||
|
// but the API would stay the same.
|
||||||
|
//
|
||||||
|
// HAMTDirecotories or HAMT shards are multi-block directories where the entires are
|
||||||
|
// bucketed per their hash value.
|
||||||
|
NeedToLoadMore(walker) => walker,
|
||||||
|
};
|
||||||
|
|
||||||
|
eprintln!("walking {} on {:?}", root, segment);
|
||||||
|
|
||||||
|
let mut other_blocks = 1;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (first, _) = walker.pending_links();
|
||||||
|
buf.clear();
|
||||||
|
eprintln!(" -> reading {} while searching for {:?}", first, segment);
|
||||||
|
blocks.as_file(&first.to_bytes())?.read_to_end(&mut buf)?;
|
||||||
|
|
||||||
|
match walker.continue_walk(&buf, &mut cache)? {
|
||||||
|
NotFound => {
|
||||||
|
println!("cache {:?}", cache);
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
Found(cid) => {
|
||||||
|
eprintln!(
|
||||||
|
" resolved {} from {} after {} blocks to {}",
|
||||||
|
segment, root, other_blocks, cid
|
||||||
|
);
|
||||||
|
root = cid;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
NeedToLoadMore(next) => walker = next,
|
||||||
|
}
|
||||||
|
other_blocks += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("cache {:?}", cache);
|
||||||
|
Ok(Some(root))
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Error {
|
||||||
|
OpeningFailed(IoError),
|
||||||
|
Other(IoError),
|
||||||
|
Traversal(ResolveError),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<IoError> for Error {
|
||||||
|
fn from(e: IoError) -> Error {
|
||||||
|
Error::Other(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<ResolveError> for Error {
|
||||||
|
fn from(e: ResolveError) -> Error {
|
||||||
|
Error::Traversal(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<LookupError> for Error {
|
||||||
|
fn from(e: LookupError) -> Error {
|
||||||
|
Error::Traversal(e.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for Error {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
use Error::*;
|
||||||
|
match self {
|
||||||
|
OpeningFailed(e) => write!(fmt, "File opening failed: {}", e),
|
||||||
|
Other(e) => write!(fmt, "Other file related io error: {}", e),
|
||||||
|
Traversal(e) => write!(fmt, "Walking failed, please report this as a bug: {:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ShardedBlockStore {
|
||||||
|
root: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ShardedBlockStore {
|
||||||
|
fn as_path(&self, key: &[u8]) -> PathBuf {
|
||||||
|
// assume that we have a block store with second-to-last/2 sharding
|
||||||
|
// files in Base32Upper
|
||||||
|
|
||||||
|
let encoded = multibase::Base::Base32Upper.encode(key);
|
||||||
|
let len = encoded.len();
|
||||||
|
|
||||||
|
// this is safe because base32 is ascii
|
||||||
|
let dir = &encoded[(len - 3)..(len - 1)];
|
||||||
|
assert_eq!(dir.len(), 2);
|
||||||
|
|
||||||
|
let mut path = self.root.clone();
|
||||||
|
path.push(dir);
|
||||||
|
path.push(encoded);
|
||||||
|
path.set_extension("data");
|
||||||
|
path
|
||||||
|
}
|
||||||
|
|
||||||
|
fn as_file(&self, key: &[u8]) -> Result<std::fs::File, Error> {
|
||||||
|
let path = self.as_path(key);
|
||||||
|
|
||||||
|
std::fs::OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.open(path)
|
||||||
|
.map_err(Error::OpeningFailed)
|
||||||
|
}
|
||||||
|
}
|
@ -7,9 +7,6 @@ use std::fmt;
|
|||||||
mod sharded_lookup;
|
mod sharded_lookup;
|
||||||
pub use sharded_lookup::{Cache, LookupError, ShardError, ShardedLookup};
|
pub use sharded_lookup::{Cache, LookupError, ShardError, ShardedLookup};
|
||||||
|
|
||||||
/// Support for walking over all UnixFs trees. TODO: This needs to be moved to top-level.
|
|
||||||
pub mod walk;
|
|
||||||
|
|
||||||
/// Resolves a single path segment on `dag-pb` or UnixFS directories (normal, sharded).
|
/// Resolves a single path segment on `dag-pb` or UnixFS directories (normal, sharded).
|
||||||
///
|
///
|
||||||
/// The third parameter can always be substituted with a None but when repeatedly resolving over
|
/// The third parameter can always be substituted with a None but when repeatedly resolving over
|
||||||
|
@ -18,6 +18,9 @@ use crate::pb::UnixFsType;
|
|||||||
/// Support operations for the dag-pb, the outer shell of UnixFS.
|
/// Support operations for the dag-pb, the outer shell of UnixFS.
|
||||||
pub mod dagpb;
|
pub mod dagpb;
|
||||||
|
|
||||||
|
/// Support for walking over all UnixFs trees.
|
||||||
|
pub mod walk;
|
||||||
|
|
||||||
/// A link could not be transformed into a Cid.
|
/// A link could not be transformed into a Cid.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct InvalidCidInLink {
|
pub struct InvalidCidInLink {
|
||||||
|
Loading…
Reference in New Issue
Block a user