repo: init and put return a result.

This commit is contained in:
David Craven 2019-02-22 13:00:28 +01:00
parent 999f986a82
commit 1adf81f4ff
No known key found for this signature in database
GPG Key ID: DF438712EA50DBB1
7 changed files with 37 additions and 33 deletions

View File

@ -7,7 +7,7 @@ use ipfs::{Block, Ipfs, IpfsOptions, RepoTypes, SwarmTypes, IpfsTypes};
struct Types;
impl RepoTypes for Types {
type TBlockStore = ipfs::repo::mem::MemBlockStore;
type TBlockStore = ipfs::repo::fs::FsBlockStore;
type TDataStore = ipfs::repo::mem::MemDataStore;
}
@ -27,7 +27,8 @@ fn main() {
tokio::run(FutureObj::new(Box::new(async move {
tokio::spawn(ipfs.start_daemon().compat());
await!(ipfs.put_block(block));
await!(ipfs.init_repo()).unwrap();
await!(ipfs.put_block(block)).unwrap();
let block = await!(ipfs.get_block(cid));
println!("Received block with contents: {:?}",
String::from_utf8_lossy(&block.data()));

View File

@ -13,7 +13,7 @@ fn main() {
tokio::run(FutureObj::new(Box::new(async move {
tokio::spawn(ipfs.start_daemon().compat());
await!(ipfs.put_block(block));
await!(ipfs.put_block(block)).unwrap();
let block = await!(ipfs.get_block(cid));
println!("Received block with contents: {:?}",
String::from_utf8_lossy(&block.data()));

View File

@ -61,7 +61,7 @@ impl<TRepoTypes: RepoTypes> Strategy<TRepoTypes> for AltruisticStrategy<TRepoTyp
source.to_base58());
let future = self.repo.block_store.put(block);
tokio::spawn(FutureObj::new(Box::new(async move {
await!(future);
await!(future).unwrap();
Ok(())
})).compat());
}

View File

@ -128,18 +128,18 @@ impl<Types: IpfsTypes> Ipfs<Types> {
}
/// Initialize the ipfs repo.
pub fn init_repo(&mut self) -> FutureObj<'static, ()> {
pub fn init_repo(&mut self) -> FutureObj<'static, Result<(), std::io::Error>> {
self.repo.init()
}
/// Puts a block into the ipfs repo.
pub fn put_block(&mut self, block: Block) -> FutureObj<'static, Cid> {
pub fn put_block(&mut self, block: Block) -> FutureObj<'static, Result<Cid, std::io::Error>> {
let events = self.events.clone();
let block_store = self.repo.block_store.clone();
FutureObj::new(Box::new(async move {
let cid = await!(block_store.put(block));
let cid = await!(block_store.put(block))?;
events.lock().unwrap().push_back(IpfsEvent::ProvideBlock(cid.clone()));
cid
Ok(cid)
}))
}

View File

@ -3,9 +3,10 @@ use crate::block::{Cid, Block};
use crate::repo::{BlockStore, DataStore};
use std::collections::HashSet;
use futures::future::FutureObj;
use futures::compat::*;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
//use tokio::fs;
use tokio::fs;
#[derive(Clone, Debug)]
pub struct FsBlockStore {
@ -22,9 +23,8 @@ impl BlockStore for FsBlockStore {
}
}
fn init(&self) -> FutureObj<'static, ()> {
//FutureObj::new(Box::new(fs::create_dir_all(self.path.clone())))
FutureObj::new(Box::new(futures::future::ready(()))
fn init(&self) -> FutureObj<'static, Result<(), std::io::Error>> {
FutureObj::new(Box::new(fs::create_dir_all(self.path.clone()).compat()))
}
// TODO open
@ -39,18 +39,16 @@ impl BlockStore for FsBlockStore {
FutureObj::new(Box::new(futures::future::ready(None)))
}
fn put(&self, block: Block) -> FutureObj<'static, Cid> {
let _path = block_path(self.path.clone(), &block);
FutureObj::new(Box::new(futures::future::ready(block.cid())))
/*
fs::File::open(path)
.and_then(|file| {
tokio::io::write_all(file, *block.data())
}).map(|_| {
self.cids.lock().unwrap().insert(block.cid());
block.cid()
})
)*/
fn put(&self, block: Block) -> FutureObj<'static, Result<Cid, std::io::Error>> {
let path = block_path(self.path.clone(), &block);
let cids = self.cids.clone();
FutureObj::new(Box::new(async move {
let file = await!(fs::File::create(path).compat())?;
let data = block.data();
await!(tokio::io::write_all(file, &*data).compat())?;
cids.lock().unwrap().insert(block.cid());
Ok(block.cid())
}))
}
fn remove(&self, _cid: Cid) -> FutureObj<'static, ()> {

View File

@ -30,11 +30,11 @@ impl BlockStore for MemBlockStore {
FutureObj::new(Box::new(futures::future::ready(block)))
}
fn put(&self, block: Block) -> FutureObj<'static, Cid> {
fn put(&self, block: Block) -> FutureObj<'static, Result<Cid, std::io::Error>> {
let cid = block.cid();
self.blocks.lock().unwrap()
.insert(cid.clone(), block);
FutureObj::new(Box::new(futures::future::ready(cid)))
FutureObj::new(Box::new(futures::future::ok(cid)))
}
fn remove(&self, cid: Cid) -> FutureObj<'static, ()> {

View File

@ -38,19 +38,19 @@ pub fn create_repo<TRepoTypes: RepoTypes>(options: RepoOptions<TRepoTypes>) -> R
pub trait BlockStore: Clone + Send + Unpin + 'static {
fn new(path: PathBuf) -> Self;
fn init(&self) -> FutureObj<'static, ()> {
FutureObj::new(Box::new(futures::future::ready(())))
fn init(&self) -> FutureObj<'static, Result<(), std::io::Error>> {
FutureObj::new(Box::new(futures::future::ok(())))
}
fn contains(&self, cid: Cid) -> FutureObj<'static, bool>;
fn get(&self, cid: Cid) -> FutureObj<'static, Option<Block>>;
fn put(&self, block: Block) -> FutureObj<'static, Cid>;
fn put(&self, block: Block) -> FutureObj<'static, Result<Cid, std::io::Error>>;
fn remove(&self, cid: Cid) -> FutureObj<'static, ()>;
}
pub trait DataStore: Clone + Send + Unpin + 'static {
fn new(path: PathBuf) -> Self;
fn init(&self) -> FutureObj<'static, ()> {
FutureObj::new(Box::new(futures::future::ready(())))
fn init(&self) -> FutureObj<'static, Result<(), std::io::Error>> {
FutureObj::new(Box::new(futures::future::ok(())))
}
}
@ -68,13 +68,18 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
}
}
pub fn init(&self) -> FutureObj<'static, ()> {
pub fn init(&self) -> FutureObj<'static, Result<(), std::io::Error>> {
let block_store = self.block_store.clone();
let data_store = self.data_store.clone();
FutureObj::new(Box::new(async move {
let f1 = block_store.init();
let f2 = data_store.init();
join!(f1, f2);
let (r1, r2) = join!(f1, f2);
if r1.is_err() {
r1
} else {
r2
}
}))
}
}