refactor: move PinDocument and friends to repo::mem
This commit is contained in:
parent
ab4d9f1f52
commit
827dc1b35b
227
src/repo/mem.rs
227
src/repo/mem.rs
@ -1,11 +1,10 @@
|
||||
//! Volatile memory backed repo
|
||||
use crate::error::Error;
|
||||
use crate::repo::{
|
||||
BlockPut, BlockStore, Column, DataStore, PinDocument, PinKind, PinMode, PinStore, Recursive,
|
||||
};
|
||||
use crate::repo::{BlockPut, BlockStore, Column, DataStore, PinKind, PinMode, PinStore};
|
||||
use async_trait::async_trait;
|
||||
use bitswap::Block;
|
||||
use cid::Cid;
|
||||
use std::convert::TryFrom;
|
||||
use std::path::PathBuf;
|
||||
use tokio::sync::{Mutex, OwnedMutexGuard};
|
||||
|
||||
@ -13,6 +12,7 @@ use super::{BlockRm, BlockRmError, RepoCid};
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
// FIXME: Transition to Persistent Map to make iterating more consistent
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
@ -432,6 +432,211 @@ impl DataStore for MemDataStore {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
enum Recursive {
|
||||
/// Persistent record of **completed** recursive pinning. All references now have indirect pins
|
||||
/// recorded.
|
||||
Count(u64),
|
||||
/// Persistent record of intent to add recursive pins to all indirect blocks or even not to
|
||||
/// keep the go-ipfs way which might not be a bad idea after all. Adding all the indirect pins
|
||||
/// on disk will cause massive write amplification in the end, but lets keep that way until we
|
||||
/// get everything working at least.
|
||||
Intent,
|
||||
/// Not pinned recursively.
|
||||
Not,
|
||||
}
|
||||
|
||||
impl Recursive {
|
||||
fn is_set(&self) -> bool {
|
||||
match self {
|
||||
Recursive::Count(_) | Recursive::Intent => true,
|
||||
Recursive::Not => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct PinDocument {
|
||||
version: u8,
|
||||
direct: bool,
|
||||
// how many descendants; something to check when walking
|
||||
recursive: Recursive,
|
||||
// no further metadata necessary; cids are pinned by full cid
|
||||
cid_version: u8,
|
||||
// using the cidv1 versions of all cids here, not sure if that makes sense or is important
|
||||
indirect_by: Vec<String>,
|
||||
}
|
||||
|
||||
impl PinDocument {
|
||||
fn update(&mut self, add: bool, kind: &PinKind<&'_ Cid>) -> Result<bool, PinUpdateError> {
|
||||
// these update rules are a bit complex and there are cases we don't need to handle.
|
||||
// Updating on upon `PinKind` forces the caller to inspect what the current state is for
|
||||
// example to handle the case of failing "unpin currently recursively pinned as direct".
|
||||
// the ruleset seems quite strange to be honest.
|
||||
match kind {
|
||||
PinKind::IndirectFrom(root) => {
|
||||
let root = if root.version() == cid::Version::V1 {
|
||||
root.to_string()
|
||||
} else {
|
||||
// this is one more allocation
|
||||
Cid::new_v1(root.codec(), (*root).hash().to_owned()).to_string()
|
||||
};
|
||||
|
||||
let modified = if self.indirect_by.is_empty() {
|
||||
if add {
|
||||
self.indirect_by.push(root);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
let mut set = self
|
||||
.indirect_by
|
||||
.drain(..)
|
||||
.collect::<std::collections::BTreeSet<_>>();
|
||||
|
||||
let modified = if add {
|
||||
set.insert(root)
|
||||
} else {
|
||||
set.remove(&root)
|
||||
};
|
||||
|
||||
self.indirect_by.extend(set.into_iter());
|
||||
modified
|
||||
};
|
||||
|
||||
Ok(modified)
|
||||
}
|
||||
PinKind::Direct => {
|
||||
if self.recursive.is_set() && !self.direct && add {
|
||||
// go-ipfs: cannot make recursive pin also direct
|
||||
// not really sure why does this rule exist; the other way around is allowed
|
||||
return Err(PinUpdateError::AlreadyPinnedRecursive);
|
||||
}
|
||||
|
||||
if !self.direct && !add {
|
||||
panic!("this situation must be handled by the caller by checking that recursive pin is about to be removed as direct");
|
||||
}
|
||||
|
||||
let modified = self.direct != add;
|
||||
self.direct = add;
|
||||
Ok(modified)
|
||||
}
|
||||
PinKind::RecursiveIntention => {
|
||||
let modified = if add {
|
||||
match self.recursive {
|
||||
Recursive::Count(_) => return Err(PinUpdateError::AlreadyPinnedRecursive),
|
||||
// can overwrite Intent with another Intent, as Ipfs::insert_pin is now moving to fix
|
||||
// the Intent into the "final form" of Recursive::Count.
|
||||
Recursive::Intent => false,
|
||||
Recursive::Not => {
|
||||
self.recursive = Recursive::Intent;
|
||||
self.direct = false;
|
||||
true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match self.recursive {
|
||||
Recursive::Count(_) | Recursive::Intent => {
|
||||
self.recursive = Recursive::Not;
|
||||
true
|
||||
}
|
||||
Recursive::Not => false,
|
||||
}
|
||||
};
|
||||
|
||||
Ok(modified)
|
||||
}
|
||||
PinKind::Recursive(descendants) => {
|
||||
let descendants = *descendants;
|
||||
let modified = if add {
|
||||
match self.recursive {
|
||||
Recursive::Count(other) if other != descendants => {
|
||||
return Err(PinUpdateError::UnexpectedNumberOfDescendants(
|
||||
other,
|
||||
descendants,
|
||||
))
|
||||
}
|
||||
Recursive::Count(_) => false,
|
||||
Recursive::Intent | Recursive::Not => {
|
||||
self.recursive = Recursive::Count(descendants);
|
||||
// the previously direct has now been upgraded to recursive, it can
|
||||
// still be indirect though
|
||||
self.direct = false;
|
||||
true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match self.recursive {
|
||||
Recursive::Count(other) if other != descendants => {
|
||||
return Err(PinUpdateError::UnexpectedNumberOfDescendants(
|
||||
other,
|
||||
descendants,
|
||||
))
|
||||
}
|
||||
Recursive::Count(_) | Recursive::Intent => {
|
||||
self.recursive = Recursive::Not;
|
||||
true
|
||||
}
|
||||
Recursive::Not => return Err(PinUpdateError::NotPinnedRecursive),
|
||||
}
|
||||
// FIXME: removing ... not sure if this is an issue; was thinking that maybe
|
||||
// the update might need to be split to allow different api for removal than
|
||||
// addition.
|
||||
};
|
||||
Ok(modified)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn can_remove(&self) -> bool {
|
||||
!self.direct && !self.recursive.is_set() && self.indirect_by.is_empty()
|
||||
}
|
||||
|
||||
fn mode(&self) -> Option<PinMode> {
|
||||
if self.recursive.is_set() {
|
||||
Some(PinMode::Recursive)
|
||||
} else if !self.indirect_by.is_empty() {
|
||||
Some(PinMode::Indirect)
|
||||
} else if self.direct {
|
||||
Some(PinMode::Direct)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn pick_kind(&self) -> Option<Result<PinKind<Cid>, cid::Error>> {
|
||||
self.mode().map(|p| {
|
||||
Ok(match p {
|
||||
PinMode::Recursive => match self.recursive {
|
||||
Recursive::Intent => PinKind::RecursiveIntention,
|
||||
Recursive::Count(total) => PinKind::Recursive(total),
|
||||
_ => unreachable!("mode shuold not have returned PinKind::Recursive"),
|
||||
},
|
||||
PinMode::Indirect => {
|
||||
// go-ipfs does seem to be doing a fifo looking, perhaps this is a list there, or
|
||||
// the indirect pins aren't being written down anywhere and they just refs from
|
||||
// recursive roots.
|
||||
let cid = Cid::try_from(self.indirect_by[0].as_str())?;
|
||||
PinKind::IndirectFrom(cid)
|
||||
}
|
||||
PinMode::Direct => PinKind::Direct,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum PinUpdateError {
|
||||
#[error("unexpected number of descendants ({}), found {}", .1, .0)]
|
||||
UnexpectedNumberOfDescendants(u64, u64),
|
||||
#[error("not pinned recursively")]
|
||||
NotPinnedRecursive,
|
||||
/// Not allowed: Adding direct pin while pinned recursive
|
||||
#[error("already pinned recursively")]
|
||||
AlreadyPinnedRecursive,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@ -526,4 +731,20 @@ mod tests {
|
||||
let get = store.get(col, &key);
|
||||
assert_eq!(get.await.unwrap(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pindocument_on_direct_pin() {
|
||||
let mut doc = PinDocument {
|
||||
version: 0,
|
||||
direct: false,
|
||||
recursive: Recursive::Not,
|
||||
cid_version: 0,
|
||||
indirect_by: Vec::new(),
|
||||
};
|
||||
|
||||
assert!(doc.update(true, &PinKind::Direct).unwrap());
|
||||
|
||||
assert_eq!(doc.mode(), Some(PinMode::Direct));
|
||||
assert_eq!(doc.pick_kind().unwrap().unwrap(), PinKind::Direct);
|
||||
}
|
||||
}
|
||||
|
222
src/repo/mod.rs
222
src/repo/mod.rs
@ -15,7 +15,6 @@ use futures::channel::{
|
||||
};
|
||||
use futures::sink::SinkExt;
|
||||
use libp2p::core::PeerId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::borrow::Borrow;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::path::PathBuf;
|
||||
@ -440,211 +439,6 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
enum Recursive {
|
||||
/// Persistent record of **completed** recursive pinning. All references now have indirect pins
|
||||
/// recorded.
|
||||
Count(u64),
|
||||
/// Persistent record of intent to add recursive pins to all indirect blocks or even not to
|
||||
/// keep the go-ipfs way which might not be a bad idea after all. Adding all the indirect pins
|
||||
/// on disk will cause massive write amplification in the end, but lets keep that way until we
|
||||
/// get everything working at least.
|
||||
Intent,
|
||||
/// Not pinned recursively.
|
||||
Not,
|
||||
}
|
||||
|
||||
impl Recursive {
|
||||
fn is_set(&self) -> bool {
|
||||
match self {
|
||||
Recursive::Count(_) | Recursive::Intent => true,
|
||||
Recursive::Not => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct PinDocument {
|
||||
version: u8,
|
||||
direct: bool,
|
||||
// how many descendants; something to check when walking
|
||||
recursive: Recursive,
|
||||
// no further metadata necessary; cids are pinned by full cid
|
||||
cid_version: u8,
|
||||
// using the cidv1 versions of all cids here, not sure if that makes sense or is important
|
||||
indirect_by: Vec<String>,
|
||||
}
|
||||
|
||||
impl PinDocument {
|
||||
fn update(&mut self, add: bool, kind: &PinKind<&'_ Cid>) -> Result<bool, PinUpdateError> {
|
||||
// these update rules are a bit complex and there are cases we don't need to handle.
|
||||
// Updating on upon `PinKind` forces the caller to inspect what the current state is for
|
||||
// example to handle the case of failing "unpin currently recursively pinned as direct".
|
||||
// the ruleset seems quite strange to be honest.
|
||||
match kind {
|
||||
PinKind::IndirectFrom(root) => {
|
||||
let root = if root.version() == cid::Version::V1 {
|
||||
root.to_string()
|
||||
} else {
|
||||
// this is one more allocation
|
||||
Cid::new_v1(root.codec(), (*root).hash().to_owned()).to_string()
|
||||
};
|
||||
|
||||
let modified = if self.indirect_by.is_empty() {
|
||||
if add {
|
||||
self.indirect_by.push(root);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
let mut set = self
|
||||
.indirect_by
|
||||
.drain(..)
|
||||
.collect::<std::collections::BTreeSet<_>>();
|
||||
|
||||
let modified = if add {
|
||||
set.insert(root)
|
||||
} else {
|
||||
set.remove(&root)
|
||||
};
|
||||
|
||||
self.indirect_by.extend(set.into_iter());
|
||||
modified
|
||||
};
|
||||
|
||||
Ok(modified)
|
||||
}
|
||||
PinKind::Direct => {
|
||||
if self.recursive.is_set() && !self.direct && add {
|
||||
// go-ipfs: cannot make recursive pin also direct
|
||||
// not really sure why does this rule exist; the other way around is allowed
|
||||
return Err(PinUpdateError::AlreadyPinnedRecursive);
|
||||
}
|
||||
|
||||
if !self.direct && !add {
|
||||
panic!("this situation must be handled by the caller by checking that recursive pin is about to be removed as direct");
|
||||
}
|
||||
|
||||
let modified = self.direct != add;
|
||||
self.direct = add;
|
||||
Ok(modified)
|
||||
}
|
||||
PinKind::RecursiveIntention => {
|
||||
let modified = if add {
|
||||
match self.recursive {
|
||||
Recursive::Count(_) => return Err(PinUpdateError::AlreadyPinnedRecursive),
|
||||
// can overwrite Intent with another Intent, as Ipfs::insert_pin is now moving to fix
|
||||
// the Intent into the "final form" of Recursive::Count.
|
||||
Recursive::Intent => false,
|
||||
Recursive::Not => {
|
||||
self.recursive = Recursive::Intent;
|
||||
self.direct = false;
|
||||
true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match self.recursive {
|
||||
Recursive::Count(_) | Recursive::Intent => {
|
||||
self.recursive = Recursive::Not;
|
||||
true
|
||||
}
|
||||
Recursive::Not => false,
|
||||
}
|
||||
};
|
||||
|
||||
Ok(modified)
|
||||
}
|
||||
PinKind::Recursive(descendants) => {
|
||||
let descendants = *descendants;
|
||||
let modified = if add {
|
||||
match self.recursive {
|
||||
Recursive::Count(other) if other != descendants => {
|
||||
return Err(PinUpdateError::UnexpectedNumberOfDescendants(
|
||||
other,
|
||||
descendants,
|
||||
))
|
||||
}
|
||||
Recursive::Count(_) => false,
|
||||
Recursive::Intent | Recursive::Not => {
|
||||
self.recursive = Recursive::Count(descendants);
|
||||
// the previously direct has now been upgraded to recursive, it can
|
||||
// still be indirect though
|
||||
self.direct = false;
|
||||
true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match self.recursive {
|
||||
Recursive::Count(other) if other != descendants => {
|
||||
return Err(PinUpdateError::UnexpectedNumberOfDescendants(
|
||||
other,
|
||||
descendants,
|
||||
))
|
||||
}
|
||||
Recursive::Count(_) | Recursive::Intent => {
|
||||
self.recursive = Recursive::Not;
|
||||
true
|
||||
}
|
||||
Recursive::Not => return Err(PinUpdateError::NotPinnedRecursive),
|
||||
}
|
||||
// FIXME: removing ... not sure if this is an issue; was thinking that maybe
|
||||
// the update might need to be split to allow different api for removal than
|
||||
// addition.
|
||||
};
|
||||
Ok(modified)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn can_remove(&self) -> bool {
|
||||
!self.direct && !self.recursive.is_set() && self.indirect_by.is_empty()
|
||||
}
|
||||
|
||||
fn mode(&self) -> Option<PinMode> {
|
||||
if self.recursive.is_set() {
|
||||
Some(PinMode::Recursive)
|
||||
} else if !self.indirect_by.is_empty() {
|
||||
Some(PinMode::Indirect)
|
||||
} else if self.direct {
|
||||
Some(PinMode::Direct)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn pick_kind(&self) -> Option<Result<PinKind<Cid>, cid::Error>> {
|
||||
self.mode().map(|p| {
|
||||
Ok(match p {
|
||||
PinMode::Recursive => match self.recursive {
|
||||
Recursive::Intent => PinKind::RecursiveIntention,
|
||||
Recursive::Count(total) => PinKind::Recursive(total),
|
||||
_ => unreachable!("mode shuold not have returned PinKind::Recursive"),
|
||||
},
|
||||
PinMode::Indirect => {
|
||||
// go-ipfs does seem to be doing a fifo looking, perhaps this is a list there, or
|
||||
// the indirect pins aren't being written down anywhere and they just refs from
|
||||
// recursive roots.
|
||||
let cid = Cid::try_from(self.indirect_by[0].as_str())?;
|
||||
PinKind::IndirectFrom(cid)
|
||||
}
|
||||
PinMode::Direct => PinKind::Direct,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum PinUpdateError {
|
||||
#[error("unexpected number of descendants ({}), found {}", .1, .0)]
|
||||
UnexpectedNumberOfDescendants(u64, u64),
|
||||
#[error("not pinned recursively")]
|
||||
NotPinnedRecursive,
|
||||
/// Not allowed: Adding direct pin while pinned recursive
|
||||
#[error("already pinned recursively")]
|
||||
AlreadyPinnedRecursive,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use super::*;
|
||||
@ -883,22 +677,6 @@ pub(crate) mod tests {
|
||||
assert_eq!(e.to_string(), "already pinned recursively");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pindocument_on_direct_pin() {
|
||||
let mut doc = PinDocument {
|
||||
version: 0,
|
||||
direct: false,
|
||||
recursive: Recursive::Not,
|
||||
cid_version: 0,
|
||||
indirect_by: Vec::new(),
|
||||
};
|
||||
|
||||
assert!(doc.update(true, &PinKind::Direct).unwrap());
|
||||
|
||||
assert_eq!(doc.mode(), Some(PinMode::Direct));
|
||||
assert_eq!(doc.pick_kind().unwrap().unwrap(), PinKind::Direct);
|
||||
}
|
||||
|
||||
#[tokio::test(max_threads = 1)]
|
||||
async fn can_pin_direct_as_recursive() {
|
||||
// the other way around doesn't work
|
||||
|
Loading…
x
Reference in New Issue
Block a user