refactor: implement Lock trait on FsLock and MemLock
This commit is contained in:
parent
b483d7a724
commit
02d98ad923
10
src/lib.rs
10
src/lib.rs
@ -56,7 +56,7 @@ use tracing_futures::Instrument;
|
||||
use std::{
|
||||
borrow::Borrow,
|
||||
collections::{HashMap, HashSet},
|
||||
fmt,
|
||||
env, fmt,
|
||||
future::Future,
|
||||
ops::{Deref, DerefMut, Range},
|
||||
path::PathBuf,
|
||||
@ -104,6 +104,7 @@ pub struct Types;
|
||||
impl RepoTypes for Types {
|
||||
type TBlockStore = repo::fs::FsBlockStore;
|
||||
type TDataStore = repo::fs::FsDataStore;
|
||||
type TLock = repo::fs::FsLock;
|
||||
}
|
||||
|
||||
/// In-memory testing configuration used in tests.
|
||||
@ -112,6 +113,7 @@ pub struct TestTypes;
|
||||
impl RepoTypes for TestTypes {
|
||||
type TBlockStore = repo::mem::MemBlockStore;
|
||||
type TDataStore = repo::mem::MemDataStore;
|
||||
type TLock = repo::mem::MemLock;
|
||||
}
|
||||
|
||||
/// Ipfs node options used to configure the node to be created with [`UninitializedIpfs`].
|
||||
@ -177,12 +179,8 @@ impl IpfsOptions {
|
||||
///
|
||||
/// Also used from examples.
|
||||
pub fn inmemory_with_generated_keys() -> Self {
|
||||
use tempfile::TempDir;
|
||||
|
||||
let tempdir = TempDir::new().expect("tempdir creation failed").into_path();
|
||||
|
||||
Self {
|
||||
ipfs_path: tempdir,
|
||||
ipfs_path: env::temp_dir(),
|
||||
keypair: Keypair::generate_ed25519(),
|
||||
mdns: Default::default(),
|
||||
bootstrap: Default::default(),
|
||||
|
@ -8,7 +8,7 @@ use std::path::PathBuf;
|
||||
use std::sync::{atomic::AtomicU64, Arc};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use super::{BlockRm, BlockRmError, Column, DataStore, RepoCid};
|
||||
use super::{BlockRm, BlockRmError, Column, DataStore, Lock, RepoCid};
|
||||
|
||||
/// The PinStore implementation for FsDataStore
|
||||
mod pinstore;
|
||||
@ -88,5 +88,51 @@ impl DataStore for FsDataStore {
|
||||
}
|
||||
}
|
||||
|
||||
use fs2::FileExt;
|
||||
use std::fs::File;
|
||||
use std::path::Path;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FsLock {
|
||||
file: File,
|
||||
}
|
||||
|
||||
impl Lock for FsLock {
|
||||
fn new(path: &Path) -> std::io::Result<Self> {
|
||||
use std::fs::OpenOptions;
|
||||
|
||||
let file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(path)
|
||||
.unwrap();
|
||||
|
||||
file.try_lock_exclusive()?;
|
||||
|
||||
Ok(Self { file })
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
crate::pinstore_interface_tests!(common_tests, crate::repo::fs::FsDataStore::new);
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{FsLock, Lock};
|
||||
|
||||
#[test]
|
||||
fn creates_an_exclusive_repo_lock() {
|
||||
let temp_dir = std::env::temp_dir();
|
||||
let lockfile_path = temp_dir.join("repo_lock");
|
||||
|
||||
let lock = FsLock::new(&lockfile_path);
|
||||
assert_eq!(lock.is_ok(), true);
|
||||
|
||||
let failing_lock = FsLock::new(&lockfile_path);
|
||||
assert_eq!(failing_lock.is_err(), true);
|
||||
|
||||
// Clean-up.
|
||||
std::fs::remove_file(lockfile_path).unwrap();
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
//! Volatile memory backed repo
|
||||
use crate::error::Error;
|
||||
use crate::repo::{BlockPut, BlockStore, Column, DataStore, PinKind, PinMode, PinStore};
|
||||
use crate::repo::{BlockPut, BlockStore, Column, DataStore, Lock, PinKind, PinMode, PinStore};
|
||||
use crate::Block;
|
||||
use async_trait::async_trait;
|
||||
use cid::Cid;
|
||||
@ -648,6 +648,17 @@ pub enum PinUpdateError {
|
||||
CannotUnpinDirectOnRecursivelyPinned,
|
||||
}
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MemLock;
|
||||
|
||||
impl Lock for MemLock {
|
||||
fn new(_path: &Path) -> std::io::Result<Self> {
|
||||
Ok(Self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
crate::pinstore_interface_tests!(common_tests, crate::repo::mem::MemDataStore::new);
|
||||
|
||||
|
@ -8,7 +8,6 @@ use async_trait::async_trait;
|
||||
use cid::{self, Cid};
|
||||
use core::convert::TryFrom;
|
||||
use core::fmt::Debug;
|
||||
use fs2::FileExt;
|
||||
use futures::channel::{
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
oneshot,
|
||||
@ -16,7 +15,6 @@ use futures::channel::{
|
||||
use futures::sink::SinkExt;
|
||||
use libp2p::core::PeerId;
|
||||
use std::borrow::Borrow;
|
||||
use std::fs::File;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
@ -30,6 +28,7 @@ pub mod mem;
|
||||
pub trait RepoTypes: Send + Sync + 'static {
|
||||
type TBlockStore: BlockStore;
|
||||
type TDataStore: DataStore;
|
||||
type TLock: Lock;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@ -118,6 +117,13 @@ pub trait DataStore: PinStore + Debug + Send + Sync + Unpin + 'static {
|
||||
async fn wipe(&self);
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait Lock: Debug + Send + Sync {
|
||||
fn new(path: &Path) -> std::io::Result<Self>
|
||||
where
|
||||
Self: std::marker::Sized;
|
||||
}
|
||||
|
||||
type References<'a> = futures::stream::BoxStream<'a, Result<Cid, crate::refs::IpldRefsError>>;
|
||||
|
||||
#[async_trait]
|
||||
@ -211,7 +217,7 @@ pub struct Repo<TRepoTypes: RepoTypes> {
|
||||
data_store: TRepoTypes::TDataStore,
|
||||
events: Sender<RepoEvent>,
|
||||
pub(crate) subscriptions: SubscriptionRegistry<Block, String>,
|
||||
lockfile: Lock,
|
||||
lockfile: TRepoTypes::TLock,
|
||||
}
|
||||
|
||||
/// Events used to communicate to the swarm on repo changes.
|
||||
@ -238,40 +244,20 @@ impl TryFrom<RequestKind> for RepoEvent {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Lock {
|
||||
file: File,
|
||||
}
|
||||
|
||||
impl Lock {
|
||||
fn new(path: &Path) -> std::io::Result<Lock> {
|
||||
use std::fs::OpenOptions;
|
||||
|
||||
let file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(path)
|
||||
.unwrap();
|
||||
|
||||
file.try_lock_exclusive()?;
|
||||
|
||||
Ok(Lock { file })
|
||||
}
|
||||
}
|
||||
|
||||
impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
|
||||
pub fn new(options: RepoOptions) -> (Self, Receiver<RepoEvent>) {
|
||||
let lockfile_path = options.path.join("repo_lock");
|
||||
let lockfile = Lock::new(&lockfile_path).expect("lock creation failed");
|
||||
|
||||
let mut blockstore_path = options.path.clone();
|
||||
let mut datastore_path = options.path;
|
||||
let mut datastore_path = options.path.clone();
|
||||
let mut lockfile_path = options.path;
|
||||
blockstore_path.push("blockstore");
|
||||
datastore_path.push("datastore");
|
||||
lockfile_path.push("repo_lock");
|
||||
|
||||
let block_store = TRepoTypes::TBlockStore::new(blockstore_path);
|
||||
let data_store = TRepoTypes::TDataStore::new(datastore_path);
|
||||
let lockfile = TRepoTypes::TLock::new(&lockfile_path).expect("lock creation failed");
|
||||
let (sender, receiver) = channel(1);
|
||||
|
||||
(
|
||||
Repo {
|
||||
block_store,
|
||||
@ -469,20 +455,3 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
|
||||
self.data_store.query(cids, requirement).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::Lock;
|
||||
|
||||
#[test]
|
||||
fn creates_an_exclusive_repo_lock() {
|
||||
let temp_dir = std::env::temp_dir();
|
||||
let lockfile_path = temp_dir.join("repo_lock");
|
||||
|
||||
let lock = Lock::new(&lockfile_path);
|
||||
assert_eq!(lock.is_ok(), true);
|
||||
|
||||
let failing_lock = Lock::new(&lockfile_path);
|
||||
assert_eq!(failing_lock.is_err(), true);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user