Hook up IpldDag, currently only uses local blocks.

This commit is contained in:
David Craven 2019-02-25 16:49:25 +01:00
parent 7eeed01f85
commit ac36e50fe8
No known key found for this signature in database
GPG Key ID: DF438712EA50DBB1
6 changed files with 162 additions and 91 deletions

11
Cargo.lock generated
View File

@ -2008,6 +2008,7 @@ dependencies = [
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-async-await 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-current-thread 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2022,6 +2023,15 @@ dependencies = [
"tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-async-await"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-codec"
version = "0.1.1"
@ -2609,6 +2619,7 @@ dependencies = [
"checksum tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e9175261fbdb60781fcd388a4d6cc7e14764a2b629a7ad94abb439aed223a44f"
"checksum tk-listen 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5462b0f968c0457efe38fcd2df7e487096b992419e4f5337b06775a614bbda4b"
"checksum tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)" = "e0500b88064f08bebddd0c0bed39e19f5c567a5f30975bee52b0c0d3e2eeb38c"
"checksum tokio-async-await 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "8221e9db7675e145fd4ecf5d8e017be9f198fb57ce1d11c7c1b6d7d9d73181bc"
"checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f"
"checksum tokio-current-thread 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "331c8acc267855ec06eb0c94618dcbbfea45bed2d20b77252940095273fb58f6"
"checksum tokio-dns-unofficial 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "82c65483db54eb91b4ef3a9389a3364558590faf30ce473141707c0e16fda975"

View File

@ -22,5 +22,5 @@ rustc-serialize = "0.3"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
tokio = "*"
tokio = { version = "*", features = ["async-await-preview"] }
xdg = "*"

View File

@ -1,128 +1,152 @@
use crate::block::{Cid, Block};
use crate::block::Cid;
use crate::ipld::{Ipld, IpldError, IpldPath, SubPath};
use std::collections::HashMap;
use crate::repo::{Repo, RepoTypes, BlockStore};
use futures::future::FutureObj;
pub struct IpldDag {
blocks: HashMap<Cid, Block>,
pub struct IpldDag<Types: RepoTypes> {
repo: Repo<Types>,
}
impl IpldDag {
pub fn new() -> Self {
impl<Types: RepoTypes> IpldDag<Types> {
pub fn new(repo: Repo<Types>) -> Self {
IpldDag {
blocks: HashMap::new(),
repo,
}
}
pub fn put(&mut self, data: &Ipld) -> Result<Cid, IpldError> {
let block = data.to_dag_cbor()?;
let cid = block.cid();
self.blocks.insert(block.cid(), block);
Ok(cid)
pub fn put(&self, data: Ipld) -> FutureObj<'static, Result<Cid, IpldError>> {
let block_store = self.repo.block_store.clone();
FutureObj::new(Box::new(async move {
let block = data.to_dag_cbor()?;
let cid = await!(block_store.put(block))?;
Ok(cid)
}))
}
fn get_ipld(&self, cid: &Cid) -> Result<Option<Ipld>, IpldError> {
let block = self.blocks.get(cid);
match block {
Some(block) => Ok(Some(Ipld::from(block)?)),
None => Ok(None),
}
}
pub fn get(&self, path: &IpldPath) -> Result<Option<Ipld>, IpldError> {
let mut ipld = self.get_ipld(&path.root())?;
for sub_path in path.iter() {
if ipld.is_none() {
return Ok(None);
}
let ipld_owned = ipld.take().unwrap();
let new_ipld = match sub_path {
SubPath::Key(ref key) => {
if let Ipld::Object(mut map) = ipld_owned {
map.remove(key)
} else {
None
}
pub fn get(&self, path: IpldPath) -> FutureObj<'static, Result<Option<Ipld>, IpldError>> {
let block_store = self.repo.block_store.clone();
FutureObj::new(Box::new(async move {
let mut ipld = match await!(block_store.get(path.root()))? {
Some(block) => Some(Ipld::from(&block)?),
None => None,
};
for sub_path in path.iter() {
if ipld.is_none() {
return Ok(None);
}
SubPath::Index(index) => {
if let Ipld::Array(mut vec) = ipld_owned {
if *index < vec.len() {
Some(vec.swap_remove(*index))
let ipld_owned = ipld.take().unwrap();
let new_ipld = match sub_path {
SubPath::Key(ref key) => {
if let Ipld::Object(mut map) = ipld_owned {
map.remove(key)
} else {
None
}
} else {
None
}
}
};
ipld = match new_ipld {
Some(Ipld::Cid(ref cid)) => self.get_ipld(cid)?,
_ => new_ipld,
};
}
Ok(ipld)
SubPath::Index(index) => {
if let Ipld::Array(mut vec) = ipld_owned {
if *index < vec.len() {
Some(vec.swap_remove(*index))
} else {
None
}
} else {
None
}
}
};
ipld = match new_ipld {
Some(Ipld::Cid(cid)) => {
match await!(block_store.get(cid))? {
Some(block) => Some(Ipld::from(&block)?),
None => None,
}
}
_ => new_ipld,
};
}
Ok(ipld)
}))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::repo::tests::create_mock_repo;
use std::collections::HashMap;
#[test]
fn test_resolve_root_cid() {
let mut dag = IpldDag::new();
let data = Ipld::Array(vec![Ipld::U64(1), Ipld::U64(2), Ipld::U64(3)]);
let cid = dag.put(&data).unwrap();
tokio::run_async(async {
let repo = create_mock_repo();
let dag = IpldDag::new(repo);
let data = Ipld::Array(vec![Ipld::U64(1), Ipld::U64(2), Ipld::U64(3)]);
let cid = await!(dag.put(data.clone())).unwrap();
let path = IpldPath::new(cid);
let res = dag.get(&path).unwrap();
assert_eq!(res, Some(data));
let path = IpldPath::new(cid);
let res = await!(dag.get(path)).unwrap();
assert_eq!(res, Some(data));
});
}
#[test]
fn test_resolve_array_elem() {
let mut dag = IpldDag::new();
let data = vec![1, 2, 3].into();
let cid = dag.put(&data).unwrap();
tokio::run_async(async {
let repo = create_mock_repo();
let dag = IpldDag::new(repo);
let data: Ipld = vec![1, 2, 3].into();
let cid = await!(dag.put(data.clone())).unwrap();
let path = IpldPath::from(cid, "1").unwrap();
let res = dag.get(&path).unwrap();
assert_eq!(res, Some(Ipld::U64(2)));
let path = IpldPath::from(cid, "1").unwrap();
let res = await!(dag.get(path)).unwrap();
assert_eq!(res, Some(Ipld::U64(2)));
});
}
#[test]
fn test_resolve_nested_array_elem() {
let mut dag = IpldDag::new();
let data = Ipld::Array(vec![Ipld::U64(1), Ipld::Array(vec![Ipld::U64(2)]), Ipld::U64(3)]);
let cid = dag.put(&data).unwrap();
tokio::run_async(async {
let repo = create_mock_repo();
let dag = IpldDag::new(repo);
let data = Ipld::Array(vec![Ipld::U64(1), Ipld::Array(vec![Ipld::U64(2)]), Ipld::U64(3)]);
let cid = await!(dag.put(data.clone())).unwrap();
let path = IpldPath::from(cid, "1/0").unwrap();
let res = dag.get(&path).unwrap();
assert_eq!(res, Some(Ipld::U64(2)));
let path = IpldPath::from(cid, "1/0").unwrap();
let res = await!(dag.get(path)).unwrap();
assert_eq!(res, Some(Ipld::U64(2)));
});
}
#[test]
fn test_resolve_object_elem() {
let mut dag = IpldDag::new();
let mut data = HashMap::new();
data.insert("key", false);
let cid = dag.put(&data.into()).unwrap();
tokio::run_async(async {
let repo = create_mock_repo();
let dag = IpldDag::new(repo);
let mut data = HashMap::new();
data.insert("key", false);
let cid = await!(dag.put(data.into())).unwrap();
let path = IpldPath::from(cid, "key").unwrap();
let res = dag.get(&path).unwrap();
assert_eq!(res, Some(Ipld::Bool(false)));
let path = IpldPath::from(cid, "key").unwrap();
let res = await!(dag.get(path)).unwrap();
assert_eq!(res, Some(Ipld::Bool(false)));
});
}
#[test]
fn test_resolve_cid_elem() {
let mut dag = IpldDag::new();
let data1 = vec![1].into();
let cid1 = dag.put(&data1).unwrap();
let data2 = vec![cid1].into();
let cid2 = dag.put(&data2).unwrap();
tokio::run_async(async {
let repo = create_mock_repo();
let dag = IpldDag::new(repo);
let data1 = vec![1].into();
let cid1 = await!(dag.put(data1)).unwrap();
let data2 = vec![cid1].into();
let cid2 = await!(dag.put(data2)).unwrap();
let path = IpldPath::from(cid2, "0/0").unwrap();
let res = dag.get(&path).unwrap();
assert_eq!(res, Some(Ipld::U64(1)));
let path = IpldPath::from(cid2, "0/0").unwrap();
let res = await!(dag.get(path)).unwrap();
assert_eq!(res, Some(Ipld::U64(1)));
});
}
}

View File

@ -8,6 +8,7 @@ pub enum IpldError {
UnsupportedCodec(Codec),
CodecError(Box<dyn CodecError>),
InvalidPath(String),
IoError(std::io::Error),
}
pub trait CodecError: Display + Debug + Error {}
@ -18,6 +19,7 @@ impl Error for IpldError {
IpldError::UnsupportedCodec(_) => "unsupported codec",
IpldError::CodecError(_) => "codec error",
IpldError::InvalidPath(_) => "invalid path",
IpldError::IoError(_) => "io error",
}
}
}
@ -34,6 +36,9 @@ impl Display for IpldError {
IpldError::InvalidPath(ref path) => {
write!(f, "Invalid path {:?}", path)
}
IpldError::IoError(ref err) => {
write!(f, "{}", err)
}
}
}
}
@ -46,3 +51,9 @@ impl<T: CodecError + 'static> From<T> for IpldError {
impl CodecError for CidError {}
impl CodecError for CborError {}
impl From<std::io::Error> for IpldError {
fn from(error: std::io::Error) -> Self {
IpldError::IoError(error)
}
}

View File

@ -25,15 +25,17 @@ pub mod repo;
pub use self::block::{Block, Cid};
use self::config::ConfigFile;
use self::future::BlockFuture;
use self::ipld::IpldDag;
pub use self::ipld::{Ipld, IpldError, IpldPath};
pub use self::p2p::SwarmTypes;
use self::p2p::{create_swarm, SwarmOptions, TSwarm};
pub use self::repo::RepoTypes;
use self::repo::{create_repo, RepoOptions, Repo, BlockStore};
const IPFS_LOG: &str = "info";
const IPFS_PATH: &str = ".rust-ipfs";
const XDG_APP_NAME: &str = "rust-ipfs";
const CONFIG_FILE: &str = "config.json";
static IPFS_LOG: &str = "info";
static IPFS_PATH: &str = ".rust-ipfs";
static XDG_APP_NAME: &str = "rust-ipfs";
static CONFIG_FILE: &str = "config.json";
/// Default IPFS types.
#[derive(Clone)]
@ -102,6 +104,7 @@ impl IpfsOptions {
/// for interacting with IPFS.
pub struct Ipfs<Types: IpfsTypes> {
repo: Repo<Types>,
dag: IpldDag<Types>,
swarm: Option<TSwarm<Types>>,
events: Arc<Mutex<VecDeque<IpfsEvent>>>,
}
@ -119,9 +122,11 @@ impl<Types: IpfsTypes> Ipfs<Types> {
let repo = create_repo(repo_options);
let swarm_options = SwarmOptions::<Types>::from(&options);
let swarm = create_swarm(swarm_options, repo.clone());
let dag = IpldDag::new(repo.clone());
Ipfs {
repo,
dag,
swarm: Some(swarm),
events: Arc::new(Mutex::new(VecDeque::new())),
}
@ -166,6 +171,16 @@ impl<Types: IpfsTypes> Ipfs<Types> {
self.repo.block_store.remove(cid)
}
/// Puts an ipld dag node into the ipfs repo.
pub fn put_dag(&self, ipld: Ipld) -> FutureObj<'static, Result<Cid, IpldError>> {
self.dag.put(ipld)
}
/// Gets an ipld dag node from the ipfs repo.
pub fn get_dag(&self, path: IpldPath) -> FutureObj<'static, Result<Option<Ipld>, IpldError>> {
self.dag.get(path)
}
/// Start daemon.
pub fn start_daemon(&mut self) -> IpfsFuture<Types> {
let events = self.events.clone();

View File

@ -9,7 +9,7 @@ use std::path::PathBuf;
pub mod mem;
pub mod fs;
pub trait RepoTypes: Clone + Send + 'static {
pub trait RepoTypes: Clone + Send + Sync + 'static {
type TBlockStore: BlockStore;
type TDataStore: DataStore;
}
@ -33,7 +33,7 @@ pub fn create_repo<TRepoTypes: RepoTypes>(options: RepoOptions<TRepoTypes>) -> R
Repo::new(options)
}
pub trait BlockStore: Clone + Send + Unpin + 'static {
pub trait BlockStore: Clone + Send + Sync + Unpin + 'static {
fn new(path: PathBuf) -> Self;
fn init(&self) -> FutureObj<'static, Result<(), std::io::Error>>;
fn open(&self) -> FutureObj<'static, Result<(), std::io::Error>>;
@ -43,7 +43,7 @@ pub trait BlockStore: Clone + Send + Unpin + 'static {
fn remove(&self, cid: Cid) -> FutureObj<'static, Result<(), std::io::Error>>;
}
pub trait DataStore: Clone + Send + Unpin + 'static {
pub trait DataStore: Clone + Send + Sync + Unpin + 'static {
fn new(path: PathBuf) -> Self;
fn init(&self) -> FutureObj<'static, Result<(), std::io::Error>>;
}
@ -89,19 +89,29 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
}
#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;
use futures::prelude::*;
use std::env::temp_dir;
#[derive(Clone)]
struct Types;
pub struct Types;
impl RepoTypes for Types {
type TBlockStore = mem::MemBlockStore;
type TDataStore = mem::MemDataStore;
}
pub fn create_mock_repo() -> Repo<Types> {
let mut tmp = temp_dir();
tmp.push("rust-ipfs-repo");
let options: RepoOptions<Types> = RepoOptions {
_marker: PhantomData,
path: tmp,
};
Repo::new(options)
}
#[test]
fn test_repo() {
let mut tmp = temp_dir();