proxmox-metrics: implement metrics server client code

influxdb (udp + http(s)) only for now

general architecture looks as follows:

the helper functions influxdb_http/udp start a tokio task and return
a Metrics struct, that can be used to send data and wait for the tokio
task. if the struct is dropped, the task is canceled.

so it would look like this:
  let metrics = influxdb_http(..params..)?;
  metrics.send_data(...).await?;
  metrics.send_data(...).await?;
  metrics.join?;

on join, the sending part of the channel will be dropped and thus
flushing the remaining data to the server

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
[renamed proxmox_async::io::udp -> proxmox_async::net::udp]
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Dominik Csapak 2022-02-02 10:50:11 +01:00 committed by Wolfgang Bumiller
parent de891b1f76
commit 8bf293bfc5
10 changed files with 495 additions and 0 deletions

View File

@ -6,6 +6,7 @@ members = [
"proxmox-http",
"proxmox-io",
"proxmox-lang",
"proxmox-metrics",
"proxmox-router",
"proxmox-schema",
"proxmox-serde",

View File

@ -0,0 +1,21 @@
[package]
name = "proxmox-metrics"
version = "0.1.0"
authors = ["Proxmox Support Team <support@proxmox.com>"]
edition = "2018"
license = "AGPL-3"
description = "Metrics Server export utilitites"
exclude = [ "debian" ]
[dependencies]
anyhow = "1.0"
tokio = { version = "1.0", features = [ "net", "sync" ] }
futures = "0.3"
serde = "1.0"
serde_json = "1.0"
http = "0.2"
hyper = "0.14"
openssl = "0.10"
proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" }
proxmox-async = { path = "../proxmox-async", features = [], version = "0.3" }

View File

@ -0,0 +1,5 @@
rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium
* initial package
-- Proxmox Support Team <support@proxmox.com> Tue, 14 Dec 2021 08:56:54 +0100

View File

@ -0,0 +1,16 @@
Copyright (C) 2021 Proxmox Server Solutions GmbH
This software is written by Proxmox Server Solutions GmbH <support@proxmox.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@ -0,0 +1,7 @@
overlay = "."
crate_src_path = ".."
maintainer = "Proxmox Support Team <support@proxmox.com>"
[source]
vcs_git = "git://git.proxmox.com/git/proxmox.git"
vcs_browser = "https://git.proxmox.com/?p=proxmox.git"

View File

@ -0,0 +1,185 @@
use std::sync::Arc;
use anyhow::{bail, Error};
use hyper::Body;
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use tokio::sync::mpsc;
use proxmox_http::client::{SimpleHttp, SimpleHttpOptions};
use crate::influxdb::utils;
use crate::{Metrics, MetricsData};
struct InfluxDbHttp {
client: SimpleHttp,
healthuri: http::Uri,
writeuri: http::Uri,
token: Option<String>,
max_body_size: usize,
data: String,
channel: mpsc::Receiver<Arc<MetricsData>>,
}
/// Tests the connection to the given influxdb http server with the given
/// parameters.
pub async fn test_influxdb_http(
uri: &str,
organization: &str,
bucket: &str,
token: Option<&str>,
verify_tls: bool,
) -> Result<(), Error> {
let (_tx, rx) = mpsc::channel(1);
let this = InfluxDbHttp::new(uri, organization, bucket, token, verify_tls, 1, rx)?;
this.test_connection().await
}
/// Returns a [Metrics] handle that connects and sends data to the
/// given influxdb server at the given https url
pub fn influxdb_http(
uri: &str,
organization: &str,
bucket: &str,
token: Option<&str>,
verify_tls: bool,
max_body_size: usize,
) -> Result<Metrics, Error> {
let (tx, rx) = mpsc::channel(1024);
let this = InfluxDbHttp::new(
uri,
organization,
bucket,
token,
verify_tls,
max_body_size,
rx,
)?;
let join_handle = Some(tokio::spawn(async { this.finish().await }));
Ok(Metrics {
join_handle,
channel: Some(tx),
})
}
impl InfluxDbHttp {
fn new(
uri: &str,
organization: &str,
bucket: &str,
token: Option<&str>,
verify_tls: bool,
max_body_size: usize,
channel: mpsc::Receiver<Arc<MetricsData>>,
) -> Result<Self, Error> {
let client = if verify_tls {
SimpleHttp::with_options(SimpleHttpOptions::default())
} else {
let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap();
ssl_connector.set_verify(SslVerifyMode::NONE);
SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default())
};
let uri: http::uri::Uri = uri.parse()?;
let uri_parts = uri.into_parts();
let base_path = if let Some(ref p) = uri_parts.path_and_query {
p.path().trim_end_matches('/')
} else {
""
};
let writeuri = http::uri::Builder::new()
.scheme(uri_parts.scheme.clone().unwrap())
.authority(uri_parts.authority.clone().unwrap())
.path_and_query(format!(
"{}/api/v2/write?org={}&bucket={}",
base_path, organization, bucket
))
.build()?;
let healthuri = http::uri::Builder::new()
.scheme(uri_parts.scheme.unwrap())
.authority(uri_parts.authority.unwrap())
.path_and_query(format!("{}/health", base_path))
.build()?;
Ok(InfluxDbHttp {
client,
writeuri,
healthuri,
token: token.map(String::from),
max_body_size,
data: String::new(),
channel,
})
}
async fn test_connection(&self) -> Result<(), Error> {
let mut request = http::Request::builder().method("GET").uri(&self.healthuri);
if let Some(token) = &self.token {
request = request.header("Authorization", format!("Token {}", token));
}
let res = self.client.request(request.body(Body::empty())?).await?;
let status = res.status();
if !status.is_success() {
bail!("got bad status: {}", status);
}
Ok(())
}
async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
let new_data = utils::format_influxdb_line(&data)?;
if self.data.len() + new_data.len() >= self.max_body_size {
self.flush().await?;
}
self.data.push_str(&new_data);
if self.data.len() >= self.max_body_size {
self.flush().await?;
}
Ok(())
}
async fn flush(&mut self) -> Result<(), Error> {
if self.data.is_empty() {
return Ok(());
}
let mut request = http::Request::builder().method("POST").uri(&self.writeuri);
if let Some(token) = &self.token {
request = request.header("Authorization", format!("Token {}", token));
}
let request = request.body(Body::from(self.data.split_off(0)))?;
let res = self.client.request(request).await?;
let status = res.status();
if !status.is_success() {
bail!("got bad status: {}", status);
}
Ok(())
}
async fn finish(mut self) -> Result<(), Error> {
while let Some(data) = self.channel.recv().await {
self.add_data(data).await?;
}
self.flush().await?;
Ok(())
}
}

View File

@ -0,0 +1,7 @@
mod http;
pub use self::http::*;
mod udp;
pub use udp::*;
pub mod utils;

View File

@ -0,0 +1,86 @@
use std::sync::Arc;
use anyhow::Error;
use tokio::sync::mpsc;
use proxmox_async::net::udp;
use crate::influxdb::utils;
use crate::{Metrics, MetricsData};
struct InfluxDbUdp {
address: String,
conn: Option<tokio::net::UdpSocket>,
mtu: u16,
data: String,
channel: mpsc::Receiver<Arc<MetricsData>>,
}
/// Tests the connection to the given influxdb udp server.
pub async fn test_influxdb_udp(address: &str) -> Result<(), Error> {
udp::connect(address).await?;
Ok(())
}
/// Returns a [Metrics] handle that connects and sends data to the
/// given influxdb server at the given udp address/port
///
/// `address` must be in the format of 'ip_or_hostname:port'
pub fn influxdb_udp(address: &str, mtu: Option<u16>) -> Metrics {
let (tx, rx) = mpsc::channel(1024);
let this = InfluxDbUdp {
address: address.to_string(),
conn: None,
// empty ipv6 udp package needs 48 bytes, subtract 50 for safety
mtu: mtu.unwrap_or(1500) - 50,
data: String::new(),
channel: rx,
};
let join_handle = Some(tokio::spawn(async { this.finish().await }));
Metrics {
join_handle,
channel: Some(tx),
}
}
impl InfluxDbUdp {
async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
let new_data = utils::format_influxdb_line(&data)?;
if self.data.len() + new_data.len() >= (self.mtu as usize) {
self.flush().await?;
}
self.data.push_str(&new_data);
if self.data.len() >= (self.mtu as usize) {
self.flush().await?;
}
Ok(())
}
async fn flush(&mut self) -> Result<(), Error> {
let conn = match self.conn.take() {
Some(conn) => conn,
None => udp::connect(&self.address).await?,
};
conn.send(self.data.split_off(0).as_bytes()).await?;
self.conn = Some(conn);
Ok(())
}
async fn finish(mut self) -> Result<(), Error> {
while let Some(data) = self.channel.recv().await {
self.add_data(data).await?;
}
self.flush().await?;
Ok(())
}
}

View File

@ -0,0 +1,50 @@
use std::fmt::Write;
use anyhow::{bail, Error};
use serde_json::Value;
use crate::MetricsData;
pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result<String, Error> {
if !data.values.is_object() {
bail!("invalid data");
}
let mut line = escape_measurement(&data.measurement);
for (key, value) in &data.tags {
write!(line, ",{}={}", escape_key(key), escape_key(value))?;
}
line.push(' ');
let mut first = true;
for (key, value) in data.values.as_object().unwrap().iter() {
match value {
Value::Object(_) => bail!("objects not supported"),
Value::Array(_) => bail!("arrays not supported"),
_ => {}
}
if !first {
line.push(',');
}
first = false;
write!(line, "{}={}", escape_key(key), value.to_string())?;
}
// nanosecond precision
writeln!(line, " {}", data.ctime * 1_000_000_000)?;
Ok(line)
}
fn escape_key(key: &str) -> String {
let key = key.replace(',', "\\,");
let key = key.replace('=', "\\=");
key.replace(' ', "\\ ")
}
fn escape_measurement(measurement: &str) -> String {
let measurement = measurement.replace(',', "\\,");
measurement.replace(' ', "\\ ")
}

117
proxmox-metrics/src/lib.rs Normal file
View File

@ -0,0 +1,117 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{bail, format_err, Error};
use serde::Serialize;
use serde_json::Value;
use tokio::sync::mpsc;
mod influxdb;
#[doc(inline)]
pub use influxdb::{influxdb_http, influxdb_udp, test_influxdb_http, test_influxdb_udp};
#[derive(Clone)]
/// Structured data for the metric server
pub struct MetricsData {
/// The category of measurements
pub measurement: String,
/// A list of to attach to the measurements
pub tags: HashMap<String, String>,
/// The actual values to send. Only plain (not-nested) objects are supported at the moment.
pub values: Value,
/// The time of the measurement
pub ctime: i64,
}
impl MetricsData {
/// Convenient helper to create from references
pub fn new<V: Serialize>(
measurement: &str,
tags: &[(&str, &str)],
ctime: i64,
values: V,
) -> Result<Self, Error> {
let mut new_tags = HashMap::new();
for (key, value) in tags {
new_tags.insert(key.to_string(), value.to_string());
}
Ok(Self {
measurement: measurement.to_string(),
tags: new_tags,
values: serde_json::to_value(values)?,
ctime,
})
}
}
/// Helper to send a list of [MetricsData] to a list of [Metrics]
pub async fn send_data_to_channels(
values: &[Arc<MetricsData>],
connections: &[Metrics],
) -> Vec<Result<(), Error>> {
let mut futures = Vec::with_capacity(connections.len());
for connection in connections {
futures.push(async move {
for data in values {
connection.send_data(Arc::clone(data)).await?
}
Ok::<(), Error>(())
});
}
futures::future::join_all(futures).await
}
/// Represents connection to the metric server which can be used to send data
///
/// You can send [MetricsData] by using [`Self::send_data()`], and to flush and
/// finish the connection use [`Self::join`].
///
/// If dropped, it will abort the connection and not flush out buffered data.
pub struct Metrics {
join_handle: Option<tokio::task::JoinHandle<Result<(), Error>>>,
channel: Option<mpsc::Sender<Arc<MetricsData>>>,
}
impl Drop for Metrics {
fn drop(&mut self) {
if let Some(join_handle) = self.join_handle.take() {
join_handle.abort();
}
}
}
impl Metrics {
/// Closes the queue and waits for the connection to send all remaining data
pub async fn join(mut self) -> Result<(), Error> {
if let Some(channel) = self.channel.take() {
drop(channel);
}
if let Some(join_handle) = self.join_handle.take() {
join_handle.await?
} else {
bail!("internal error: no join_handle")
}
}
/// Queues the given data to the metric server
pub async fn send_data(&self, data: Arc<MetricsData>) -> Result<(), Error> {
// return ok if we got no data to send
if let Value::Object(map) = &data.values {
if map.is_empty() {
return Ok(());
}
}
if let Some(channel) = &self.channel {
channel
.send(data)
.await
.map_err(|_| format_err!("receiver side closed"))?;
} else {
bail!("channel was already closed");
}
Ok(())
}
}