diff --git a/Cargo.toml b/Cargo.toml index 04c62ac6..6495b40a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ "proxmox-schema", "proxmox-section-config", "proxmox-serde", + "proxmox-shared-cache", "proxmox-shared-memory", "proxmox-simple-config", "proxmox-sortable-macro", diff --git a/proxmox-shared-cache/Cargo.toml b/proxmox-shared-cache/Cargo.toml new file mode 100644 index 00000000..b60cd558 --- /dev/null +++ b/proxmox-shared-cache/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "proxmox-shared-cache" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +exclude.workspace = true +description = "A cache that can be used from multiple processes simultaneously" + +[dependencies] +anyhow.workspace = true +proxmox-sys = { workspace = true, features = ["timer"] } +proxmox-schema = { workspace = true, features = ["api-types"]} +serde_json = { workspace = true, features = ["raw_value"] } +serde = { workspace = true, features = ["derive"]} +nix.workspace = true diff --git a/proxmox-shared-cache/debian/changelog b/proxmox-shared-cache/debian/changelog new file mode 100644 index 00000000..54d39f53 --- /dev/null +++ b/proxmox-shared-cache/debian/changelog @@ -0,0 +1,5 @@ +rust-proxmox-shared-cache (0.1.0-1) unstable; urgency=medium + + * initial Debian package + + -- Proxmox Support Team Thu, 04 May 2023 08:40:38 +0200 diff --git a/proxmox-shared-cache/debian/control b/proxmox-shared-cache/debian/control new file mode 100644 index 00000000..834ef43d --- /dev/null +++ b/proxmox-shared-cache/debian/control @@ -0,0 +1,50 @@ +Source: rust-proxmox-shared-cache +Section: rust +Priority: optional +Build-Depends: debhelper (>= 12), + dh-cargo (>= 25), + cargo:native , + rustc:native , + libstd-rust-dev , + librust-anyhow-1+default-dev , + librust-nix-0.26+default-dev (>= 0.26.1-~~) , + librust-proxmox-schema-3+api-types-dev (>= 3.1.1-~~) , + librust-proxmox-schema-3+default-dev (>= 3.1.1-~~) , + librust-proxmox-sys-0.5+default-dev (>= 0.5.5-~~) , + librust-proxmox-sys-0.5+timer-dev (>= 0.5.5-~~) , + librust-serde-1+default-dev , + librust-serde-1+derive-dev , + librust-serde-json-1+default-dev , + librust-serde-json-1+raw-value-dev +Maintainer: Proxmox Support Team +Standards-Version: 4.6.2 +Vcs-Git: https://salsa.debian.org/rust-team/debcargo-conf.git [src/proxmox-shared-cache] +Vcs-Browser: https://salsa.debian.org/rust-team/debcargo-conf/tree/master/src/proxmox-shared-cache +X-Cargo-Crate: proxmox-shared-cache +Rules-Requires-Root: no + +Package: librust-proxmox-shared-cache-dev +Architecture: any +Multi-Arch: same +Depends: + ${misc:Depends}, + librust-anyhow-1+default-dev, + librust-nix-0.26+default-dev (>= 0.26.1-~~), + librust-proxmox-schema-3+api-types-dev (>= 3.1.1-~~), + librust-proxmox-schema-3+default-dev (>= 3.1.1-~~), + librust-proxmox-sys-0.5+default-dev (>= 0.5.5-~~), + librust-proxmox-sys-0.5+timer-dev (>= 0.5.5-~~), + librust-serde-1+default-dev, + librust-serde-1+derive-dev, + librust-serde-json-1+default-dev, + librust-serde-json-1+raw-value-dev +Provides: + librust-proxmox-shared-cache+default-dev (= ${binary:Version}), + librust-proxmox-shared-cache-0-dev (= ${binary:Version}), + librust-proxmox-shared-cache-0+default-dev (= ${binary:Version}), + librust-proxmox-shared-cache-0.1-dev (= ${binary:Version}), + librust-proxmox-shared-cache-0.1+default-dev (= ${binary:Version}), + librust-proxmox-shared-cache-0.1.0-dev (= ${binary:Version}), + librust-proxmox-shared-cache-0.1.0+default-dev (= ${binary:Version}) +Description: Cache that can be used from multiple processes simultaneously - Rust source code + Source code for Debianized Rust crate "proxmox-shared-cache" diff --git a/proxmox-shared-cache/debian/copyright b/proxmox-shared-cache/debian/copyright new file mode 100644 index 00000000..869939c3 --- /dev/null +++ b/proxmox-shared-cache/debian/copyright @@ -0,0 +1,18 @@ +Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ + +Files: + * +Copyright: 2024 Proxmox Server Solutions GmbH +License: AGPL-3.0-or-later + This program is free software: you can redistribute it and/or modify it under + the terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or (at your option) any + later version. + . + This program is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more + details. + . + You should have received a copy of the GNU Affero General Public License along + with this program. If not, see . diff --git a/proxmox-shared-cache/debian/debcargo.toml b/proxmox-shared-cache/debian/debcargo.toml new file mode 100644 index 00000000..14ad8000 --- /dev/null +++ b/proxmox-shared-cache/debian/debcargo.toml @@ -0,0 +1,7 @@ +overlay = "." +crate_src_path = ".." +maintainer = "Proxmox Support Team " + +[source] +#vcs_git = "git://git.proxmox.com/git/proxmox.git" +#vcs_browser = "https://git.proxmox.com/?p=proxmox.git" diff --git a/proxmox-shared-cache/src/lib.rs b/proxmox-shared-cache/src/lib.rs new file mode 100644 index 00000000..89c44102 --- /dev/null +++ b/proxmox-shared-cache/src/lib.rs @@ -0,0 +1,325 @@ +use std::fs::File; +use std::io::{BufRead, BufReader, ErrorKind}; +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::Error; +use serde::de::DeserializeOwned; +use serde::Serialize; + +use proxmox_sys::fs::CreateOptions; + +/// A simple cache that can be used from multiple processes concurrently. +/// +/// The cache can be configured to keep a number of most recent values. +/// +/// ## Concurrency +/// `set` and `delete` lock the cache via an exclusive lock on a separate lock file. +/// `get` and `get_last` do not need a lock, since `set` and `delete` atomically +/// replace the cache file. +pub struct SharedCache { + path: PathBuf, + create_options: CreateOptions, + keep_old: u32, +} + +impl SharedCache { + /// Instantiate a new cache instance for a given `path`. + /// + /// The path containing the cache file must already exist. + /// The cache file itself will be created when first calling `set`. + /// The file permissions will be determined by `create_options`. + pub fn new>( + path: P, + create_options: CreateOptions, + keep_old: u32, + ) -> Result { + Ok(SharedCache { + path: path.into(), + create_options, + keep_old, + }) + } + + /// Returns the cache value. + /// + /// If `keep_old` > 0, this will return the value stored last. + /// + /// If the cache file does not exist or if it is empty, Ok(None) is returned. + /// If the file could not be read for some other reason, or if the + /// entry could not be deserialized an error is returned. + pub fn get(&self) -> Result, Error> { + match File::open(&self.path) { + Ok(f) => { + let mut lines = BufReader::new(f).lines(); + match lines.next() { + Some(Ok(line)) => Ok(Some(serde_json::from_str(&line)?)), + Some(Err(err)) => Err(err.into()), + None => Ok(None), + } + } + Err(err) => { + if err.kind() != ErrorKind::NotFound { + Err(err.into()) + } else { + Ok(None) + } + } + } + } + + /// Returns any last stored items, including `old_entries` of old items. + /// + /// If the cache file does not exist or if it is empty, Ok(vec![]) is returned. + /// If the file could not be read for some other reason, or if the + /// entry could not be deserialized an error is returned. + pub fn get_last(&self, old_entries: u32) -> Result, Error> { + let mut items = Vec::new(); + + let f = match File::open(&self.path) { + Ok(f) => f, + Err(err) if err.kind() == ErrorKind::NotFound => return Ok(items), + Err(err) => return Err(err.into()), + }; + + let mut lines = BufReader::new(f).lines(); + + for _ in 0..=old_entries { + if let Some(Ok(line)) = lines.next() { + let item = serde_json::from_str(&line)?; + items.push(item); + } else { + break; + } + } + + Ok(items) + } + + /// Stores a new value. + /// + /// If the number of stored items exceeds 1 + keep_old, the + /// least recently stored item will be dropped. + /// + /// Returns an error if the cache file could not be read/written + /// or if the new value could not be serialized. + pub fn set(&self, value: &V, lock_timeout: Duration) -> Result<(), Error> { + let _lock = self.lock(lock_timeout); + + let mut new_content = serde_json::to_string(value)?; + new_content.push('\n'); + + match File::open(&self.path) { + Ok(f) => { + let mut lines = BufReader::new(f).lines(); + + for _ in 0..self.keep_old { + if let Some(Ok(line)) = lines.next() { + new_content.push_str(&line); + new_content.push('\n'); + } else { + break; + } + } + } + Err(err) => { + if err.kind() != ErrorKind::NotFound { + return Err(err.into()); + } + } + }; + + proxmox_sys::fs::replace_file( + &self.path, + new_content.as_bytes(), + self.create_options.clone(), + true, + )?; + + Ok(()) + } + + /// Removes all items from the cache. + pub fn delete(&self, lock_timeout: Duration) -> Result<(), Error> { + let _lock = self.lock(lock_timeout)?; + proxmox_sys::fs::replace_file(&self.path, &[], self.create_options.clone(), true)?; + + Ok(()) + } + + fn lock(&self, lock_timeout: Duration) -> Result { + let mut lockfile_path = self.path.clone(); + lockfile_path.set_extension("lock"); + proxmox_sys::fs::open_file_locked( + lockfile_path, + lock_timeout, + true, + self.create_options.clone(), + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::Value; + + struct TestCache { + inner: SharedCache, + dir: PathBuf, + } + + impl TestCache { + fn new(keep_old: u32) -> Self { + let path = proxmox_sys::fs::make_tmp_dir("/tmp/", None).unwrap(); + + let options = CreateOptions::new() + .owner(nix::unistd::Uid::effective()) + .group(nix::unistd::Gid::effective()) + .perm(nix::sys::stat::Mode::from_bits_truncate(0o600)); + + let dir_options = CreateOptions::new() + .owner(nix::unistd::Uid::effective()) + .group(nix::unistd::Gid::effective()) + .perm(nix::sys::stat::Mode::from_bits_truncate(0o700)); + + proxmox_sys::fs::create_path( + &path, + Some(dir_options.clone()), + Some(dir_options.clone()), + ) + .unwrap(); + + let cache = SharedCache::new(path.join("somekey"), options, keep_old).unwrap(); + Self { + inner: cache, + dir: path, + } + } + } + + impl Drop for TestCache { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.dir); + } + } + + fn num(n: u32) -> Value { + Value::from(n) + } + + fn timeout() -> Duration { + Duration::from_secs(1) + } + + #[test] + fn test_get() -> Result<(), Error> { + let wrapper = TestCache::new(2); + let cache = &wrapper.inner; + + let result: Option = cache.get()?; + assert_eq!(result, None); + + cache.set(&num(0), timeout())?; + let result: Option = cache.get()?; + assert_eq!(result, Some(num(0))); + + cache.set(&num(1), timeout())?; + let result: Option = cache.get()?; + assert_eq!(result, Some(num(1))); + Ok(()) + } + + #[test] + fn test_get_without_history() -> Result<(), Error> { + let wrapper = TestCache::new(0); + let cache = &wrapper.inner; + + let result: Option = cache.get()?; + assert_eq!(result, None); + + cache.set(&num(0), timeout())?; + let result: Option = cache.get()?; + assert_eq!(result, Some(num(0))); + + cache.set(&num(1), timeout())?; + let result: Option = cache.get()?; + assert_eq!(result, Some(num(1))); + Ok(()) + } + + #[test] + fn test_get_last() -> Result<(), Error> { + let wrapper = TestCache::new(2); + let cache = &wrapper.inner; + let mut result: Vec; + + // 1 element added + cache.set(&num(0), timeout())?; + result = cache.get_last(10)?; + assert_eq!(result, vec![num(0)]); + + // 2 elements added (1 current, 1 old) + cache.set(&num(1), timeout())?; + result = cache.get_last(10)?; + assert_eq!(result, vec![num(1), num(0)]); + + // 3 elements added (1 current, 2 old) + cache.set(&num(2), timeout())?; + result = cache.get_last(10)?; + assert_eq!(result, vec![num(2), num(1), num(0)]); + + // 4 elements added (1 current, 2 old, oldest one is pushed out) + cache.set(&num(3), timeout())?; + result = cache.get_last(10)?; + assert_eq!(result, vec![num(3), num(2), num(1)]); + + result = cache.get_last(0)?; + assert_eq!(result, vec![num(3)]); + result = cache.get_last(1)?; + assert_eq!(result, vec![num(3), num(2)]); + result = cache.get_last(2)?; + assert_eq!(result, vec![num(3), num(2), num(1)]); + + Ok(()) + } + + #[test] + fn test_get_last_without_history() -> Result<(), Error> { + let wrapper = TestCache::new(0); + let cache = &wrapper.inner; + let mut result: Vec; + + cache.set(&num(0), timeout())?; + result = cache.get_last(10)?; + assert_eq!(result, vec![num(0)]); + + cache.set(&num(1), timeout())?; + result = cache.get_last(10)?; + assert_eq!(result, vec![num(1)]); + + cache.set(&num(2), timeout())?; + result = cache.get_last(10)?; + assert_eq!(result, vec![num(2)]); + + result = cache.get_last(0)?; + assert_eq!(result, vec![num(2)]); + + Ok(()) + } + + #[test] + fn test_deletion() -> Result<(), Error> { + let wrapper = TestCache::new(2); + let cache = &wrapper.inner; + + cache.set(&Value::String("bar".into()), timeout())?; + cache.set(&Value::String("baz".into()), timeout())?; + cache.delete(timeout())?; + + let result: Vec = cache.get_last(2)?; + assert!(result.is_empty()); + + Ok(()) + } +}