refactor: better code style and use OnceCell to init sled db
This commit is contained in:
parent
81f2f24494
commit
d5ee450797
5
Cargo.lock
generated
5
Cargo.lock
generated
@ -1272,6 +1272,7 @@ dependencies = [
|
||||
"libp2p",
|
||||
"multibase",
|
||||
"multihash",
|
||||
"once_cell",
|
||||
"prost",
|
||||
"prost-build",
|
||||
"rand 0.7.3",
|
||||
@ -1937,9 +1938,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.4.1"
|
||||
version = "1.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad"
|
||||
checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
|
||||
|
||||
[[package]]
|
||||
name = "oorandom"
|
||||
|
@ -10,6 +10,7 @@ version = "0.2.1"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
sled_data_store = []
|
||||
test_go_interop = []
|
||||
test_js_interop = []
|
||||
|
||||
@ -43,6 +44,7 @@ void = { default-features = false, version = "1.0" }
|
||||
fs2 = "0.4.3"
|
||||
tempfile = "3.1.0"
|
||||
sled = "0.34"
|
||||
once_cell = "1.5.2"
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
# required for DNS resolution
|
||||
|
@ -103,7 +103,10 @@ impl<T: RepoTypes> IpfsTypes for T {}
|
||||
pub struct Types;
|
||||
impl RepoTypes for Types {
|
||||
type TBlockStore = repo::fs::FsBlockStore;
|
||||
#[cfg(feature = "sled_data_store")]
|
||||
type TDataStore = repo::kv::KvDataStore;
|
||||
#[cfg(not(feature = "sled_data_store"))]
|
||||
type TDataStore = repo::fs::FsDataStore;
|
||||
type TLock = repo::fs::FsLock;
|
||||
}
|
||||
|
||||
|
@ -4,17 +4,19 @@ use crate::repo::{PinKind, PinMode, PinStore, References};
|
||||
use async_trait::async_trait;
|
||||
use cid::{self, Cid};
|
||||
use futures::stream::{StreamExt, TryStreamExt};
|
||||
use once_cell::sync::OnceCell;
|
||||
use sled::{self, Config as DbConfig, Db, Mode as DbMode};
|
||||
use std::collections::BTreeSet;
|
||||
use std::convert::Into;
|
||||
use std::path::PathBuf;
|
||||
use std::str::{self, FromStr};
|
||||
use std::sync::Mutex;
|
||||
use tracing_futures::Instrument;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct KvDataStore {
|
||||
path: PathBuf,
|
||||
db: Mutex<Option<Db>>,
|
||||
// it is a trick for not modifying the Data:init
|
||||
db: OnceCell<Db>,
|
||||
}
|
||||
|
||||
impl KvDataStore {
|
||||
@ -41,8 +43,8 @@ impl KvDataStore {
|
||||
Ok(db.apply_batch(batch)?)
|
||||
}
|
||||
|
||||
fn get_db(&self) -> Db {
|
||||
self.db.lock().unwrap().as_ref().unwrap().clone()
|
||||
fn get_db(&self) -> &Db {
|
||||
self.db.get().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
@ -63,11 +65,10 @@ impl DataStore for KvDataStore {
|
||||
.path(self.path.as_path())
|
||||
.open()?;
|
||||
|
||||
let mut g = self.db.lock().unwrap();
|
||||
assert!(g.is_none());
|
||||
*g = Some(db);
|
||||
|
||||
Ok(())
|
||||
match self.db.set(db) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(_) => Err(anyhow::anyhow!("failed to init sled")),
|
||||
}
|
||||
}
|
||||
|
||||
async fn open(&self) -> Result<(), Error> {
|
||||
@ -118,7 +119,7 @@ impl PinStore for KvDataStore {
|
||||
let pin_key = get_pin_key(target, &PinMode::Indirect);
|
||||
batch.remove(pin_key.as_str());
|
||||
}
|
||||
_ => {}
|
||||
None => {}
|
||||
}
|
||||
|
||||
let direct_key = get_pin_key(target, &PinMode::Direct);
|
||||
@ -133,9 +134,7 @@ impl PinStore for KvDataStore {
|
||||
target: &Cid,
|
||||
referenced: References<'_>,
|
||||
) -> Result<(), Error> {
|
||||
let set = referenced
|
||||
.try_collect::<std::collections::BTreeSet<_>>()
|
||||
.await?;
|
||||
let set = referenced.try_collect::<BTreeSet<_>>().await?;
|
||||
|
||||
let mut batch = sled::Batch::default();
|
||||
let already_pinned = get_pinned_mode(self, target)?;
|
||||
@ -146,7 +145,7 @@ impl PinStore for KvDataStore {
|
||||
let key = get_pin_key(target, &mode);
|
||||
batch.remove(key.as_str());
|
||||
}
|
||||
_ => {}
|
||||
None => {}
|
||||
}
|
||||
|
||||
let recursive_key = get_pin_key(target, &PinMode::Recursive);
|
||||
@ -183,14 +182,12 @@ impl PinStore for KvDataStore {
|
||||
target: &Cid,
|
||||
referenced: References<'_>,
|
||||
) -> Result<(), Error> {
|
||||
let set = referenced
|
||||
.try_collect::<std::collections::BTreeSet<_>>()
|
||||
.await?;
|
||||
|
||||
if is_not_pinned_or_pinned_indirectly(self, target)? {
|
||||
return Err(anyhow::anyhow!("not pinned or pinned indirectly"));
|
||||
}
|
||||
|
||||
let set = referenced.try_collect::<BTreeSet<_>>().await?;
|
||||
|
||||
let mut batch = sled::Batch::default();
|
||||
|
||||
let recursive_key = get_pin_key(target, &PinMode::Recursive);
|
||||
@ -206,7 +203,7 @@ impl PinStore for KvDataStore {
|
||||
let indirect_key = get_pin_key(cid, &PinMode::Indirect);
|
||||
batch.remove(indirect_key.as_str());
|
||||
}
|
||||
_ => {}
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
|
||||
@ -226,19 +223,15 @@ impl PinStore for KvDataStore {
|
||||
let iter = db.range(min_key..);
|
||||
let mut all_keys: Vec<String> = vec![];
|
||||
|
||||
for item in iter {
|
||||
if item.is_err() {
|
||||
continue;
|
||||
}
|
||||
|
||||
for item in iter.filter(|item| item.is_ok()) {
|
||||
let (raw_key, _) = item.unwrap();
|
||||
let key = String::from(String::from_utf8_lossy(raw_key.as_ref()));
|
||||
let key = String::from_utf8_lossy(raw_key.as_ref());
|
||||
|
||||
if !key.starts_with("pin.") {
|
||||
continue;
|
||||
}
|
||||
|
||||
all_keys.push(key);
|
||||
all_keys.push(key.to_owned().to_string());
|
||||
}
|
||||
|
||||
let st = async_stream::try_stream! {
|
||||
@ -318,19 +311,19 @@ impl PinStore for KvDataStore {
|
||||
id.clone(),
|
||||
PinKind::IndirectFrom(indirect_from_cid),
|
||||
)),
|
||||
_ => {
|
||||
Err(_) => {
|
||||
warn!("invalid indirect from cid of {}", id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {}
|
||||
Ok(None) => {}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {}
|
||||
Ok(None) => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
@ -348,7 +341,7 @@ fn pin_mode_literal(pin_mode: &PinMode) -> &'static str {
|
||||
}
|
||||
|
||||
fn get_pin_key(cid: &Cid, pin_mode: &PinMode) -> String {
|
||||
format!("pin.{}.{}", pin_mode_literal(pin_mode), cid.to_string())
|
||||
format!("pin.{}.{}", pin_mode_literal(pin_mode), cid)
|
||||
}
|
||||
|
||||
fn get_pinned_mode(kv_db: &KvDataStore, block: &Cid) -> Result<Option<PinMode>, Error> {
|
||||
@ -359,7 +352,7 @@ fn get_pinned_mode(kv_db: &KvDataStore, block: &Cid) -> Result<Option<PinMode>,
|
||||
|
||||
match db.get(key.as_str()) {
|
||||
Ok(Some(_)) => return Ok(Some(mode.clone())),
|
||||
Ok(_) => {}
|
||||
Ok(None) => {}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
@ -370,7 +363,7 @@ fn get_pinned_mode(kv_db: &KvDataStore, block: &Cid) -> Result<Option<PinMode>,
|
||||
fn is_pinned(db: &KvDataStore, block: &Cid) -> Result<bool, Error> {
|
||||
match get_pinned_mode(db, block) {
|
||||
Ok(Some(_)) => Ok(true),
|
||||
Ok(_) => Ok(false),
|
||||
Ok(None) => Ok(false),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user