proxmox-backup-proxy: send metrics to configured metrics server

and keep the data as similar as possible to pve (tags/fields)

datastores get their own 'object' type and reside in the "blockstat"
measurement

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Dominik Csapak 2022-06-10 13:17:54 +02:00 committed by Wolfgang Bumiller
parent 759c4c87af
commit 4d397849b5

View File

@ -19,6 +19,7 @@ use tokio_stream::wrappers::ReceiverStream;
use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
use proxmox_lang::try_block;
use proxmox_metrics::MetricsData;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
use proxmox_sys::fs::{CreateOptions, FileSystemInformation};
use proxmox_sys::linux::{
@ -28,6 +29,7 @@ use proxmox_sys::linux::{
use proxmox_sys::logrotate::LogRotate;
use proxmox_sys::{task_log, task_warn};
use pbs_config::metrics::get_metric_server_connections;
use pbs_datastore::DataStore;
use proxmox_rest_server::{
@ -1009,19 +1011,121 @@ async fn run_stat_generator() {
}
};
if let Err(err) = tokio::task::spawn_blocking(move || {
rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
rrd_sync_journal();
})
.await
{
log::error!("updating rrd panicked: {}", err);
let rrd_future = tokio::task::spawn_blocking({
let stats = Arc::clone(&stats);
move || {
rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
rrd_sync_journal();
}
});
let metrics_future = send_data_to_metric_servers(stats);
let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
if let Err(err) = rrd_res {
log::error!("rrd update panicked: {}", err);
}
if let Err(err) = metrics_res {
log::error!("error during metrics sending: {}", err);
}
tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
}
}
async fn send_data_to_metric_servers(
stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
) -> Result<(), Error> {
let (config, _digest) = pbs_config::metrics::config()?;
let channel_list = get_metric_server_connections(config)?;
if channel_list.is_empty() {
return Ok(());
}
let ctime = proxmox_time::epoch_i64();
let nodename = proxmox_sys::nodename();
let mut values = Vec::new();
let mut cpuvalue = match &stats.0.proc {
Some(stat) => serde_json::to_value(stat)?,
None => json!({}),
};
if let Some(loadavg) = &stats.0.load {
cpuvalue["avg1"] = Value::from(loadavg.0);
cpuvalue["avg5"] = Value::from(loadavg.1);
cpuvalue["avg15"] = Value::from(loadavg.2);
}
values.push(Arc::new(
MetricsData::new("cpustat", ctime, cpuvalue)?
.tag("object", "host")
.tag("host", nodename),
));
if let Some(stat) = &stats.0.meminfo {
values.push(Arc::new(
MetricsData::new("memory", ctime, stat)?
.tag("object", "host")
.tag("host", nodename),
));
}
if let Some(netdev) = &stats.0.net {
for item in netdev {
values.push(Arc::new(
MetricsData::new("nics", ctime, item)?
.tag("object", "host")
.tag("host", nodename)
.tag("instance", item.device.clone()),
));
}
}
values.push(Arc::new(
MetricsData::new("blockstat", ctime, stats.1.to_value())?
.tag("object", "host")
.tag("host", nodename),
));
for datastore in stats.2.iter() {
values.push(Arc::new(
MetricsData::new("blockstat", ctime, datastore.to_value())?
.tag("object", "host")
.tag("host", nodename)
.tag("datastore", datastore.name.clone()),
));
}
// we must have a concrete functions, because the inferred lifetime from a
// closure is not general enough for the tokio::spawn call we are in here...
fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
&item.0
}
let results =
proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
for (res, name) in results
.into_iter()
.zip(channel_list.iter().map(|(_, name)| name))
{
if let Err(err) = res {
log::error!("error sending into channel of {}: {}", name, err);
}
}
futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
if let Err(err) = channel.join().await {
log::error!("error sending to metric server {}: {}", name, err);
}
}))
.await;
Ok(())
}
struct HostStats {
proc: Option<ProcFsStat>,
meminfo: Option<ProcFsMemInfo>,
@ -1035,6 +1139,26 @@ struct DiskStat {
dev: Option<BlockDevStat>,
}
impl DiskStat {
fn to_value(&self) -> Value {
let mut value = json!({});
if let Some(usage) = &self.usage {
value["total"] = Value::from(usage.total);
value["used"] = Value::from(usage.used);
value["avail"] = Value::from(usage.available);
}
if let Some(dev) = &self.dev {
value["read_ios"] = Value::from(dev.read_ios);
value["read_bytes"] = Value::from(dev.read_sectors * 512);
value["write_ios"] = Value::from(dev.write_ios);
value["write_bytes"] = Value::from(dev.write_sectors * 512);
value["io_ticks"] = Value::from(dev.io_ticks / 1000);
}
value
}
}
fn collect_host_stats_sync() -> HostStats {
use proxmox_sys::linux::procfs::{
read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,