diff --git a/Cargo.lock b/Cargo.lock index 395b44c9..39c981da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2008,6 +2008,7 @@ dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-async-await 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-current-thread 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2022,6 +2023,15 @@ dependencies = [ "tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-async-await" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tokio-codec" version = "0.1.1" @@ -2609,6 +2619,7 @@ dependencies = [ "checksum tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e9175261fbdb60781fcd388a4d6cc7e14764a2b629a7ad94abb439aed223a44f" "checksum tk-listen 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5462b0f968c0457efe38fcd2df7e487096b992419e4f5337b06775a614bbda4b" "checksum tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)" = "e0500b88064f08bebddd0c0bed39e19f5c567a5f30975bee52b0c0d3e2eeb38c" +"checksum tokio-async-await 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "8221e9db7675e145fd4ecf5d8e017be9f198fb57ce1d11c7c1b6d7d9d73181bc" "checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f" "checksum tokio-current-thread 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "331c8acc267855ec06eb0c94618dcbbfea45bed2d20b77252940095273fb58f6" "checksum tokio-dns-unofficial 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "82c65483db54eb91b4ef3a9389a3364558590faf30ce473141707c0e16fda975" diff --git a/Cargo.toml b/Cargo.toml index 457334e3..1bc88b5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,5 +22,5 @@ rustc-serialize = "0.3" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -tokio = "*" +tokio = { version = "*", features = ["async-await-preview"] } xdg = "*" \ No newline at end of file diff --git a/src/ipld/dag.rs b/src/ipld/dag.rs index baa5a7a0..177f1654 100644 --- a/src/ipld/dag.rs +++ b/src/ipld/dag.rs @@ -1,128 +1,152 @@ -use crate::block::{Cid, Block}; +use crate::block::Cid; use crate::ipld::{Ipld, IpldError, IpldPath, SubPath}; -use std::collections::HashMap; +use crate::repo::{Repo, RepoTypes, BlockStore}; +use futures::future::FutureObj; -pub struct IpldDag { - blocks: HashMap, +pub struct IpldDag { + repo: Repo, } -impl IpldDag { - pub fn new() -> Self { +impl IpldDag { + pub fn new(repo: Repo) -> Self { IpldDag { - blocks: HashMap::new(), + repo, } } - pub fn put(&mut self, data: &Ipld) -> Result { - let block = data.to_dag_cbor()?; - let cid = block.cid(); - self.blocks.insert(block.cid(), block); - Ok(cid) + pub fn put(&self, data: Ipld) -> FutureObj<'static, Result> { + let block_store = self.repo.block_store.clone(); + FutureObj::new(Box::new(async move { + let block = data.to_dag_cbor()?; + let cid = await!(block_store.put(block))?; + Ok(cid) + })) } - fn get_ipld(&self, cid: &Cid) -> Result, IpldError> { - let block = self.blocks.get(cid); - match block { - Some(block) => Ok(Some(Ipld::from(block)?)), - None => Ok(None), - } - } - - pub fn get(&self, path: &IpldPath) -> Result, IpldError> { - let mut ipld = self.get_ipld(&path.root())?; - for sub_path in path.iter() { - if ipld.is_none() { - return Ok(None); - } - let ipld_owned = ipld.take().unwrap(); - let new_ipld = match sub_path { - SubPath::Key(ref key) => { - if let Ipld::Object(mut map) = ipld_owned { - map.remove(key) - } else { - None - } + pub fn get(&self, path: IpldPath) -> FutureObj<'static, Result, IpldError>> { + let block_store = self.repo.block_store.clone(); + FutureObj::new(Box::new(async move { + let mut ipld = match await!(block_store.get(path.root()))? { + Some(block) => Some(Ipld::from(&block)?), + None => None, + }; + for sub_path in path.iter() { + if ipld.is_none() { + return Ok(None); } - SubPath::Index(index) => { - if let Ipld::Array(mut vec) = ipld_owned { - if *index < vec.len() { - Some(vec.swap_remove(*index)) + let ipld_owned = ipld.take().unwrap(); + let new_ipld = match sub_path { + SubPath::Key(ref key) => { + if let Ipld::Object(mut map) = ipld_owned { + map.remove(key) } else { None } - } else { - None } - } - }; - ipld = match new_ipld { - Some(Ipld::Cid(ref cid)) => self.get_ipld(cid)?, - _ => new_ipld, - }; - } - Ok(ipld) + SubPath::Index(index) => { + if let Ipld::Array(mut vec) = ipld_owned { + if *index < vec.len() { + Some(vec.swap_remove(*index)) + } else { + None + } + } else { + None + } + } + }; + ipld = match new_ipld { + Some(Ipld::Cid(cid)) => { + match await!(block_store.get(cid))? { + Some(block) => Some(Ipld::from(&block)?), + None => None, + } + } + _ => new_ipld, + }; + } + Ok(ipld) + })) } } #[cfg(test)] mod tests { use super::*; + use crate::repo::tests::create_mock_repo; + use std::collections::HashMap; #[test] fn test_resolve_root_cid() { - let mut dag = IpldDag::new(); - let data = Ipld::Array(vec![Ipld::U64(1), Ipld::U64(2), Ipld::U64(3)]); - let cid = dag.put(&data).unwrap(); + tokio::run_async(async { + let repo = create_mock_repo(); + let dag = IpldDag::new(repo); + let data = Ipld::Array(vec![Ipld::U64(1), Ipld::U64(2), Ipld::U64(3)]); + let cid = await!(dag.put(data.clone())).unwrap(); - let path = IpldPath::new(cid); - let res = dag.get(&path).unwrap(); - assert_eq!(res, Some(data)); + let path = IpldPath::new(cid); + let res = await!(dag.get(path)).unwrap(); + assert_eq!(res, Some(data)); + + }); } #[test] fn test_resolve_array_elem() { - let mut dag = IpldDag::new(); - let data = vec![1, 2, 3].into(); - let cid = dag.put(&data).unwrap(); + tokio::run_async(async { + let repo = create_mock_repo(); + let dag = IpldDag::new(repo); + let data: Ipld = vec![1, 2, 3].into(); + let cid = await!(dag.put(data.clone())).unwrap(); - let path = IpldPath::from(cid, "1").unwrap(); - let res = dag.get(&path).unwrap(); - assert_eq!(res, Some(Ipld::U64(2))); + let path = IpldPath::from(cid, "1").unwrap(); + let res = await!(dag.get(path)).unwrap(); + assert_eq!(res, Some(Ipld::U64(2))); + }); } #[test] fn test_resolve_nested_array_elem() { - let mut dag = IpldDag::new(); - let data = Ipld::Array(vec![Ipld::U64(1), Ipld::Array(vec![Ipld::U64(2)]), Ipld::U64(3)]); - let cid = dag.put(&data).unwrap(); + tokio::run_async(async { + let repo = create_mock_repo(); + let dag = IpldDag::new(repo); + let data = Ipld::Array(vec![Ipld::U64(1), Ipld::Array(vec![Ipld::U64(2)]), Ipld::U64(3)]); + let cid = await!(dag.put(data.clone())).unwrap(); - let path = IpldPath::from(cid, "1/0").unwrap(); - let res = dag.get(&path).unwrap(); - assert_eq!(res, Some(Ipld::U64(2))); + let path = IpldPath::from(cid, "1/0").unwrap(); + let res = await!(dag.get(path)).unwrap(); + assert_eq!(res, Some(Ipld::U64(2))); + }); } #[test] fn test_resolve_object_elem() { - let mut dag = IpldDag::new(); - let mut data = HashMap::new(); - data.insert("key", false); - let cid = dag.put(&data.into()).unwrap(); + tokio::run_async(async { + let repo = create_mock_repo(); + let dag = IpldDag::new(repo); + let mut data = HashMap::new(); + data.insert("key", false); + let cid = await!(dag.put(data.into())).unwrap(); - let path = IpldPath::from(cid, "key").unwrap(); - let res = dag.get(&path).unwrap(); - assert_eq!(res, Some(Ipld::Bool(false))); + let path = IpldPath::from(cid, "key").unwrap(); + let res = await!(dag.get(path)).unwrap(); + assert_eq!(res, Some(Ipld::Bool(false))); + }); } #[test] fn test_resolve_cid_elem() { - let mut dag = IpldDag::new(); - let data1 = vec![1].into(); - let cid1 = dag.put(&data1).unwrap(); - let data2 = vec![cid1].into(); - let cid2 = dag.put(&data2).unwrap(); + tokio::run_async(async { + let repo = create_mock_repo(); + let dag = IpldDag::new(repo); + let data1 = vec![1].into(); + let cid1 = await!(dag.put(data1)).unwrap(); + let data2 = vec![cid1].into(); + let cid2 = await!(dag.put(data2)).unwrap(); - let path = IpldPath::from(cid2, "0/0").unwrap(); - let res = dag.get(&path).unwrap(); - assert_eq!(res, Some(Ipld::U64(1))); + let path = IpldPath::from(cid2, "0/0").unwrap(); + let res = await!(dag.get(path)).unwrap(); + assert_eq!(res, Some(Ipld::U64(1))); + }); } } diff --git a/src/ipld/error.rs b/src/ipld/error.rs index 3c97329e..8fcb6699 100644 --- a/src/ipld/error.rs +++ b/src/ipld/error.rs @@ -8,6 +8,7 @@ pub enum IpldError { UnsupportedCodec(Codec), CodecError(Box), InvalidPath(String), + IoError(std::io::Error), } pub trait CodecError: Display + Debug + Error {} @@ -18,6 +19,7 @@ impl Error for IpldError { IpldError::UnsupportedCodec(_) => "unsupported codec", IpldError::CodecError(_) => "codec error", IpldError::InvalidPath(_) => "invalid path", + IpldError::IoError(_) => "io error", } } } @@ -34,6 +36,9 @@ impl Display for IpldError { IpldError::InvalidPath(ref path) => { write!(f, "Invalid path {:?}", path) } + IpldError::IoError(ref err) => { + write!(f, "{}", err) + } } } } @@ -46,3 +51,9 @@ impl From for IpldError { impl CodecError for CidError {} impl CodecError for CborError {} + +impl From for IpldError { + fn from(error: std::io::Error) -> Self { + IpldError::IoError(error) + } +} diff --git a/src/lib.rs b/src/lib.rs index d45b7044..2351f6a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,15 +25,17 @@ pub mod repo; pub use self::block::{Block, Cid}; use self::config::ConfigFile; use self::future::BlockFuture; +use self::ipld::IpldDag; +pub use self::ipld::{Ipld, IpldError, IpldPath}; pub use self::p2p::SwarmTypes; use self::p2p::{create_swarm, SwarmOptions, TSwarm}; pub use self::repo::RepoTypes; use self::repo::{create_repo, RepoOptions, Repo, BlockStore}; -const IPFS_LOG: &str = "info"; -const IPFS_PATH: &str = ".rust-ipfs"; -const XDG_APP_NAME: &str = "rust-ipfs"; -const CONFIG_FILE: &str = "config.json"; +static IPFS_LOG: &str = "info"; +static IPFS_PATH: &str = ".rust-ipfs"; +static XDG_APP_NAME: &str = "rust-ipfs"; +static CONFIG_FILE: &str = "config.json"; /// Default IPFS types. #[derive(Clone)] @@ -102,6 +104,7 @@ impl IpfsOptions { /// for interacting with IPFS. pub struct Ipfs { repo: Repo, + dag: IpldDag, swarm: Option>, events: Arc>>, } @@ -119,9 +122,11 @@ impl Ipfs { let repo = create_repo(repo_options); let swarm_options = SwarmOptions::::from(&options); let swarm = create_swarm(swarm_options, repo.clone()); + let dag = IpldDag::new(repo.clone()); Ipfs { repo, + dag, swarm: Some(swarm), events: Arc::new(Mutex::new(VecDeque::new())), } @@ -166,6 +171,16 @@ impl Ipfs { self.repo.block_store.remove(cid) } + /// Puts an ipld dag node into the ipfs repo. + pub fn put_dag(&self, ipld: Ipld) -> FutureObj<'static, Result> { + self.dag.put(ipld) + } + + /// Gets an ipld dag node from the ipfs repo. + pub fn get_dag(&self, path: IpldPath) -> FutureObj<'static, Result, IpldError>> { + self.dag.get(path) + } + /// Start daemon. pub fn start_daemon(&mut self) -> IpfsFuture { let events = self.events.clone(); diff --git a/src/repo/mod.rs b/src/repo/mod.rs index 740a3d3f..89253ee1 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -9,7 +9,7 @@ use std::path::PathBuf; pub mod mem; pub mod fs; -pub trait RepoTypes: Clone + Send + 'static { +pub trait RepoTypes: Clone + Send + Sync + 'static { type TBlockStore: BlockStore; type TDataStore: DataStore; } @@ -33,7 +33,7 @@ pub fn create_repo(options: RepoOptions) -> R Repo::new(options) } -pub trait BlockStore: Clone + Send + Unpin + 'static { +pub trait BlockStore: Clone + Send + Sync + Unpin + 'static { fn new(path: PathBuf) -> Self; fn init(&self) -> FutureObj<'static, Result<(), std::io::Error>>; fn open(&self) -> FutureObj<'static, Result<(), std::io::Error>>; @@ -43,7 +43,7 @@ pub trait BlockStore: Clone + Send + Unpin + 'static { fn remove(&self, cid: Cid) -> FutureObj<'static, Result<(), std::io::Error>>; } -pub trait DataStore: Clone + Send + Unpin + 'static { +pub trait DataStore: Clone + Send + Sync + Unpin + 'static { fn new(path: PathBuf) -> Self; fn init(&self) -> FutureObj<'static, Result<(), std::io::Error>>; } @@ -89,19 +89,29 @@ impl Repo { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; use futures::prelude::*; use std::env::temp_dir; #[derive(Clone)] - struct Types; + pub struct Types; impl RepoTypes for Types { type TBlockStore = mem::MemBlockStore; type TDataStore = mem::MemDataStore; } + pub fn create_mock_repo() -> Repo { + let mut tmp = temp_dir(); + tmp.push("rust-ipfs-repo"); + let options: RepoOptions = RepoOptions { + _marker: PhantomData, + path: tmp, + }; + Repo::new(options) + } + #[test] fn test_repo() { let mut tmp = temp_dir();