backup-proxy: decouple stats gathering from rrd update

that way we can reuse the stats gathered

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Dominik Csapak 2022-06-10 13:17:53 +02:00 committed by Wolfgang Bumiller
parent 9f30a31e53
commit 759c4c87af

View File

@ -20,8 +20,11 @@ use tokio_stream::wrappers::ReceiverStream;
use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
use proxmox_lang::try_block;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
use proxmox_sys::fs::CreateOptions;
use proxmox_sys::linux::socket::set_tcp_keepalive;
use proxmox_sys::fs::{CreateOptions, FileSystemInformation};
use proxmox_sys::linux::{
procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat},
socket::set_tcp_keepalive,
};
use proxmox_sys::logrotate::LogRotate;
use proxmox_sys::{task_log, task_warn};
@ -40,6 +43,7 @@ use proxmox_backup::{
auth::check_pbs_auth,
jobstate::{self, Job},
},
tools::disks::BlockDevStat,
traffic_control_cache::TRAFFIC_CONTROL_CACHE,
};
@ -990,81 +994,98 @@ async fn run_stat_generator() {
loop {
let delay_target = Instant::now() + Duration::from_secs(10);
generate_host_stats().await;
let stats = match tokio::task::spawn_blocking(|| {
let hoststats = collect_host_stats_sync();
let (hostdisk, datastores) = collect_disk_stats_sync();
Arc::new((hoststats, hostdisk, datastores))
})
.await
{
Ok(res) => res,
Err(err) => {
log::error!("collecting host stats panicked: {}", err);
tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
continue;
}
};
rrd_sync_journal();
if let Err(err) = tokio::task::spawn_blocking(move || {
rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
rrd_sync_journal();
})
.await
{
log::error!("updating rrd panicked: {}", err);
}
tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
}
}
async fn generate_host_stats() {
match tokio::task::spawn_blocking(generate_host_stats_sync).await {
Ok(()) => (),
Err(err) => log::error!("generate_host_stats panicked: {}", err),
}
struct HostStats {
proc: Option<ProcFsStat>,
meminfo: Option<ProcFsMemInfo>,
net: Option<Vec<ProcFsNetDev>>,
load: Option<Loadavg>,
}
fn generate_host_stats_sync() {
struct DiskStat {
name: String,
usage: Option<FileSystemInformation>,
dev: Option<BlockDevStat>,
}
fn collect_host_stats_sync() -> HostStats {
use proxmox_sys::linux::procfs::{
read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
};
match read_proc_stat() {
Ok(stat) => {
rrd_update_gauge("host/cpu", stat.cpu);
rrd_update_gauge("host/iowait", stat.iowait_percent);
}
let proc = match read_proc_stat() {
Ok(stat) => Some(stat),
Err(err) => {
eprintln!("read_proc_stat failed - {}", err);
None
}
}
};
match read_meminfo() {
Ok(meminfo) => {
rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
rrd_update_gauge("host/memused", meminfo.memused as f64);
rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
rrd_update_gauge("host/swapused", meminfo.swapused as f64);
}
let meminfo = match read_meminfo() {
Ok(stat) => Some(stat),
Err(err) => {
eprintln!("read_meminfo failed - {}", err);
None
}
}
};
match read_proc_net_dev() {
Ok(netdev) => {
use pbs_config::network::is_physical_nic;
let mut netin = 0;
let mut netout = 0;
for item in netdev {
if !is_physical_nic(&item.device) {
continue;
}
netin += item.receive;
netout += item.send;
}
rrd_update_derive("host/netin", netin as f64);
rrd_update_derive("host/netout", netout as f64);
}
let net = match read_proc_net_dev() {
Ok(netdev) => Some(netdev),
Err(err) => {
eprintln!("read_prox_net_dev failed - {}", err);
None
}
}
};
match read_loadavg() {
Ok(loadavg) => {
rrd_update_gauge("host/loadavg", loadavg.0 as f64);
}
let load = match read_loadavg() {
Ok(loadavg) => Some(loadavg),
Err(err) => {
eprintln!("read_loadavg failed - {}", err);
None
}
}
};
HostStats {
proc,
meminfo,
net,
load,
}
}
fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
let disk_manager = DiskManage::new();
gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
let mut datastores = Vec::new();
match pbs_config::datastore::config() {
Ok((config, _)) => {
let datastore_list: Vec<DataStoreConfig> = config
@ -1078,15 +1099,80 @@ fn generate_host_stats_sync() {
{
continue;
}
let rrd_prefix = format!("datastore/{}", config.name);
let path = std::path::Path::new(&config.path);
gather_disk_stats(disk_manager.clone(), path, &rrd_prefix);
datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
}
}
Err(err) => {
eprintln!("read datastore config failed - {}", err);
}
}
(root, datastores)
}
fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
if let Some(stat) = &host.proc {
rrd_update_gauge("host/cpu", stat.cpu);
rrd_update_gauge("host/iowait", stat.iowait_percent);
}
if let Some(meminfo) = &host.meminfo {
rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
rrd_update_gauge("host/memused", meminfo.memused as f64);
rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
rrd_update_gauge("host/swapused", meminfo.swapused as f64);
}
if let Some(netdev) = &host.net {
use pbs_config::network::is_physical_nic;
let mut netin = 0;
let mut netout = 0;
for item in netdev {
if !is_physical_nic(&item.device) {
continue;
}
netin += item.receive;
netout += item.send;
}
rrd_update_derive("host/netin", netin as f64);
rrd_update_derive("host/netout", netout as f64);
}
if let Some(loadavg) = &host.load {
rrd_update_gauge("host/loadavg", loadavg.0 as f64);
}
rrd_update_disk_stat(hostdisk, "host");
for stat in datastores {
let rrd_prefix = format!("datastore/{}", stat.name);
rrd_update_disk_stat(stat, &rrd_prefix);
}
}
fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
if let Some(status) = &disk.usage {
let rrd_key = format!("{}/total", rrd_prefix);
rrd_update_gauge(&rrd_key, status.total as f64);
let rrd_key = format!("{}/used", rrd_prefix);
rrd_update_gauge(&rrd_key, status.used as f64);
}
if let Some(stat) = &disk.dev {
let rrd_key = format!("{}/read_ios", rrd_prefix);
rrd_update_derive(&rrd_key, stat.read_ios as f64);
let rrd_key = format!("{}/read_bytes", rrd_prefix);
rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
let rrd_key = format!("{}/write_ios", rrd_prefix);
rrd_update_derive(&rrd_key, stat.write_ios as f64);
let rrd_key = format!("{}/write_bytes", rrd_prefix);
rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
let rrd_key = format!("{}/io_ticks", rrd_prefix);
rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
}
}
fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
@ -1122,21 +1208,17 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
next <= now
}
fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
match proxmox_sys::fs::fs_info(path) {
Ok(status) => {
let rrd_key = format!("{}/total", rrd_prefix);
rrd_update_gauge(&rrd_key, status.total as f64);
let rrd_key = format!("{}/used", rrd_prefix);
rrd_update_gauge(&rrd_key, status.used as f64);
}
fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
let usage = match proxmox_sys::fs::fs_info(path) {
Ok(status) => Some(status),
Err(err) => {
eprintln!("read fs info on {:?} failed - {}", path, err);
None
}
}
};
match disk_manager.find_mounted_device(path) {
Ok(None) => {}
let dev = match disk_manager.find_mounted_device(path) {
Ok(None) => None,
Ok(Some((fs_type, device, source))) => {
let mut device_stat = None;
match (fs_type.as_str(), source) {
@ -1158,24 +1240,18 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
}
}
}
if let Some(stat) = device_stat {
let rrd_key = format!("{}/read_ios", rrd_prefix);
rrd_update_derive(&rrd_key, stat.read_ios as f64);
let rrd_key = format!("{}/read_bytes", rrd_prefix);
rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
let rrd_key = format!("{}/write_ios", rrd_prefix);
rrd_update_derive(&rrd_key, stat.write_ios as f64);
let rrd_key = format!("{}/write_bytes", rrd_prefix);
rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
let rrd_key = format!("{}/io_ticks", rrd_prefix);
rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
}
device_stat
}
Err(err) => {
eprintln!("find_mounted_device failed - {}", err);
None
}
};
DiskStat {
name: name.to_string(),
usage,
dev,
}
}