Merge branch 'proxmox-rrd'

The proxmox-backup repo was filtered using `git filter-repo` with the
following paths:

proxmox-rrd
proxmox-rrd-api-types
src/rrd

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Lukas Wagner 2024-01-31 12:40:20 +01:00
commit b8c56e7b6c
11 changed files with 2221 additions and 0 deletions

proxmox-rrd/Cargo.toml (new file)

@@ -0,0 +1,25 @@
[package]
name = "proxmox-rrd"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
description = "Simple RRD database implementation."
[dev-dependencies]
proxmox-router = { workspace = true, features = ["cli", "server"] }
[dependencies]
anyhow.workspace = true
bitflags.workspace = true
crossbeam-channel.workspace = true
libc.workspace = true
log.workspace = true
nix.workspace = true
serde.workspace = true
serde_cbor.workspace = true
serde_json.workspace = true
proxmox-schema = { workspace = true, features = [ "api-macro" ] }
proxmox-sys.workspace = true
proxmox-time.workspace = true

(new file, path not shown)

@@ -0,0 +1,390 @@
//! RRD toolkit - create/manage/update proxmox RRD (v2) file
use std::path::PathBuf;
use anyhow::{bail, Error};
use serde::{Deserialize, Serialize};
use serde_json::json;
use proxmox_router::cli::{
complete_file_name, run_cli_command, CliCommand, CliCommandMap, CliEnvironment,
};
use proxmox_router::RpcEnvironment;
use proxmox_schema::{api, ApiStringFormat, ApiType, IntegerSchema, Schema, StringSchema};
use proxmox_sys::fs::CreateOptions;
use proxmox_rrd::rrd::{CF, DST, RRA, RRD};
pub const RRA_INDEX_SCHEMA: Schema = IntegerSchema::new("Index of the RRA.").minimum(0).schema();
pub const RRA_CONFIG_STRING_SCHEMA: Schema = StringSchema::new("RRA configuration")
.format(&ApiStringFormat::PropertyString(&RRAConfig::API_SCHEMA))
.schema();
#[api(
properties: {},
default_key: "cf",
)]
#[derive(Debug, Serialize, Deserialize)]
/// RRA configuration
pub struct RRAConfig {
/// Time resolution
pub r: u64,
pub cf: CF,
/// Number of data points
pub n: u64,
}
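// Example property string (values illustrative), parsed via
// RRA_CONFIG_STRING_SCHEMA with "cf" as the default key:
//   "average,r=60,n=1440" => RRAConfig { r: 60, cf: CF::Average, n: 1440 }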
#[api(
input: {
properties: {
path: {
description: "The filename."
},
},
},
)]
/// Dump the RRD file in JSON format
pub fn dump_rrd(path: String) -> Result<(), Error> {
let rrd = RRD::load(&PathBuf::from(path), false)?;
serde_json::to_writer_pretty(std::io::stdout(), &rrd)?;
println!();
Ok(())
}
#[api(
input: {
properties: {
path: {
description: "The filename."
},
},
},
)]
/// RRD file information
pub fn rrd_info(path: String) -> Result<(), Error> {
let rrd = RRD::load(&PathBuf::from(path), false)?;
println!("DST: {:?}", rrd.source.dst);
for (i, rra) in rrd.rra_list.iter().enumerate() {
// use RRAConfig property string format
println!(
"RRA[{}]: {:?},r={},n={}",
i,
rra.cf,
rra.resolution,
rra.data.len()
);
}
Ok(())
}
#[api(
input: {
properties: {
path: {
description: "The filename."
},
time: {
description: "Update time.",
optional: true,
},
value: {
description: "Update value.",
},
},
},
)]
/// Update the RRD database
pub fn update_rrd(path: String, time: Option<u64>, value: f64) -> Result<(), Error> {
let path = PathBuf::from(path);
let time = time
.map(|v| v as f64)
.unwrap_or_else(proxmox_time::epoch_f64);
let mut rrd = RRD::load(&path, false)?;
rrd.update(time, value);
rrd.save(&path, CreateOptions::new(), false)?;
Ok(())
}
#[api(
input: {
properties: {
path: {
description: "The filename."
},
cf: {
type: CF,
},
resolution: {
description: "Time resolution",
},
start: {
description: "Start time. If not specified, we simply extract 10 data points.",
optional: true,
},
end: {
description: "End time (Unix Epoch). Default is the last update time.",
optional: true,
},
},
},
)]
/// Fetch data from the RRD file
pub fn fetch_rrd(
path: String,
cf: CF,
resolution: u64,
start: Option<u64>,
end: Option<u64>,
) -> Result<(), Error> {
let rrd = RRD::load(&PathBuf::from(path), false)?;
let data = rrd.extract_data(cf, resolution, start, end)?;
println!("{}", serde_json::to_string_pretty(&data)?);
Ok(())
}
#[api(
input: {
properties: {
path: {
description: "The filename."
},
"rra-index": {
schema: RRA_INDEX_SCHEMA,
},
},
},
)]
/// Return the Unix timestamp of the first time slot inside the
/// specified RRA (slot start time)
pub fn first_update_time(path: String, rra_index: usize) -> Result<(), Error> {
let rrd = RRD::load(&PathBuf::from(path), false)?;
if rra_index >= rrd.rra_list.len() {
bail!("rra-index is out of range");
}
let rra = &rrd.rra_list[rra_index];
let duration = (rra.data.len() as u64) * rra.resolution;
let first = rra.slot_start_time((rrd.source.last_update as u64).saturating_sub(duration));
println!("{}", first);
Ok(())
}
#[api(
input: {
properties: {
path: {
description: "The filename."
},
},
},
)]
/// Return the Unix timestamp of the last update
pub fn last_update_time(path: String) -> Result<(), Error> {
let rrd = RRD::load(&PathBuf::from(path), false)?;
println!("{}", rrd.source.last_update);
Ok(())
}
#[api(
input: {
properties: {
path: {
description: "The filename."
},
},
},
)]
/// Return the time and value from the last update
pub fn last_update(path: String) -> Result<(), Error> {
let rrd = RRD::load(&PathBuf::from(path), false)?;
let result = json!({
"time": rrd.source.last_update,
"value": rrd.source.last_value,
});
println!("{}", serde_json::to_string_pretty(&result)?);
Ok(())
}
#[api(
input: {
properties: {
dst: {
type: DST,
},
path: {
description: "The filename to create."
},
rra: {
description: "Configuration of contained RRAs.",
type: Array,
items: {
schema: RRA_CONFIG_STRING_SCHEMA,
}
},
},
},
)]
/// Create a new RRD file
pub fn create_rrd(dst: DST, path: String, rra: Vec<String>) -> Result<(), Error> {
let mut rra_list = Vec::new();
for item in rra.iter() {
let rra: RRAConfig =
serde_json::from_value(RRAConfig::API_SCHEMA.parse_property_string(item)?)?;
println!("GOT {:?}", rra);
rra_list.push(RRA::new(rra.cf, rra.r, rra.n as usize));
}
let path = PathBuf::from(path);
let rrd = RRD::new(dst, rra_list);
rrd.save(&path, CreateOptions::new(), false)?;
Ok(())
}
#[api(
input: {
properties: {
path: {
description: "The filename."
},
"rra-index": {
schema: RRA_INDEX_SCHEMA,
},
slots: {
description: "The number of slots you want to add or remove.",
type: i64,
},
},
},
)]
/// Resize. Change the number of data slots for the specified RRA.
pub fn resize_rrd(path: String, rra_index: usize, slots: i64) -> Result<(), Error> {
let path = PathBuf::from(&path);
let mut rrd = RRD::load(&path, false)?;
if rra_index >= rrd.rra_list.len() {
bail!("rra-index is out of range");
}
let rra = &rrd.rra_list[rra_index];
let new_slots = (rra.data.len() as i64) + slots;
if new_slots < 1 {
bail!("number of new slots is too small ('{}' < 1)", new_slots);
}
if new_slots > 1024 * 1024 {
bail!("number of new slots is too big ('{}' > 1M)", new_slots);
}
let rra_end = rra.slot_end_time(rrd.source.last_update as u64);
let rra_start = rra_end - rra.resolution * (rra.data.len() as u64);
let (start, reso, data) = rra
.extract_data(rra_start, rra_end, rrd.source.last_update)
.into();
let mut new_rra = RRA::new(rra.cf, rra.resolution, new_slots as usize);
new_rra.last_count = rra.last_count;
new_rra.insert_data(start, reso, data)?;
rrd.rra_list[rra_index] = new_rra;
rrd.save(&path, CreateOptions::new(), false)?;
Ok(())
}
fn main() -> Result<(), Error> {
let uid = nix::unistd::Uid::current();
let username = match nix::unistd::User::from_uid(uid)? {
Some(user) => user.name,
None => bail!("unable to get user name"),
};
let cmd_def = CliCommandMap::new()
.insert(
"create",
CliCommand::new(&API_METHOD_CREATE_RRD)
.arg_param(&["path"])
.completion_cb("path", complete_file_name),
)
.insert(
"dump",
CliCommand::new(&API_METHOD_DUMP_RRD)
.arg_param(&["path"])
.completion_cb("path", complete_file_name),
)
.insert(
"fetch",
CliCommand::new(&API_METHOD_FETCH_RRD)
.arg_param(&["path"])
.completion_cb("path", complete_file_name),
)
.insert(
"first",
CliCommand::new(&API_METHOD_FIRST_UPDATE_TIME)
.arg_param(&["path"])
.completion_cb("path", complete_file_name),
)
.insert(
"info",
CliCommand::new(&API_METHOD_RRD_INFO)
.arg_param(&["path"])
.completion_cb("path", complete_file_name),
)
.insert(
"last",
CliCommand::new(&API_METHOD_LAST_UPDATE_TIME)
.arg_param(&["path"])
.completion_cb("path", complete_file_name),
)
.insert(
"lastupdate",
CliCommand::new(&API_METHOD_LAST_UPDATE)
.arg_param(&["path"])
.completion_cb("path", complete_file_name),
)
.insert(
"resize",
CliCommand::new(&API_METHOD_RESIZE_RRD)
.arg_param(&["path"])
.completion_cb("path", complete_file_name),
)
.insert(
"update",
CliCommand::new(&API_METHOD_UPDATE_RRD)
.arg_param(&["path"])
.completion_cb("path", complete_file_name),
);
let mut rpcenv = CliEnvironment::new();
rpcenv.set_auth_id(Some(format!("{}@pam", username)));
run_cli_command(cmd_def, rpcenv, None);
Ok(())
}
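// Example invocations (illustrative; assumes the built binary is called `prrd`):
//   prrd create --dst gauge --rra average,r=60,n=1440 ./test.rrd
//   prrd update --value 1.5 ./test.rrd
//   prrd fetch --cf average --resolution 60 ./test.rrd
//   prrd info ./test.rrd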

proxmox-rrd/src/cache.rs (new file)

@@ -0,0 +1,448 @@
use std::collections::BTreeSet;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::thread::spawn;
use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
use crossbeam_channel::{bounded, TryRecvError};
use proxmox_sys::fs::{create_path, CreateOptions};
use crate::rrd::{CF, DST, RRA, RRD};
use crate::Entry;
mod journal;
use journal::*;
mod rrd_map;
use rrd_map::*;
/// RRD cache - keep RRD data in RAM, but write updates to disk
///
/// This cache is designed to run as a single instance (no concurrent
/// access from other processes).
pub struct RRDCache {
config: Arc<CacheConfig>,
state: Arc<RwLock<JournalState>>,
rrd_map: Arc<RwLock<RRDMap>>,
}
pub(crate) struct CacheConfig {
apply_interval: f64,
basedir: PathBuf,
file_options: CreateOptions,
dir_options: CreateOptions,
}
impl RRDCache {
/// Creates a new instance
///
/// `basedir`: All files are stored relative to this path.
///
/// `file_options`: Files are created with these options.
///
/// `dir_options`: Directories are created with these options.
///
/// `apply_interval`: Commit journal after `apply_interval` seconds.
///
/// `load_rrd_cb`: The callback function is used to load RRD files,
/// and should return a newly generated RRD if the file does not
/// exist (or is unreadable). This may generate RRDs with
/// different configurations (depending on `rel_path`).
pub fn new<P: AsRef<Path>>(
basedir: P,
file_options: Option<CreateOptions>,
dir_options: Option<CreateOptions>,
apply_interval: f64,
load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
) -> Result<Self, Error> {
let basedir = basedir.as_ref().to_owned();
let file_options = file_options.unwrap_or_else(CreateOptions::new);
let dir_options = dir_options.unwrap_or_else(CreateOptions::new);
create_path(
&basedir,
Some(dir_options.clone()),
Some(dir_options.clone()),
)
.map_err(|err: Error| format_err!("unable to create rrdb stat dir - {}", err))?;
let config = Arc::new(CacheConfig {
basedir,
file_options,
dir_options,
apply_interval,
});
let state = JournalState::new(Arc::clone(&config))?;
let rrd_map = RRDMap::new(Arc::clone(&config), load_rrd_cb);
Ok(Self {
config: Arc::clone(&config),
state: Arc::new(RwLock::new(state)),
rrd_map: Arc::new(RwLock::new(rrd_map)),
})
}
/// Create a new RRD as used by the proxmox backup server
///
/// It contains the following RRAs:
///
/// * cf=average,r=60,n=1440 => 1day
/// * cf=maximum,r=60,n=1440 => 1day
/// * cf=average,r=30*60,n=1440 => 1month
/// * cf=maximum,r=30*60,n=1440 => 1month
/// * cf=average,r=6*3600,n=1440 => 1year
/// * cf=maximum,r=6*3600,n=1440 => 1year
/// * cf=average,r=7*86400,n=570 => 10years
/// * cf=maximum,r=7*86400,n=570 => 10years
///
/// The resulting data file size is about 80KB.
pub fn create_proxmox_backup_default_rrd(dst: DST) -> RRD {
let rra_list = vec![
// 1 min * 1440 => 1 day
RRA::new(CF::Average, 60, 1440),
RRA::new(CF::Maximum, 60, 1440),
// 30 min * 1440 => 30 days ~ 1 month
RRA::new(CF::Average, 30 * 60, 1440),
RRA::new(CF::Maximum, 30 * 60, 1440),
// 6 h * 1440 => 360 days ~ 1 year
RRA::new(CF::Average, 6 * 3600, 1440),
RRA::new(CF::Maximum, 6 * 3600, 1440),
// 1 week * 570 => 10 years
RRA::new(CF::Average, 7 * 86400, 570),
RRA::new(CF::Maximum, 7 * 86400, 570),
];
RRD::new(dst, rra_list)
}
/// Sync the journal data to disk (using `fdatasync` syscall)
pub fn sync_journal(&self) -> Result<(), Error> {
self.state.read().unwrap().sync_journal()
}
/// Apply and commit the journal. Should be used at server startup.
pub fn apply_journal(&self) -> Result<bool, Error> {
let config = Arc::clone(&self.config);
let state = Arc::clone(&self.state);
let rrd_map = Arc::clone(&self.rrd_map);
let mut state_guard = self.state.write().unwrap();
let journal_applied = state_guard.journal_applied;
if let Some(ref recv) = state_guard.apply_thread_result {
match recv.try_recv() {
Ok(Ok(())) => {
// finished without errors, OK
state_guard.apply_thread_result = None;
}
Ok(Err(err)) => {
// finished with errors, log them
log::error!("{}", err);
state_guard.apply_thread_result = None;
}
Err(TryRecvError::Empty) => {
// still running
return Ok(journal_applied);
}
Err(TryRecvError::Disconnected) => {
// crashed, start again
log::error!("apply journal thread crashed - try again");
state_guard.apply_thread_result = None;
}
}
}
let now = proxmox_time::epoch_f64();
let wants_commit = (now - state_guard.last_journal_flush) > self.config.apply_interval;
if journal_applied && !wants_commit {
return Ok(journal_applied);
}
state_guard.last_journal_flush = proxmox_time::epoch_f64();
let (sender, receiver) = bounded(1);
state_guard.apply_thread_result = Some(receiver);
spawn(move || {
let result = apply_and_commit_journal_thread(config, state, rrd_map, journal_applied)
.map_err(|err| err.to_string());
sender.send(result).unwrap();
});
Ok(journal_applied)
}
/// Update data in RAM and write file back to disk (journal)
pub fn update_value(
&self,
rel_path: &str,
time: f64,
value: f64,
dst: DST,
) -> Result<(), Error> {
let journal_applied = self.apply_journal()?;
self.state
.write()
.unwrap()
.append_journal_entry(time, value, dst, rel_path)?;
if journal_applied {
self.rrd_map
.write()
.unwrap()
.update(rel_path, time, value, dst, false)?;
}
Ok(())
}
/// Extract data from cached RRD
///
/// `start`: Start time. If not specified, we simply extract 10 data points.
///
/// `end`: End time. Default is to use the current time.
pub fn extract_cached_data(
&self,
base: &str,
name: &str,
cf: CF,
resolution: u64,
start: Option<u64>,
end: Option<u64>,
) -> Result<Option<Entry>, Error> {
self.rrd_map
.read()
.unwrap()
.extract_cached_data(base, name, cf, resolution, start, end)
}
}
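// Minimal usage sketch (illustrative, not part of this commit; the base
// directory and load callback are assumptions):
//
//   fn load_cb(path: &Path, _rel_path: &str, dst: DST) -> RRD {
//       RRD::load(path, true)
//           .unwrap_or_else(|_| RRDCache::create_proxmox_backup_default_rrd(dst))
//   }
//
//   let cache = RRDCache::new("/tmp/rrdb", None, None, 3600.0, load_cb)?;
//   cache.apply_journal()?; // replay journal once at startup
//   cache.update_value("host/cpu", proxmox_time::epoch_f64(), 0.25, DST::Gauge)?;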
fn apply_and_commit_journal_thread(
config: Arc<CacheConfig>,
state: Arc<RwLock<JournalState>>,
rrd_map: Arc<RwLock<RRDMap>>,
commit_only: bool,
) -> Result<(), Error> {
if commit_only {
state.write().unwrap().rotate_journal()?; // start new journal, keep old one
} else {
let start_time = SystemTime::now();
log::debug!("applying rrd journal");
match apply_journal_impl(Arc::clone(&state), Arc::clone(&rrd_map)) {
Ok(entries) => {
let elapsed = start_time.elapsed().unwrap().as_secs_f64();
log::info!(
"applied rrd journal ({} entries in {:.3} seconds)",
entries,
elapsed
);
}
Err(err) => bail!("apply rrd journal failed - {}", err),
}
}
let start_time = SystemTime::now();
log::debug!("commit rrd journal");
match commit_journal_impl(config, state, rrd_map) {
Ok(rrd_file_count) => {
let elapsed = start_time.elapsed().unwrap().as_secs_f64();
log::info!(
"rrd journal successfully committed ({} files in {:.3} seconds)",
rrd_file_count,
elapsed
);
}
Err(err) => bail!("rrd journal commit failed: {}", err),
}
Ok(())
}
fn apply_journal_lines(
state: Arc<RwLock<JournalState>>,
rrd_map: Arc<RwLock<RRDMap>>,
journal_name: &str, // used for logging
reader: &mut BufReader<File>,
lock_read_line: bool,
) -> Result<usize, Error> {
let mut linenr = 0;
loop {
linenr += 1;
let mut line = String::new();
let len = if lock_read_line {
let _lock = state.read().unwrap(); // make sure we read entire lines
reader.read_line(&mut line)?
} else {
reader.read_line(&mut line)?
};
if len == 0 {
break;
}
let entry: JournalEntry = match line.parse() {
Ok(entry) => entry,
Err(err) => {
log::warn!(
"unable to parse rrd journal '{}' line {} (skip) - {}",
journal_name,
linenr,
err,
);
continue; // skip unparsable lines
}
};
rrd_map.write().unwrap().update(
&entry.rel_path,
entry.time,
entry.value,
entry.dst,
true,
)?;
}
Ok(linenr)
}
fn apply_journal_impl(
state: Arc<RwLock<JournalState>>,
rrd_map: Arc<RwLock<RRDMap>>,
) -> Result<usize, Error> {
let mut lines = 0;
// Apply old journals first
let journal_list = state.read().unwrap().list_old_journals()?;
for entry in journal_list {
log::info!("apply old journal log {}", entry.name);
let file = std::fs::OpenOptions::new().read(true).open(&entry.path)?;
let mut reader = BufReader::new(file);
lines += apply_journal_lines(
Arc::clone(&state),
Arc::clone(&rrd_map),
&entry.name,
&mut reader,
false,
)?;
}
let mut journal = state.read().unwrap().open_journal_reader()?;
lines += apply_journal_lines(
Arc::clone(&state),
Arc::clone(&rrd_map),
"rrd.journal",
&mut journal,
true,
)?;
{
let mut state_guard = state.write().unwrap(); // block other writers
lines += apply_journal_lines(
Arc::clone(&state),
Arc::clone(&rrd_map),
"rrd.journal",
&mut journal,
false,
)?;
state_guard.rotate_journal()?; // start new journal, keep old one
// We need to apply the journal only once, because further updates
// are always directly applied.
state_guard.journal_applied = true;
}
Ok(lines)
}
fn fsync_file_or_dir(path: &Path) -> Result<(), Error> {
let file = std::fs::File::open(path)?;
nix::unistd::fsync(file.as_raw_fd())?;
Ok(())
}
pub(crate) fn fsync_file_and_parent(path: &Path) -> Result<(), Error> {
let file = std::fs::File::open(path)?;
nix::unistd::fsync(file.as_raw_fd())?;
if let Some(parent) = path.parent() {
fsync_file_or_dir(parent)?;
}
Ok(())
}
fn rrd_parent_dir(basedir: &Path, rel_path: &str) -> PathBuf {
let mut path = basedir.to_owned();
let rel_path = Path::new(rel_path);
if let Some(parent) = rel_path.parent() {
path.push(parent);
}
path
}
fn commit_journal_impl(
config: Arc<CacheConfig>,
state: Arc<RwLock<JournalState>>,
rrd_map: Arc<RwLock<RRDMap>>,
) -> Result<usize, Error> {
let files = rrd_map.read().unwrap().file_list();
let mut rrd_file_count = 0;
let mut errors = 0;
let mut dir_set = BTreeSet::new();
log::info!("write rrd data back to disk");
// save all RRDs - we only need a read lock here
// Note: no fsync here (we do it afterwards)
for rel_path in files.iter() {
let parent_dir = rrd_parent_dir(&config.basedir, rel_path);
dir_set.insert(parent_dir);
rrd_file_count += 1;
if let Err(err) = rrd_map.read().unwrap().flush_rrd_file(rel_path) {
errors += 1;
log::error!("unable to save rrd {}: {}", rel_path, err);
}
}
if errors != 0 {
bail!("errors during rrd flush - unable to commit rrd journal");
}
// Important: We fsync files after writing all data! This increases
// the likelihood that files are already synced, so this is
// much faster (although we need to re-open the files).
log::info!("starting rrd data sync");
for rel_path in files.iter() {
let mut path = config.basedir.clone();
path.push(rel_path);
fsync_file_or_dir(&path)
.map_err(|err| format_err!("fsync rrd file {} failed - {}", rel_path, err))?;
}
// also fsync directories
for dir_path in dir_set {
fsync_file_or_dir(&dir_path)
.map_err(|err| format_err!("fsync rrd dir {:?} failed - {}", dir_path, err))?;
}
// if everything went ok, remove the old journal files
state.write().unwrap().remove_old_journals()?;
Ok(rrd_file_count)
}

proxmox-rrd/src/cache/journal.rs (new file)

@@ -0,0 +1,200 @@
use std::ffi::OsStr;
use std::fs::File;
use std::io::{BufReader, Write};
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use anyhow::{bail, format_err, Error};
use crossbeam_channel::Receiver;
use nix::fcntl::OFlag;
use proxmox_sys::fs::atomic_open_or_create_file;
const RRD_JOURNAL_NAME: &str = "rrd.journal";
use crate::cache::CacheConfig;
use crate::rrd::DST;
// shared state behind RwLock
pub struct JournalState {
config: Arc<CacheConfig>,
journal: File,
pub last_journal_flush: f64,
pub journal_applied: bool,
pub apply_thread_result: Option<Receiver<Result<(), String>>>,
}
pub struct JournalEntry {
pub time: f64,
pub value: f64,
pub dst: DST,
pub rel_path: String,
}
impl FromStr for JournalEntry {
type Err = Error;
fn from_str(line: &str) -> Result<Self, Self::Err> {
let line = line.trim();
let parts: Vec<&str> = line.splitn(4, ':').collect();
if parts.len() != 4 {
bail!("wrong numper of components");
}
let time: f64 = parts[0]
.parse()
.map_err(|_| format_err!("unable to parse time"))?;
let value: f64 = parts[1]
.parse()
.map_err(|_| format_err!("unable to parse value"))?;
let dst: u8 = parts[2]
.parse()
.map_err(|_| format_err!("unable to parse data source type"))?;
let dst = match dst {
0 => DST::Gauge,
1 => DST::Derive,
_ => bail!("got strange value for data source type '{}'", dst),
};
let rel_path = parts[3].to_string();
Ok(JournalEntry {
time,
value,
dst,
rel_path,
})
}
}
pub struct JournalFileInfo {
pub time: u64,
pub name: String,
pub path: PathBuf,
}
impl JournalState {
pub(crate) fn new(config: Arc<CacheConfig>) -> Result<Self, Error> {
let journal = JournalState::open_journal_writer(&config)?;
Ok(Self {
config,
journal,
last_journal_flush: 0.0,
journal_applied: false,
apply_thread_result: None,
})
}
pub fn sync_journal(&self) -> Result<(), Error> {
nix::unistd::fdatasync(self.journal.as_raw_fd())?;
Ok(())
}
pub fn append_journal_entry(
&mut self,
time: f64,
value: f64,
dst: DST,
rel_path: &str,
) -> Result<(), Error> {
let journal_entry = format!("{}:{}:{}:{}\n", time, value, dst as u8, rel_path);
self.journal.write_all(journal_entry.as_bytes())?;
Ok(())
}
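// Journal line format as written above (values illustrative):
//   "1706700000.123:0.25:0:host/cpu\n" == <time>:<value>:<dst as u8>:<rel_path>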
pub fn open_journal_reader(&self) -> Result<BufReader<File>, Error> {
// fixme : dup self.journal instead??
let mut journal_path = self.config.basedir.clone();
journal_path.push(RRD_JOURNAL_NAME);
let flags = OFlag::O_CLOEXEC | OFlag::O_RDONLY;
let journal = atomic_open_or_create_file(
&journal_path,
flags,
&[],
self.config.file_options.clone(),
false,
)?;
Ok(BufReader::new(journal))
}
fn open_journal_writer(config: &CacheConfig) -> Result<File, Error> {
let mut journal_path = config.basedir.clone();
journal_path.push(RRD_JOURNAL_NAME);
let flags = OFlag::O_CLOEXEC | OFlag::O_WRONLY | OFlag::O_APPEND;
let journal = atomic_open_or_create_file(
&journal_path,
flags,
&[],
config.file_options.clone(),
false,
)?;
Ok(journal)
}
pub fn rotate_journal(&mut self) -> Result<(), Error> {
let mut journal_path = self.config.basedir.clone();
journal_path.push(RRD_JOURNAL_NAME);
let mut new_name = journal_path.clone();
let now = proxmox_time::epoch_i64();
new_name.set_extension(format!("journal-{:08x}", now));
std::fs::rename(journal_path, &new_name)?;
self.journal = Self::open_journal_writer(&self.config)?;
// make sure the old journal data landed on the disk
super::fsync_file_and_parent(&new_name)?;
Ok(())
}
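// Example (illustrative epoch): "rrd.journal" is renamed to
// "rrd.journal-65b9e2e0" (8 hex digits of epoch seconds), which
// list_old_journals() later matches via the "journal-" extension prefix.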
pub fn remove_old_journals(&self) -> Result<(), Error> {
let journal_list = self.list_old_journals()?;
for entry in journal_list {
std::fs::remove_file(entry.path)?;
}
Ok(())
}
pub fn list_old_journals(&self) -> Result<Vec<JournalFileInfo>, Error> {
let mut list = Vec::new();
for entry in std::fs::read_dir(&self.config.basedir)? {
let entry = entry?;
let path = entry.path();
if !path.is_file() {
continue;
}
match path.file_stem() {
None => continue,
Some(stem) if stem != OsStr::new("rrd") => continue,
Some(_) => (),
}
if let Some(extension) = path.extension() {
if let Some(extension) = extension.to_str() {
if let Some(rest) = extension.strip_prefix("journal-") {
if let Ok(time) = u64::from_str_radix(rest, 16) {
list.push(JournalFileInfo {
time,
name: format!("rrd.{}", extension),
path: path.to_owned(),
});
}
}
}
}
}
list.sort_unstable_by_key(|entry| entry.time);
Ok(list)
}
}

proxmox-rrd/src/cache/rrd_map.rs (new file)

@@ -0,0 +1,97 @@
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use anyhow::{bail, Error};
use proxmox_sys::fs::create_path;
use crate::rrd::{CF, DST, RRD};
use super::CacheConfig;
use crate::Entry;
pub struct RRDMap {
config: Arc<CacheConfig>,
map: HashMap<String, RRD>,
load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
}
impl RRDMap {
pub(crate) fn new(
config: Arc<CacheConfig>,
load_rrd_cb: fn(path: &Path, rel_path: &str, dst: DST) -> RRD,
) -> Self {
Self {
config,
map: HashMap::new(),
load_rrd_cb,
}
}
pub fn update(
&mut self,
rel_path: &str,
time: f64,
value: f64,
dst: DST,
new_only: bool,
) -> Result<(), Error> {
if let Some(rrd) = self.map.get_mut(rel_path) {
if !new_only || time > rrd.last_update() {
rrd.update(time, value);
}
} else {
let mut path = self.config.basedir.clone();
path.push(rel_path);
create_path(
path.parent().unwrap(),
Some(self.config.dir_options.clone()),
Some(self.config.dir_options.clone()),
)?;
let mut rrd = (self.load_rrd_cb)(&path, rel_path, dst);
if !new_only || time > rrd.last_update() {
rrd.update(time, value);
}
self.map.insert(rel_path.to_string(), rrd);
}
Ok(())
}
pub fn file_list(&self) -> Vec<String> {
let mut list = Vec::new();
for rel_path in self.map.keys() {
list.push(rel_path.clone());
}
list
}
pub fn flush_rrd_file(&self, rel_path: &str) -> Result<(), Error> {
if let Some(rrd) = self.map.get(rel_path) {
let mut path = self.config.basedir.clone();
path.push(rel_path);
rrd.save(&path, self.config.file_options.clone(), true)
} else {
bail!("rrd file {} not loaded", rel_path);
}
}
pub fn extract_cached_data(
&self,
base: &str,
name: &str,
cf: CF,
resolution: u64,
start: Option<u64>,
end: Option<u64>,
) -> Result<Option<Entry>, Error> {
match self.map.get(&format!("{}/{}", base, name)) {
Some(rrd) => Ok(Some(rrd.extract_data(cf, resolution, start, end)?)),
None => Ok(None),
}
}
}

proxmox-rrd/src/lib.rs (new file)

@@ -0,0 +1,16 @@
//! # Round Robin Database files
//!
//! ## Features
//!
//! * One file stores a single data source
//! * Stores data for different time resolution
//! * Simple cache implementation with journal support
mod rrd_v1;
pub mod rrd;
#[doc(inline)]
pub use rrd::Entry;
mod cache;
pub use cache::*;

proxmox-rrd/src/rrd.rs (new file)

@@ -0,0 +1,694 @@
//! # Proxmox RRD format version 2
//!
//! The new format uses
//! [CBOR](https://datatracker.ietf.org/doc/html/rfc8949) as storage
//! format. This way we can use the serde serialization framework,
//! which makes our code more flexible, much nicer and type safe.
//!
//! ## Features
//!
//! * Well defined data format [CBOR](https://datatracker.ietf.org/doc/html/rfc8949)
//! * Platform independent (big endian f64, hopefully a standard format?)
//! * Arbitrary number of RRAs (dynamically changeable)
use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
use std::path::Path;
use anyhow::{bail, format_err, Error};
use serde::{Deserialize, Serialize};
use proxmox_schema::api;
use proxmox_sys::fs::{make_tmp_file, CreateOptions};
use crate::rrd_v1;
/// Proxmox RRD v2 file magic number
// openssl::sha::sha256(b"Proxmox Round Robin Database file v2.0")[0..8];
pub const PROXMOX_RRD_MAGIC_2_0: [u8; 8] = [224, 200, 228, 27, 239, 112, 122, 159];
#[api()]
#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
/// RRD data source type
pub enum DST {
/// Gauge values are stored unmodified.
Gauge,
/// Stores the difference to the previous value.
Derive,
/// Stores the difference to the previous value (like Derive), but
/// detects counter overflow (and ignores that value)
Counter,
}
#[api()]
#[derive(Debug, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
/// Consolidation function
pub enum CF {
/// Average
Average,
/// Maximum
Maximum,
/// Minimum
Minimum,
/// Use the last value
Last,
}
#[derive(Serialize, Deserialize)]
/// Data source specification
pub struct DataSource {
/// Data source type
pub dst: DST,
/// Last update time (epoch)
pub last_update: f64,
/// Stores the last value, used to compute differential value for
/// derive/counters
pub last_value: f64,
}
/// An RRD entry.
///
/// Serializes as a tuple.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(
from = "(u64, u64, Vec<Option<f64>>)",
into = "(u64, u64, Vec<Option<f64>>)"
)]
pub struct Entry {
pub start: u64,
pub resolution: u64,
pub data: Vec<Option<f64>>,
}
impl Entry {
pub const fn new(start: u64, resolution: u64, data: Vec<Option<f64>>) -> Self {
Self {
start,
resolution,
data,
}
}
/// Get the data point at a specific index, with bounds checking; returns
/// `None` for out-of-bounds indices.
pub fn get(&self, idx: usize) -> Option<f64> {
self.data.get(idx).copied().flatten()
}
}
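// An Entry round-trips through its tuple form; as JSON (illustrative):
//   Entry::new(0, 60, vec![None, Some(1.5)]) <-> [0, 60, [null, 1.5]]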
impl From<Entry> for (u64, u64, Vec<Option<f64>>) {
fn from(entry: Entry) -> (u64, u64, Vec<Option<f64>>) {
(entry.start, entry.resolution, entry.data)
}
}
impl From<(u64, u64, Vec<Option<f64>>)> for Entry {
fn from(data: (u64, u64, Vec<Option<f64>>)) -> Self {
Self::new(data.0, data.1, data.2)
}
}
impl DataSource {
/// Create a new Instance
pub fn new(dst: DST) -> Self {
Self {
dst,
last_update: 0.0,
last_value: f64::NAN,
}
}
fn compute_new_value(&mut self, time: f64, mut value: f64) -> Result<f64, Error> {
if time < 0.0 {
bail!("got negative time");
}
if time <= self.last_update {
bail!("time in past ({} < {})", time, self.last_update);
}
if value.is_nan() {
bail!("new value is NAN");
}
// derive counter value
let is_counter = self.dst == DST::Counter;
if is_counter || self.dst == DST::Derive {
let time_diff = time - self.last_update;
let diff = if self.last_value.is_nan() {
0.0
} else if is_counter && value < 0.0 {
bail!("got negative value for counter");
} else if is_counter && value < self.last_value {
// Note: We do not try automatic overflow corrections, but
// we update last_value anyways, so that we can compute the diff
// next time.
self.last_value = value;
bail!("counter overflow/reset detected");
} else {
value - self.last_value
};
self.last_value = value;
value = diff / time_diff;
} else {
self.last_value = value;
}
Ok(value)
}
}
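// Worked example (DST::Derive, illustrative numbers): with last_update = 100.0
// and last_value = 600.0, an update at time = 160.0 with value = 720.0 stores
// (720.0 - 600.0) / (160.0 - 100.0) = 2.0, i.e. the per-second rate.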
#[derive(Serialize, Deserialize)]
/// Round Robin Archive
pub struct RRA {
/// Number of seconds spanned by a single data entry.
pub resolution: u64,
/// Consolidation function.
pub cf: CF,
/// Count values computed inside this update interval.
pub last_count: u64,
/// The actual data entries.
pub data: Vec<f64>,
}
impl RRA {
/// Creates a new instance
pub fn new(cf: CF, resolution: u64, points: usize) -> Self {
Self {
cf,
resolution,
last_count: 0,
data: vec![f64::NAN; points],
}
}
/// Data slot end time
pub fn slot_end_time(&self, time: u64) -> u64 {
self.resolution * (time / self.resolution + 1)
}
/// Data slot start time
pub fn slot_start_time(&self, time: u64) -> u64 {
self.resolution * (time / self.resolution)
}
/// Data slot index
pub fn slot(&self, time: u64) -> usize {
((time / self.resolution) as usize) % self.data.len()
}
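// Worked example (illustrative): with resolution = 60 and time = 130,
// slot_start_time(130) = 120, slot_end_time(130) = 180 and
// slot(130) = (130 / 60) % data.len() = 2 % data.len().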
/// Directly overwrite data slots.
///
/// The caller needs to set the `last_update` value on the [DataSource] manually.
pub fn insert_data(
&mut self,
start: u64,
resolution: u64,
data: Vec<Option<f64>>,
) -> Result<(), Error> {
if resolution != self.resolution {
bail!("inser_data failed: got wrong resolution");
}
let mut index = self.slot(start);
for item in data {
if let Some(v) = item {
self.data[index] = v;
}
index += 1;
if index >= self.data.len() {
index = 0;
}
}
Ok(())
}
fn delete_old_slots(&mut self, time: f64, last_update: f64) {
let epoch = time as u64;
let last_update = last_update as u64;
let reso = self.resolution;
let num_entries = self.data.len() as u64;
let min_time = epoch.saturating_sub(num_entries * reso);
let min_time = self.slot_end_time(min_time);
let mut t = last_update.saturating_sub(num_entries * reso);
let mut index = self.slot(t);
for _ in 0..num_entries {
t += reso;
index += 1;
if index >= self.data.len() {
index = 0;
}
if t < min_time {
self.data[index] = f64::NAN;
} else {
break;
}
}
}
fn compute_new_value(&mut self, time: f64, last_update: f64, value: f64) {
let epoch = time as u64;
let last_update = last_update as u64;
let reso = self.resolution;
let index = self.slot(epoch);
let last_index = self.slot(last_update);
if (epoch - last_update) > reso || index != last_index {
self.last_count = 0;
}
let last_value = self.data[index];
if last_value.is_nan() {
self.last_count = 0;
}
let new_count = self.last_count.saturating_add(1);
if self.last_count == 0 {
self.data[index] = value;
self.last_count = 1;
} else {
let new_value = match self.cf {
CF::Maximum => {
if last_value > value {
last_value
} else {
value
}
}
CF::Minimum => {
if last_value < value {
last_value
} else {
value
}
}
CF::Last => value,
CF::Average => {
(last_value * (self.last_count as f64)) / (new_count as f64)
+ value / (new_count as f64)
}
};
self.data[index] = new_value;
self.last_count = new_count;
}
}
/// Extract data
///
/// Extract data from `start` to `end`. The RRA itself does not
/// store the `last_update` time, so you need to pass it as a
/// parameter (see [DataSource]).
pub fn extract_data(&self, start: u64, end: u64, last_update: f64) -> Entry {
let last_update = last_update as u64;
let reso = self.resolution;
let num_entries = self.data.len() as u64;
let mut list = Vec::new();
let rrd_end = self.slot_end_time(last_update);
let rrd_start = rrd_end.saturating_sub(reso * num_entries);
let mut t = start;
let mut index = self.slot(t);
for _ in 0..num_entries {
if t > end {
break;
};
if t < rrd_start || t >= rrd_end {
list.push(None);
} else {
let value = self.data[index];
if value.is_nan() {
list.push(None);
} else {
list.push(Some(value));
}
}
t += reso;
index += 1;
if index >= self.data.len() {
index = 0;
}
}
Entry::new(start, reso, list)
}
}
#[derive(Serialize, Deserialize)]
/// Round Robin Database
pub struct RRD {
/// The data source definition
pub source: DataSource,
/// List of round robin archives
pub rra_list: Vec<RRA>,
}
impl RRD {
/// Creates a new Instance
pub fn new(dst: DST, rra_list: Vec<RRA>) -> RRD {
let source = DataSource::new(dst);
RRD { source, rra_list }
}
fn from_raw(raw: &[u8]) -> Result<Self, Error> {
if raw.len() < 8 {
bail!("not an rrd file - file is too small ({})", raw.len());
}
let rrd = if raw[0..8] == rrd_v1::PROXMOX_RRD_MAGIC_1_0 {
let v1 = rrd_v1::RRDv1::from_raw(raw)?;
v1.to_rrd_v2()
.map_err(|err| format_err!("unable to convert from old V1 format - {}", err))?
} else if raw[0..8] == PROXMOX_RRD_MAGIC_2_0 {
serde_cbor::from_slice(&raw[8..])
.map_err(|err| format_err!("unable to decode RRD file - {}", err))?
} else {
bail!("not an rrd file - unknown magic number");
};
if rrd.source.last_update < 0.0 {
bail!("rrd file has negative last_update time");
}
Ok(rrd)
}
/// Load data from a file
///
/// Setting `avoid_page_cache` uses
/// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
/// the linux page cache.
pub fn load(path: &Path, avoid_page_cache: bool) -> Result<Self, std::io::Error> {
let mut file = std::fs::File::open(path)?;
let buffer_size = file.metadata().map(|m| m.len() as usize + 1).unwrap_or(0);
let mut raw = Vec::with_capacity(buffer_size);
file.read_to_end(&mut raw)?;
if avoid_page_cache {
nix::fcntl::posix_fadvise(
file.as_raw_fd(),
0,
buffer_size as i64,
nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
)
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))?;
}
match Self::from_raw(&raw) {
Ok(rrd) => Ok(rrd),
Err(err) => Err(std::io::Error::new(
std::io::ErrorKind::Other,
err.to_string(),
)),
}
}
/// Store data into a file (atomic replace file)
///
/// Setting `avoid_page_cache` uses
/// `fadvise(..,POSIX_FADV_DONTNEED)` to avoid keeping the data in
/// the linux page cache.
pub fn save(
&self,
path: &Path,
options: CreateOptions,
avoid_page_cache: bool,
) -> Result<(), Error> {
let (fd, tmp_path) = make_tmp_file(path, options)?;
let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
let mut try_block = || -> Result<(), Error> {
let mut data: Vec<u8> = Vec::new();
data.extend(PROXMOX_RRD_MAGIC_2_0);
serde_cbor::to_writer(&mut data, self)?;
file.write_all(&data)?;
if avoid_page_cache {
nix::fcntl::posix_fadvise(
file.as_raw_fd(),
0,
data.len() as i64,
nix::fcntl::PosixFadviseAdvice::POSIX_FADV_DONTNEED,
)?;
}
Ok(())
};
match try_block() {
Ok(()) => (),
error => {
let _ = nix::unistd::unlink(&tmp_path);
return error;
}
}
if let Err(err) = std::fs::rename(&tmp_path, path) {
let _ = nix::unistd::unlink(&tmp_path);
bail!("Atomic rename failed - {}", err);
}
Ok(())
}
/// Returns the last update time.
pub fn last_update(&self) -> f64 {
self.source.last_update
}
/// Update the value (in memory)
///
/// Note: This does not call [Self::save].
pub fn update(&mut self, time: f64, value: f64) {
let value = match self.source.compute_new_value(time, value) {
Ok(value) => value,
Err(err) => {
log::error!("rrd update failed: {}", err);
return;
}
};
let last_update = self.source.last_update;
self.source.last_update = time;
for rra in self.rra_list.iter_mut() {
rra.delete_old_slots(time, last_update);
rra.compute_new_value(time, last_update, value);
}
}
/// Extract data from the archive
///
/// This selects the RRA with specified [CF] and (minimum)
/// resolution, and extracts data from `start` to `end`.
///
/// `start`: Start time. If not specified, we simply extract 10 data points.
/// `end`: End time. Default is to use the current time.
pub fn extract_data(
&self,
cf: CF,
resolution: u64,
start: Option<u64>,
end: Option<u64>,
) -> Result<Entry, Error> {
let mut rra: Option<&RRA> = None;
for item in self.rra_list.iter() {
if item.cf != cf {
continue;
}
if item.resolution > resolution {
continue;
}
if let Some(current) = rra {
if item.resolution > current.resolution {
rra = Some(item);
}
} else {
rra = Some(item);
}
}
match rra {
Some(rra) => {
let end = end.unwrap_or_else(|| proxmox_time::epoch_f64() as u64);
let start = start.unwrap_or_else(|| end.saturating_sub(10 * rra.resolution));
Ok(rra.extract_data(start, end, self.source.last_update))
}
None => bail!("unable to find RRA suitable ({:?}:{})", cf, resolution),
}
}
}
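// RRA selection example (illustrative): given CF::Average RRAs with
// resolutions 60 and 300, extract_data(CF::Average, 300, ..) picks the 300s
// RRA (the coarsest resolution still <= the request); an 1800s RRA would be
// skipped as too coarse.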
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic_rra_maximum_gauge_test() -> Result<(), Error> {
let rra = RRA::new(CF::Maximum, 60, 5);
let mut rrd = RRD::new(DST::Gauge, vec![rra]);
for i in 2..10 {
rrd.update((i as f64) * 30.0, i as f64);
}
let Entry {
start,
resolution,
data,
} = rrd.extract_data(CF::Maximum, 60, Some(0), Some(5 * 60))?;
assert_eq!(start, 0);
assert_eq!(resolution, 60);
assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);
Ok(())
}
#[test]
fn basic_rra_minimum_gauge_test() -> Result<(), Error> {
let rra = RRA::new(CF::Minimum, 60, 5);
let mut rrd = RRD::new(DST::Gauge, vec![rra]);
for i in 2..10 {
rrd.update((i as f64) * 30.0, i as f64);
}
let Entry {
start,
resolution,
data,
} = rrd.extract_data(CF::Minimum, 60, Some(0), Some(5 * 60))?;
assert_eq!(start, 0);
assert_eq!(resolution, 60);
assert_eq!(data, [None, Some(2.0), Some(4.0), Some(6.0), Some(8.0)]);
Ok(())
}
#[test]
fn basic_rra_last_gauge_test() -> Result<(), Error> {
let rra = RRA::new(CF::Last, 60, 5);
let mut rrd = RRD::new(DST::Gauge, vec![rra]);
for i in 2..10 {
rrd.update((i as f64) * 30.0, i as f64);
}
assert!(
rrd.extract_data(CF::Average, 60, Some(0), Some(5 * 60))
.is_err(),
"CF::Average should not exist"
);
let Entry {
start,
resolution,
data,
} = rrd.extract_data(CF::Last, 60, Some(0), Some(20 * 60))?;
assert_eq!(start, 0);
assert_eq!(resolution, 60);
assert_eq!(data, [None, Some(3.0), Some(5.0), Some(7.0), Some(9.0)]);
Ok(())
}
#[test]
fn basic_rra_average_derive_test() -> Result<(), Error> {
let rra = RRA::new(CF::Average, 60, 5);
let mut rrd = RRD::new(DST::Derive, vec![rra]);
for i in 2..10 {
rrd.update((i as f64) * 30.0, (i * 60) as f64);
}
let Entry {
start,
resolution,
data,
} = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
assert_eq!(start, 60);
assert_eq!(resolution, 60);
assert_eq!(data, [Some(1.0), Some(2.0), Some(2.0), Some(2.0), None]);
Ok(())
}
#[test]
fn basic_rra_average_gauge_test() -> Result<(), Error> {
let rra = RRA::new(CF::Average, 60, 5);
let mut rrd = RRD::new(DST::Gauge, vec![rra]);
for i in 2..10 {
rrd.update((i as f64) * 30.0, i as f64);
}
let Entry {
start,
resolution,
data,
} = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
assert_eq!(start, 60);
assert_eq!(resolution, 60);
assert_eq!(data, [Some(2.5), Some(4.5), Some(6.5), Some(8.5), None]);
for i in 10..14 {
rrd.update((i as f64) * 30.0, i as f64);
}
let Entry {
start,
resolution,
data,
} = rrd.extract_data(CF::Average, 60, Some(60), Some(5 * 60))?;
assert_eq!(start, 60);
assert_eq!(resolution, 60);
assert_eq!(data, [None, Some(4.5), Some(6.5), Some(8.5), Some(10.5)]);
let Entry {
start,
resolution,
data,
} = rrd.extract_data(CF::Average, 60, Some(3 * 60), Some(8 * 60))?;
assert_eq!(start, 3 * 60);
assert_eq!(resolution, 60);
assert_eq!(data, [Some(6.5), Some(8.5), Some(10.5), Some(12.5), None]);
// add a much newer value (should delete all previous/outdated values)
let i = 100;
rrd.update((i as f64) * 30.0, i as f64);
println!("TEST {:?}", serde_json::to_string_pretty(&rrd));
let Entry {
start,
resolution,
data,
} = rrd.extract_data(CF::Average, 60, Some(100 * 30), Some(100 * 30 + 5 * 60))?;
assert_eq!(start, 100 * 30);
assert_eq!(resolution, 60);
assert_eq!(data, [Some(100.0), None, None, None, None]);
// extract with end time smaller than start time
let Entry {
start,
resolution,
data,
} = rrd.extract_data(CF::Average, 60, Some(100 * 30), Some(60))?;
assert_eq!(start, 100 * 30);
assert_eq!(resolution, 60);
assert_eq!(data, []);
Ok(())
}
}

proxmox-rrd/src/rrd_v1.rs (new file)

@@ -0,0 +1,295 @@
use std::io::Read;
use anyhow::Error;
use bitflags::bitflags;
/// The number of data entries per RRA
pub const RRD_DATA_ENTRIES: usize = 70;
/// Proxmox RRD file magic number
// openssl::sha::sha256(b"Proxmox Round Robin Database file v1.0")[0..8];
pub const PROXMOX_RRD_MAGIC_1_0: [u8; 8] = [206, 46, 26, 212, 172, 158, 5, 186];
use crate::rrd::{DataSource, CF, DST, RRA, RRD};
bitflags! {
/// Flags to specify the data source type and consolidation function
pub struct RRAFlags: u64 {
// Data Source Types
const DST_GAUGE = 1;
const DST_DERIVE = 2;
const DST_COUNTER = 4;
const DST_MASK = 255; // first 8 bits
// Consolidation Functions
const CF_AVERAGE = 1 << 8;
const CF_MAX = 2 << 8;
const CF_MASK = 255 << 8;
}
}
/// Round Robin Archive with [RRD_DATA_ENTRIES] data slots.
///
/// This data structure is used inside [RRD] and directly written to the
/// RRD files.
#[repr(C)]
pub struct RRAv1 {
/// Defines the data source type and consolidation function
pub flags: RRAFlags,
/// Resolution (seconds)
pub resolution: u64,
/// Last update time (epoch)
pub last_update: f64,
/// Count values computed inside this update interval
pub last_count: u64,
/// Stores the last value, used to compute differential value for derive/counters
pub counter_value: f64,
/// Data slots
pub data: [f64; RRD_DATA_ENTRIES],
}
impl RRAv1 {
fn extract_data(&self) -> (u64, u64, Vec<Option<f64>>) {
let reso = self.resolution;
let mut list = Vec::new();
let rra_end = reso * ((self.last_update as u64) / reso);
let rra_start = rra_end - reso * (RRD_DATA_ENTRIES as u64);
let mut t = rra_start;
let mut index = ((t / reso) % (RRD_DATA_ENTRIES as u64)) as usize;
for _ in 0..RRD_DATA_ENTRIES {
let value = self.data[index];
if value.is_nan() {
list.push(None);
} else {
list.push(Some(value));
}
t += reso;
index = (index + 1) % RRD_DATA_ENTRIES;
}
(rra_start, reso, list)
}
}
/// Round Robin Database file format with fixed number of [RRA]s
#[repr(C)]
// Note: Avoid alignment problems by using 8-byte types only
pub struct RRDv1 {
/// The magic number to identify the file type
pub magic: [u8; 8],
/// Hourly data (average values)
pub hour_avg: RRAv1,
/// Hourly data (maximum values)
pub hour_max: RRAv1,
/// Daily data (average values)
pub day_avg: RRAv1,
/// Daily data (maximum values)
pub day_max: RRAv1,
/// Weekly data (average values)
pub week_avg: RRAv1,
/// Weekly data (maximum values)
pub week_max: RRAv1,
/// Monthly data (average values)
pub month_avg: RRAv1,
/// Monthly data (maximum values)
pub month_max: RRAv1,
/// Yearly data (average values)
pub year_avg: RRAv1,
/// Yearly data (maximum values)
pub year_max: RRAv1,
}
impl RRDv1 {
pub fn from_raw(mut raw: &[u8]) -> Result<Self, std::io::Error> {
let expected_len = std::mem::size_of::<RRDv1>();
if raw.len() != expected_len {
let msg = format!("wrong data size ({} != {})", raw.len(), expected_len);
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
}
let mut rrd: RRDv1 = unsafe { std::mem::zeroed() };
unsafe {
let rrd_slice =
std::slice::from_raw_parts_mut(&mut rrd as *mut _ as *mut u8, expected_len);
raw.read_exact(rrd_slice)?;
}
if rrd.magic != PROXMOX_RRD_MAGIC_1_0 {
let msg = "wrong magic number".to_string();
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
}
Ok(rrd)
}
pub fn to_rrd_v2(&self) -> Result<RRD, Error> {
let mut rra_list = Vec::new();
// old format v1:
//
// hour 1 min, 70 points
// day 30 min, 70 points
// week 3 hours, 70 points
// month 12 hours, 70 points
// year 1 week, 70 points
//
// new default for RRD v2:
//
// day 1 min, 1440 points
// month 30 min, 1440 points
// year 365 min (6h), 1440 points
// decade 1 week, 570 points
// Linear extrapolation
fn extrapolate_data(
start: u64,
reso: u64,
factor: u64,
data: Vec<Option<f64>>,
) -> (u64, u64, Vec<Option<f64>>) {
let mut new = Vec::new();
for i in 0..data.len() {
let mut next = i + 1;
if next >= data.len() {
next = 0
};
let v = data[i];
let v1 = data[next];
match (v, v1) {
(Some(v), Some(v1)) => {
let diff = (v1 - v) / (factor as f64);
for j in 0..factor {
new.push(Some(v + diff * (j as f64)));
}
}
(Some(v), None) => {
new.push(Some(v));
for _ in 0..factor - 1 {
new.push(None);
}
}
(None, Some(v1)) => {
for _ in 0..factor - 1 {
new.push(None);
}
new.push(Some(v1));
}
(None, None) => {
for _ in 0..factor {
new.push(None);
}
}
}
}
(start, reso / factor, new)
}
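// Worked example (illustrative): factor = 30 splits one 30-minute slot into
// thirty 1-minute slots; neighbouring points (Some(2.0), Some(5.0)) become
// 2.0, 2.1, ..., 4.9 with step (5.0 - 2.0) / 30 = 0.1.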
// Try to convert to new, higher capacity format
// compute daily average (merge old self.day_avg and self.hour_avg)
let mut day_avg = RRA::new(CF::Average, 60, 1440);
let (start, reso, data) = self.day_avg.extract_data();
let (start, reso, data) = extrapolate_data(start, reso, 30, data);
day_avg.insert_data(start, reso, data)?;
let (start, reso, data) = self.hour_avg.extract_data();
day_avg.insert_data(start, reso, data)?;
// compute daily maximum (merge old self.day_max and self.hour_max)
let mut day_max = RRA::new(CF::Maximum, 60, 1440);
let (start, reso, data) = self.day_max.extract_data();
let (start, reso, data) = extrapolate_data(start, reso, 30, data);
day_max.insert_data(start, reso, data)?;
let (start, reso, data) = self.hour_max.extract_data();
day_max.insert_data(start, reso, data)?;
// compute monthly average (merge old self.month_avg,
// self.week_avg and self.day_avg)
let mut month_avg = RRA::new(CF::Average, 30 * 60, 1440);
let (start, reso, data) = self.month_avg.extract_data();
let (start, reso, data) = extrapolate_data(start, reso, 24, data);
month_avg.insert_data(start, reso, data)?;
let (start, reso, data) = self.week_avg.extract_data();
let (start, reso, data) = extrapolate_data(start, reso, 6, data);
month_avg.insert_data(start, reso, data)?;
let (start, reso, data) = self.day_avg.extract_data();
month_avg.insert_data(start, reso, data)?;
// compute monthly maximum (merge old self.month_max,
// self.week_max and self.day_max)
let mut month_max = RRA::new(CF::Maximum, 30 * 60, 1440);
let (start, reso, data) = self.month_max.extract_data();
let (start, reso, data) = extrapolate_data(start, reso, 24, data);
month_max.insert_data(start, reso, data)?;
let (start, reso, data) = self.week_max.extract_data();
let (start, reso, data) = extrapolate_data(start, reso, 6, data);
month_max.insert_data(start, reso, data)?;
let (start, reso, data) = self.day_max.extract_data();
month_max.insert_data(start, reso, data)?;
// compute yearly average (merge old self.year_avg)
let mut year_avg = RRA::new(CF::Average, 6 * 3600, 1440);
let (start, reso, data) = self.year_avg.extract_data();
let (start, reso, data) = extrapolate_data(start, reso, 28, data);
year_avg.insert_data(start, reso, data)?;
// compute yearly maximum (merge old self.year_max)
let mut year_max = RRA::new(CF::Maximum, 6 * 3600, 1440);
let (start, reso, data) = self.year_max.extract_data();
let (start, reso, data) = extrapolate_data(start, reso, 28, data);
year_max.insert_data(start, reso, data)?;
// compute decade average (merge old self.year_avg)
let mut decade_avg = RRA::new(CF::Average, 7 * 86400, 570);
let (start, reso, data) = self.year_avg.extract_data();
decade_avg.insert_data(start, reso, data)?;
// compute decade maximum (merge old self.year_max)
let mut decade_max = RRA::new(CF::Maximum, 7 * 86400, 570);
let (start, reso, data) = self.year_max.extract_data();
decade_max.insert_data(start, reso, data)?;
rra_list.push(day_avg);
rra_list.push(day_max);
rra_list.push(month_avg);
rra_list.push(month_max);
rra_list.push(year_avg);
rra_list.push(year_max);
rra_list.push(decade_avg);
rra_list.push(decade_max);
// use values from hour_avg for source (all RRAv1 must have the same config)
let dst = if self.hour_avg.flags.contains(RRAFlags::DST_COUNTER) {
DST::Counter
} else if self.hour_avg.flags.contains(RRAFlags::DST_DERIVE) {
DST::Derive
} else {
DST::Gauge
};
let source = DataSource {
dst,
last_value: f64::NAN,
last_update: self.hour_avg.last_update, // IMPORTANT!
};
Ok(RRD { source, rra_list })
}
}

(new file, path not shown)

@@ -0,0 +1,56 @@
use std::path::Path;
use std::process::Command;
use anyhow::{bail, Error};
use proxmox_rrd::rrd::RRD;
use proxmox_sys::fs::CreateOptions;
fn compare_file(fn1: &str, fn2: &str) -> Result<(), Error> {
let status = Command::new("/usr/bin/cmp")
.arg(fn1)
.arg(fn2)
.status()
.expect("failed to execute process");
if !status.success() {
bail!("file compare failed");
}
Ok(())
}
const RRD_V1_FN: &str = "./tests/testdata/cpu.rrd_v1";
const RRD_V2_FN: &str = "./tests/testdata/cpu.rrd_v2";
// make sure we can load and convert RRD v1
#[test]
fn upgrade_from_rrd_v1() -> Result<(), Error> {
let rrd = RRD::load(Path::new(RRD_V1_FN), true)?;
const RRD_V2_NEW_FN: &str = "./tests/testdata/cpu.rrd_v2.upgraded";
let new_path = Path::new(RRD_V2_NEW_FN);
rrd.save(new_path, CreateOptions::new(), true)?;
let result = compare_file(RRD_V2_FN, RRD_V2_NEW_FN);
let _ = std::fs::remove_file(RRD_V2_NEW_FN);
result?;
Ok(())
}
// make sure we can load and save RRD v2
#[test]
fn load_and_save_rrd_v2() -> Result<(), Error> {
let rrd = RRD::load(Path::new(RRD_V2_FN), true)?;
const RRD_V2_NEW_FN: &str = "./tests/testdata/cpu.rrd_v2.saved";
let new_path = Path::new(RRD_V2_NEW_FN);
rrd.save(new_path, CreateOptions::new(), true)?;
let result = compare_file(RRD_V2_FN, RRD_V2_NEW_FN);
let _ = std::fs::remove_file(RRD_V2_NEW_FN);
result?;
Ok(())
}

proxmox-rrd/tests/testdata/cpu.rrd_v1 (new binary file, not shown)
proxmox-rrd/tests/testdata/cpu.rrd_v2 (new binary file, not shown)