[pbs-devel] [PATCH proxmox v5 2/2] proxmox-metrics: implement metrics server client code
Dominik Csapak
d.csapak at proxmox.com
Wed Feb 2 10:50:11 CET 2022
influxdb (udp + http(s)) only for now
general architecture looks as follows:
the helper functions influxdb_http/udp start a tokio task and return
a Metrics struct, that can be used to send data and wait for the tokio
task. if the struct is dropped, the task is canceled.
so it would look like this:
let metrics = influxdb_http(..params..)?;
metrics.send_data(...).await?;
metrics.send_data(...).await?;
metrics.join().await?;
on join, the sending part of the channel is dropped, which makes the
task flush the remaining data to the server before it finishes
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
Cargo.toml | 1 +
proxmox-metrics/Cargo.toml | 21 +++
proxmox-metrics/debian/changelog | 5 +
proxmox-metrics/debian/copyright | 16 +++
proxmox-metrics/debian/debcargo.toml | 7 +
proxmox-metrics/src/influxdb/http.rs | 185 ++++++++++++++++++++++++++
proxmox-metrics/src/influxdb/mod.rs | 7 +
proxmox-metrics/src/influxdb/udp.rs | 86 ++++++++++++
proxmox-metrics/src/influxdb/utils.rs | 50 +++++++
proxmox-metrics/src/lib.rs | 117 ++++++++++++++++
10 files changed, 495 insertions(+)
create mode 100644 proxmox-metrics/Cargo.toml
create mode 100644 proxmox-metrics/debian/changelog
create mode 100644 proxmox-metrics/debian/copyright
create mode 100644 proxmox-metrics/debian/debcargo.toml
create mode 100644 proxmox-metrics/src/influxdb/http.rs
create mode 100644 proxmox-metrics/src/influxdb/mod.rs
create mode 100644 proxmox-metrics/src/influxdb/udp.rs
create mode 100644 proxmox-metrics/src/influxdb/utils.rs
create mode 100644 proxmox-metrics/src/lib.rs
diff --git a/Cargo.toml b/Cargo.toml
index 8f85e08..4a458d2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,7 @@ members = [
"proxmox-http",
"proxmox-io",
"proxmox-lang",
+ "proxmox-metrics",
"proxmox-router",
"proxmox-schema",
"proxmox-serde",
diff --git a/proxmox-metrics/Cargo.toml b/proxmox-metrics/Cargo.toml
new file mode 100644
index 0000000..4f0b8e3
--- /dev/null
+++ b/proxmox-metrics/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "proxmox-metrics"
+version = "0.1.0"
+authors = ["Proxmox Support Team <support at proxmox.com>"]
+edition = "2018"
+license = "AGPL-3"
+description = "Metrics Server export utilities"
+
+exclude = [ "debian" ]
+
+[dependencies]
+anyhow = "1.0"
+# "rt" provides `tokio::spawn`, which the influxdb_http/influxdb_udp helpers use
+tokio = { version = "1.0", features = [ "net", "rt", "sync" ] }
+futures = "0.3"
+serde = "1.0"
+serde_json = "1.0"
+http = "0.2"
+hyper = "0.14"
+openssl = "0.10"
+proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" }
+proxmox-async = { path = "../proxmox-async", features = [], version = "0.3" }
diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog
new file mode 100644
index 0000000..c02803b
--- /dev/null
+++ b/proxmox-metrics/debian/changelog
@@ -0,0 +1,5 @@
+rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium
+
+ * initial package
+
+ -- Proxmox Support Team <support at proxmox.com> Tue, 14 Dec 2021 08:56:54 +0100
diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright
new file mode 100644
index 0000000..5661ef6
--- /dev/null
+++ b/proxmox-metrics/debian/copyright
@@ -0,0 +1,16 @@
+Copyright (C) 2021 Proxmox Server Solutions GmbH
+
+This software is written by Proxmox Server Solutions GmbH <support at proxmox.com>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml
new file mode 100644
index 0000000..b7864cd
--- /dev/null
+++ b/proxmox-metrics/debian/debcargo.toml
@@ -0,0 +1,7 @@
+overlay = "."
+crate_src_path = ".."
+maintainer = "Proxmox Support Team <support at proxmox.com>"
+
+[source]
+vcs_git = "git://git.proxmox.com/git/proxmox.git"
+vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs
new file mode 100644
index 0000000..51a4181
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/http.rs
@@ -0,0 +1,185 @@
+use std::sync::Arc;
+
+use anyhow::{bail, Error};
+use hyper::Body;
+use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
+use tokio::sync::mpsc;
+
+use proxmox_http::client::{SimpleHttp, SimpleHttpOptions};
+
+use crate::influxdb::utils;
+use crate::{Metrics, MetricsData};
+
+/// State of the background task that batches metrics and posts them to an
+/// influxdb http(s) endpoint.
+struct InfluxDbHttp {
+ /// client used for the health check and for the write requests
+ client: SimpleHttp,
+ /// `<base>/health`, requested by `test_connection`
+ healthuri: http::Uri,
+ /// `<base>/api/v2/write?org=..&bucket=..`, target of the POST in `flush`
+ writeuri: http::Uri,
+ /// if set, sent as `Authorization: Token <token>` header
+ token: Option<String>,
+ /// buffer size (in bytes) at which `add_data` triggers a flush
+ max_body_size: usize,
+ /// formatted lines that were not sent yet
+ data: String,
+ /// receiving end of the queue that is filled through `Metrics::send_data`
+ channel: mpsc::Receiver<Arc<MetricsData>>,
+}
+
+/// Checks that the influxdb http(s) server described by the parameters
+/// answers a request to its health endpoint.
+///
+/// Fails if the client cannot be set up from the parameters, if the request
+/// cannot be sent, or if the server answers with a non-success status.
+pub async fn test_influxdb_http(
+    uri: &str,
+    organization: &str,
+    bucket: &str,
+    token: Option<&str>,
+    verify_tls: bool,
+) -> Result<(), Error> {
+    // nothing is ever queued here, the channel only satisfies the constructor
+    let (_sender, receiver) = mpsc::channel(1);
+    let max_body_size = 1;
+
+    InfluxDbHttp::new(
+        uri,
+        organization,
+        bucket,
+        token,
+        verify_tls,
+        max_body_size,
+        receiver,
+    )?
+    .test_connection()
+    .await
+}
+
+/// Spawns a task that sends data to the influxdb server at the given
+/// http(s) uri and returns the [Metrics] handle that feeds this task.
+///
+/// Queued data is sent whenever the buffered lines reach `max_body_size`
+/// bytes and once more when the handle is joined.
+///
+/// Uses `tokio::spawn`, so it has to be called from within a tokio runtime.
+pub fn influxdb_http(
+    uri: &str,
+    organization: &str,
+    bucket: &str,
+    token: Option<&str>,
+    verify_tls: bool,
+    max_body_size: usize,
+) -> Result<Metrics, Error> {
+    let (sender, receiver) = mpsc::channel(1024);
+
+    let worker = InfluxDbHttp::new(
+        uri,
+        organization,
+        bucket,
+        token,
+        verify_tls,
+        max_body_size,
+        receiver,
+    )?;
+
+    // the task ends when all senders are gone or when sending fails
+    let task = tokio::spawn(worker.finish());
+
+    Ok(Metrics {
+        join_handle: Some(task),
+        channel: Some(sender),
+    })
+}
+
+impl InfluxDbHttp {
+    /// Builds the http client and derives the health and write uris from `uri`.
+    ///
+    /// `uri` has to contain a scheme and an authority (for example
+    /// `https://influx.example.com:8086`); an optional path is kept as prefix
+    /// for both endpoints, without trailing slashes.
+    ///
+    /// Returns an error if the TLS connector cannot be created, if `uri`
+    /// cannot be parsed or lacks scheme/authority, or if the resulting uris
+    /// are invalid.
+    fn new(
+        uri: &str,
+        organization: &str,
+        bucket: &str,
+        token: Option<&str>,
+        verify_tls: bool,
+        max_body_size: usize,
+        channel: mpsc::Receiver<Arc<MetricsData>>,
+    ) -> Result<Self, Error> {
+        let client = if verify_tls {
+            SimpleHttp::with_options(SimpleHttpOptions::default())
+        } else {
+            // report a failing connector setup to the caller instead of panicking
+            let mut ssl_connector = SslConnector::builder(SslMethod::tls())?;
+            ssl_connector.set_verify(SslVerifyMode::NONE);
+            SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default())
+        };
+
+        let uri: http::uri::Uri = uri.parse()?;
+        let uri_parts = uri.into_parts();
+
+        // inputs like "localhost:8086" or "/some/path" parse fine but carry no
+        // scheme and/or authority, so check them instead of unwrapping
+        let scheme = match uri_parts.scheme {
+            Some(scheme) => scheme,
+            None => bail!("invalid uri: missing scheme"),
+        };
+        let authority = match uri_parts.authority {
+            Some(authority) => authority,
+            None => bail!("invalid uri: missing authority"),
+        };
+
+        let base_path = match uri_parts.path_and_query {
+            Some(ref p) => p.path().trim_end_matches('/'),
+            None => "",
+        };
+
+        // NOTE(review): organization and bucket are inserted verbatim, so they
+        // presumably have to be valid inside a query string already - confirm
+        let writeuri = http::uri::Builder::new()
+            .scheme(scheme.clone())
+            .authority(authority.clone())
+            .path_and_query(format!(
+                "{}/api/v2/write?org={}&bucket={}",
+                base_path, organization, bucket
+            ))
+            .build()?;
+
+        let healthuri = http::uri::Builder::new()
+            .scheme(scheme)
+            .authority(authority)
+            .path_and_query(format!("{}/health", base_path))
+            .build()?;
+
+        Ok(InfluxDbHttp {
+            client,
+            writeuri,
+            healthuri,
+            token: token.map(String::from),
+            max_body_size,
+            data: String::new(),
+            channel,
+        })
+    }
+
+    /// Sends a GET request to the health uri (with the token, if one is set)
+    /// and fails if the request fails or the status is not a success status.
+    async fn test_connection(&self) -> Result<(), Error> {
+        let mut request = http::Request::builder().method("GET").uri(&self.healthuri);
+
+        if let Some(token) = &self.token {
+            request = request.header("Authorization", format!("Token {}", token));
+        }
+
+        let res = self.client.request(request.body(Body::empty())?).await?;
+
+        let status = res.status();
+        if !status.is_success() {
+            bail!("got bad status: {}", status);
+        }
+
+        Ok(())
+    }
+
+    /// Formats `data` and appends the line to the buffer.
+    ///
+    /// The buffer is flushed first if appending would reach `max_body_size`,
+    /// and flushed again afterwards if it reached `max_body_size` anyway
+    /// (a single line that is larger than the limit).
+    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
+        let new_data = utils::format_influxdb_line(&data)?;
+
+        if self.data.len() + new_data.len() >= self.max_body_size {
+            self.flush().await?;
+        }
+
+        self.data.push_str(&new_data);
+
+        if self.data.len() >= self.max_body_size {
+            self.flush().await?;
+        }
+
+        Ok(())
+    }
+
+    /// Posts the buffered lines to the write uri; does nothing if the buffer
+    /// is empty.
+    ///
+    /// The buffer is emptied before the request is sent, so the lines are not
+    /// kept if the request fails or returns a non-success status.
+    async fn flush(&mut self) -> Result<(), Error> {
+        if self.data.is_empty() {
+            return Ok(());
+        }
+        let mut request = http::Request::builder().method("POST").uri(&self.writeuri);
+
+        if let Some(token) = &self.token {
+            request = request.header("Authorization", format!("Token {}", token));
+        }
+
+        // split_off(0) moves all buffered lines into the body and leaves
+        // `self.data` empty
+        let request = request.body(Body::from(self.data.split_off(0)))?;
+
+        let res = self.client.request(request).await?;
+
+        let status = res.status();
+        if !status.is_success() {
+            bail!("got bad status: {}", status);
+        }
+        Ok(())
+    }
+
+    /// Task body: takes data from the queue until all senders are gone, then
+    /// flushes what is still buffered. Stops at the first error.
+    async fn finish(mut self) -> Result<(), Error> {
+        while let Some(data) = self.channel.recv().await {
+            self.add_data(data).await?;
+        }
+
+        self.flush().await?;
+
+        Ok(())
+    }
+}
diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs
new file mode 100644
index 0000000..26a715c
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/mod.rs
@@ -0,0 +1,7 @@
+mod http;
+pub use self::http::*;
+
+mod udp;
+pub use udp::*;
+
+pub mod utils;
diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs
new file mode 100644
index 0000000..4217c61
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/udp.rs
@@ -0,0 +1,86 @@
+use std::sync::Arc;
+
+use anyhow::Error;
+use tokio::sync::mpsc;
+
+use proxmox_async::io::udp;
+
+use crate::influxdb::utils;
+use crate::{Metrics, MetricsData};
+
+/// State of the background task that batches metrics and sends them to an
+/// influxdb server via udp.
+struct InfluxDbUdp {
+ /// target address, handed to `udp::connect` as is
+ address: String,
+ /// socket; starts as `None` and is created by `flush` when needed
+ conn: Option<tokio::net::UdpSocket>,
+ /// buffer size (in bytes) at which `add_data` triggers a flush
+ mtu: u16,
+ /// formatted lines that were not sent yet
+ data: String,
+ /// receiving end of the queue that is filled through `Metrics::send_data`
+ channel: mpsc::Receiver<Arc<MetricsData>>,
+}
+
+/// Tries to set up a udp socket for the given influxdb server address.
+///
+/// Only the result of `udp::connect` is checked, no data is sent.
+pub async fn test_influxdb_udp(address: &str) -> Result<(), Error> {
+    let _socket = udp::connect(address).await?;
+
+    Ok(())
+}
+
+/// Returns a [Metrics] handle that connects and sends data to the
+/// given influxdb server at the given udp address/port
+///
+/// `address` must be in the format of 'ip_or_hostname:port'
+///
+/// `mtu` defaults to 1500 if not given. 50 bytes of it are reserved for the
+/// packet headers, the rest is the size at which buffered data is sent.
+///
+/// Uses `tokio::spawn`, so it has to be called from within a tokio runtime.
+pub fn influxdb_udp(address: &str, mtu: Option<u16>) -> Metrics {
+    let (tx, rx) = mpsc::channel(1024);
+
+    let this = InfluxDbUdp {
+        address: address.to_string(),
+        conn: None,
+        // empty ipv6 udp package needs 48 bytes, subtract 50 for safety;
+        // saturating, so that an mtu below 50 cannot underflow
+        mtu: mtu.unwrap_or(1500).saturating_sub(50),
+        data: String::new(),
+        channel: rx,
+    };
+
+    let join_handle = Some(tokio::spawn(async { this.finish().await }));
+
+    Metrics {
+        join_handle,
+        channel: Some(tx),
+    }
+}
+
+impl InfluxDbUdp {
+    /// Formats `data` and appends the line to the buffer.
+    ///
+    /// The buffer is flushed first if appending would reach the size limit,
+    /// and flushed again afterwards if it reached the limit anyway (a single
+    /// line that is larger than the limit).
+    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
+        let new_data = utils::format_influxdb_line(&data)?;
+
+        if self.data.len() + new_data.len() >= (self.mtu as usize) {
+            self.flush().await?;
+        }
+
+        self.data.push_str(&new_data);
+
+        if self.data.len() >= (self.mtu as usize) {
+            self.flush().await?;
+        }
+
+        Ok(())
+    }
+
+    /// Sends the buffered lines as one datagram, connecting first if there is
+    /// no socket yet; does nothing if the buffer is empty.
+    ///
+    /// If sending fails, the socket is not stored again, so the next flush
+    /// connects anew. The buffered lines are gone in that case.
+    async fn flush(&mut self) -> Result<(), Error> {
+        // same as for http: without data there is nothing to send, so do not
+        // connect or send an empty datagram
+        if self.data.is_empty() {
+            return Ok(());
+        }
+
+        let conn = match self.conn.take() {
+            Some(conn) => conn,
+            None => udp::connect(&self.address).await?,
+        };
+
+        // split_off(0) moves all buffered lines out and leaves `self.data` empty
+        conn.send(self.data.split_off(0).as_bytes()).await?;
+        self.conn = Some(conn);
+        Ok(())
+    }
+
+    /// Task body: takes data from the queue until all senders are gone, then
+    /// flushes what is still buffered. Stops at the first error.
+    async fn finish(mut self) -> Result<(), Error> {
+        while let Some(data) = self.channel.recv().await {
+            self.add_data(data).await?;
+        }
+
+        self.flush().await?;
+
+        Ok(())
+    }
+}
diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs
new file mode 100644
index 0000000..3507e61
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/utils.rs
@@ -0,0 +1,50 @@
+use std::fmt::Write;
+
+use anyhow::{bail, Error};
+use serde_json::Value;
+
+use crate::MetricsData;
+
+/// Formats `data` as one newline-terminated line of the form
+/// `measurement[,tag=value..] field=value[,field=value..] timestamp`.
+///
+/// `data.values` has to be an object whose members are neither objects nor
+/// arrays; the members are written in their JSON representation.
+/// `data.ctime` is multiplied by 1_000_000_000 to get the timestamp.
+///
+/// Returns an error for unsupported values and for a `ctime` that does not
+/// fit into an `i64` after the multiplication.
+pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result<String, Error> {
+    let values = match data.values.as_object() {
+        Some(values) => values,
+        None => bail!("invalid data"),
+    };
+
+    let mut line = escape_measurement(&data.measurement);
+
+    for (key, value) in &data.tags {
+        write!(line, ",{}={}", escape_key(key), escape_key(value))?;
+    }
+
+    line.push(' ');
+
+    let mut first = true;
+    for (key, value) in values {
+        // NOTE(review): a `null` member is written as `null` - check whether
+        // the server accepts that
+        match value {
+            Value::Object(_) => bail!("objects not supported"),
+            Value::Array(_) => bail!("arrays not supported"),
+            _ => {}
+        }
+        if !first {
+            line.push(',');
+        }
+        first = false;
+        write!(line, "{}={}", escape_key(key), value)?;
+    }
+
+    // nanosecond precision; checked, so that a bogus ctime results in an
+    // error instead of a panic or a wrapped timestamp
+    let timestamp = match data.ctime.checked_mul(1_000_000_000) {
+        Some(timestamp) => timestamp,
+        None => bail!("timestamp out of range: {}", data.ctime),
+    };
+    writeln!(line, " {}", timestamp)?;
+
+    Ok(line)
+}
+
+/// Puts a backslash in front of every comma, equal sign and space.
+///
+/// Used for tag keys, tag values and field keys.
+fn escape_key(key: &str) -> String {
+    let mut escaped = String::with_capacity(key.len());
+    for c in key.chars() {
+        if matches!(c, ',' | '=' | ' ') {
+            escaped.push('\\');
+        }
+        escaped.push(c);
+    }
+    escaped
+}
+
+/// Puts a backslash in front of every comma and space; unlike for keys,
+/// equal signs are left as they are.
+fn escape_measurement(measurement: &str) -> String {
+    let mut escaped = String::with_capacity(measurement.len());
+    for c in measurement.chars() {
+        if matches!(c, ',' | ' ') {
+            escaped.push('\\');
+        }
+        escaped.push(c);
+    }
+    escaped
+}
diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs
new file mode 100644
index 0000000..9fb098e
--- /dev/null
+++ b/proxmox-metrics/src/lib.rs
@@ -0,0 +1,117 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Error};
+use serde::Serialize;
+use serde_json::Value;
+use tokio::sync::mpsc;
+
+mod influxdb;
+#[doc(inline)]
+pub use influxdb::{influxdb_http, influxdb_udp, test_influxdb_http, test_influxdb_udp};
+
+#[derive(Clone)]
+/// Structured data for the metric server
+pub struct MetricsData {
+ /// The category of measurements
+ pub measurement: String,
+ /// A list of tags (key/value pairs) to attach to the measurements
+ pub tags: HashMap<String, String>,
+ /// The actual values to send. Only plain (not-nested) objects are supported at the moment.
+ pub values: Value,
+ /// The time of the measurement (presumably seconds since the unix epoch,
+ /// as it gets multiplied by 1_000_000_000 for the influxdb timestamp)
+ pub ctime: i64,
+}
+
+impl MetricsData {
+    /// Creates a new instance from borrowed parts.
+    ///
+    /// The measurement and the tags are copied into owned values and `values`
+    /// is converted with `serde_json::to_value`; an error of that conversion
+    /// is returned to the caller.
+    pub fn new<V: Serialize>(
+        measurement: &str,
+        tags: &[(&str, &str)],
+        ctime: i64,
+        values: V,
+    ) -> Result<Self, Error> {
+        let values = serde_json::to_value(values)?;
+
+        let tags = tags
+            .iter()
+            .map(|(key, value)| (key.to_string(), value.to_string()))
+            .collect();
+
+        Ok(Self {
+            measurement: measurement.to_string(),
+            tags,
+            values,
+            ctime,
+        })
+    }
+}
+
+/// Sends every entry of `values` to every entry of `connections`
+///
+/// The connections are served concurrently, each one gets the values in
+/// order and stops at its first error. The returned list contains one
+/// result per connection, in the order of `connections`.
+pub async fn send_data_to_channels(
+    values: &[Arc<MetricsData>],
+    connections: &[Metrics],
+) -> Vec<Result<(), Error>> {
+    let senders = connections.iter().map(move |connection| async move {
+        for data in values {
+            connection.send_data(Arc::clone(data)).await?;
+        }
+        Ok::<(), Error>(())
+    });
+
+    futures::future::join_all(senders).await
+}
+
+/// Represents connection to the metric server which can be used to send data
+///
+/// You can send [MetricsData] by using [`Self::send_data()`], and to flush and
+/// finish the connection use [`Self::join`].
+///
+/// If dropped, it will abort the connection and not flush out buffered data.
+pub struct Metrics {
+ /// handle of the spawned connection task; taken by `join` and by `drop`
+ join_handle: Option<tokio::task::JoinHandle<Result<(), Error>>>,
+ /// sending end of the queue to the connection task; taken (and dropped) by `join`
+ channel: Option<mpsc::Sender<Arc<MetricsData>>>,
+}
+
+impl Drop for Metrics {
+    /// Aborts the connection task, unless its handle was taken before.
+    fn drop(&mut self) {
+        match self.join_handle.take() {
+            Some(task) => task.abort(),
+            None => {}
+        }
+    }
+}
+
+impl Metrics {
+    /// Closes the queue and waits until the connection task has finished
+    ///
+    /// Returns the result of the task, or an error if the task could not be
+    /// joined.
+    pub async fn join(mut self) -> Result<(), Error> {
+        // without a sender the task sees the end of the queue
+        drop(self.channel.take());
+
+        match self.join_handle.take() {
+            Some(task) => task.await?,
+            None => bail!("internal error: no join_handle"),
+        }
+    }
+
+    /// Puts the given data into the queue for the metric server
+    ///
+    /// Data whose values are an empty object is skipped. Fails if the queue
+    /// was closed on either side.
+    pub async fn send_data(&self, data: Arc<MetricsData>) -> Result<(), Error> {
+        // an empty object contains nothing to send, so this counts as success
+        let nothing_to_send = matches!(&data.values, Value::Object(map) if map.is_empty());
+        if nothing_to_send {
+            return Ok(());
+        }
+
+        let channel = match &self.channel {
+            Some(channel) => channel,
+            None => bail!("channel was already closed"),
+        };
+
+        channel
+            .send(data)
+            .await
+            .map_err(|_| format_err!("receiver side closed"))
+    }
+}
--
2.30.2
More information about the pbs-devel
mailing list