[pbs-devel] [PATCH proxmox v3 3/3] proxmox-metrics: implement metrics server client code

Dominik Csapak d.csapak at proxmox.com
Fri Dec 17 09:09:54 CET 2021


influxdb (udp + http(s)) only for now

general architecture looks as follows:

"new" returns a MetricsChannel and a Future
the channel can be used to push data in (buffered data is flushed
automatically when it would exceed the configured size (mtu/max_body_size))

and the future must be polled to actually send data to the servers.

so most often it would look like this:
  let (future, channel) = InfluxDbHttp::new(..params..)?;
  let handle = tokio::spawn(future);
  channel.send_data(...).await?;
  handle.await?;

when all channels go out of scope, the future reads all remaining data
from the channel, sends it to the server and then completes

Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
 Cargo.toml                            |   1 +
 proxmox-metrics/Cargo.toml            |  20 +++++
 proxmox-metrics/debian/changelog      |   5 ++
 proxmox-metrics/debian/copyright      |  16 ++++
 proxmox-metrics/debian/debcargo.toml  |   7 ++
 proxmox-metrics/src/influxdb/http.rs  | 122 ++++++++++++++++++++++++++
 proxmox-metrics/src/influxdb/mod.rs   |   7 ++
 proxmox-metrics/src/influxdb/udp.rs   |  94 ++++++++++++++++++++
 proxmox-metrics/src/influxdb/utils.rs |  50 +++++++++++
 proxmox-metrics/src/lib.rs            |  89 +++++++++++++++++++
 10 files changed, 411 insertions(+)
 create mode 100644 proxmox-metrics/Cargo.toml
 create mode 100644 proxmox-metrics/debian/changelog
 create mode 100644 proxmox-metrics/debian/copyright
 create mode 100644 proxmox-metrics/debian/debcargo.toml
 create mode 100644 proxmox-metrics/src/influxdb/http.rs
 create mode 100644 proxmox-metrics/src/influxdb/mod.rs
 create mode 100644 proxmox-metrics/src/influxdb/udp.rs
 create mode 100644 proxmox-metrics/src/influxdb/utils.rs
 create mode 100644 proxmox-metrics/src/lib.rs

diff --git a/Cargo.toml b/Cargo.toml
index 8f85e08..4a458d2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,7 @@ members = [
     "proxmox-http",
     "proxmox-io",
     "proxmox-lang",
+    "proxmox-metrics",
     "proxmox-router",
     "proxmox-schema",
     "proxmox-serde",
diff --git a/proxmox-metrics/Cargo.toml b/proxmox-metrics/Cargo.toml
new file mode 100644
index 0000000..9ac50fe
--- /dev/null
+++ b/proxmox-metrics/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "proxmox-metrics"
+version = "0.1.0"
+authors = ["Proxmox Support Team <support at proxmox.com>"]
+edition = "2018"
+license = "AGPL-3"
+description = "Metrics Server export utilities"
+
+exclude = [ "debian" ]
+
+[dependencies]
+anyhow = "1.0"
+tokio = { version = "1.0", features = [ "net", "sync" ] }
+futures = "0.3"
+serde = "1.0"
+serde_json = "1.0"
+http = "0.2"
+hyper = "0.14"
+openssl = "0.10"
+proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" }
diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog
new file mode 100644
index 0000000..c02803b
--- /dev/null
+++ b/proxmox-metrics/debian/changelog
@@ -0,0 +1,5 @@
+rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium
+
+  * initial package
+
+ -- Proxmox Support Team <support at proxmox.com>  Tue, 14 Dec 2021 08:56:54 +0100
diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright
new file mode 100644
index 0000000..5661ef6
--- /dev/null
+++ b/proxmox-metrics/debian/copyright
@@ -0,0 +1,16 @@
+Copyright (C) 2021 Proxmox Server Solutions GmbH
+
+This software is written by Proxmox Server Solutions GmbH <support at proxmox.com>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program.  If not, see <http://www.gnu.org/licenses/>.
diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml
new file mode 100644
index 0000000..b7864cd
--- /dev/null
+++ b/proxmox-metrics/debian/debcargo.toml
@@ -0,0 +1,7 @@
+overlay = "."
+crate_src_path = ".."
+maintainer = "Proxmox Support Team <support at proxmox.com>"
+
+[source]
+vcs_git = "git://git.proxmox.com/git/proxmox.git"
+vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs
new file mode 100644
index 0000000..df6c6a5
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/http.rs
@@ -0,0 +1,122 @@
+use std::sync::Arc;
+
+use anyhow::{bail, Error};
+use hyper::Body;
+use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
+use tokio::sync::mpsc;
+
+use proxmox_http::client::{SimpleHttp, SimpleHttpOptions};
+
+use crate::influxdb::utils;
+use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
+
+/// Metrics client that pushes data to an InfluxDB server over HTTP(S).
+///
+/// Instances are only created inside [`InfluxDbHttp::new`], which hands out the future driving
+/// the client together with the [`MetricsChannel`] feeding it.
+pub struct InfluxDbHttp {
+    /// HTTP client used for the write requests
+    client: SimpleHttp,
+    /// URI of the `/health` endpoint; built in `new`, but not used by the code in this file
+    _healthuri: http::Uri,
+    /// URI of the `/api/v2/write` endpoint, including the `org` and `bucket` query parameters
+    writeuri: http::Uri,
+    /// Optional API token, sent as `Authorization: Token <token>` header
+    token: Option<String>,
+    /// Buffer size (in bytes) at which the collected data is flushed to the server
+    max_body_size: usize,
+    /// Formatted lines that were not sent yet
+    data: String,
+    /// Receiving end of the channel handed out by `new`
+    channel: mpsc::Receiver<Arc<MetricsData>>,
+}
+
+impl InfluxDbHttp {
+    /// Creates a client for the InfluxDB v2 HTTP(S) write API.
+    ///
+    /// Returns the future that must be polled to actually send the data, together with the
+    /// channel that is used to queue data for it.
+    ///
+    /// * `https`: use `https` instead of `http` as URI scheme
+    /// * `host`, `port`: address of the server
+    /// * `organization`, `bucket`: sent percent-encoded as the `org` and `bucket` query
+    ///   parameters of the write URI
+    /// * `token`: if set, sent as `Authorization: Token <token>` header with every request
+    /// * `verify_tls`: if `false`, the TLS certificate of the server is not verified
+    /// * `max_body_size`: buffer size (in bytes) at which the collected data is sent
+    ///
+    /// Fails if the TLS connector cannot be set up or if the URIs cannot be built.
+    pub fn new(
+        https: bool,
+        host: &str,
+        port: u16,
+        organization: &str,
+        bucket: &str,
+        token: Option<&str>,
+        verify_tls: bool,
+        max_body_size: usize,
+    ) -> Result<(MetricsServerFuture, MetricsChannel), Error> {
+        let (tx, rx) = mpsc::channel(1024);
+
+        let client = if verify_tls {
+            SimpleHttp::with_options(SimpleHttpOptions::default())
+        } else {
+            // report a failing TLS setup to the caller instead of panicking
+            let mut ssl_connector = SslConnector::builder(SslMethod::tls())?;
+            ssl_connector.set_verify(SslVerifyMode::NONE);
+            SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default())
+        };
+
+        let scheme = if https { "https" } else { "http" };
+        let authority = proxmox_http::uri::build_authority(host, port)?;
+
+        // organization and bucket are free-form names, so they must be encoded to not break
+        // (or alter) the query string
+        let writeuri = http::uri::Builder::new()
+            .scheme(scheme)
+            .authority(authority.clone())
+            .path_and_query(format!(
+                "/api/v2/write?org={}&bucket={}",
+                encode_query_value(organization),
+                encode_query_value(bucket)
+            ))
+            .build()?;
+
+        let healthuri = http::uri::Builder::new()
+            .scheme(scheme)
+            .authority(authority)
+            .path_and_query("/health")
+            .build()?;
+
+        let this = Self {
+            client,
+            writeuri,
+            _healthuri: healthuri,
+            token: token.map(String::from),
+            max_body_size,
+            data: String::new(),
+            channel: rx,
+        };
+
+        let future = Box::pin(this.finish());
+        let channel = MetricsChannel { channel: tx };
+        Ok((future, channel))
+    }
+
+    /// Formats `data` as InfluxDB line and appends it to the buffer.
+    ///
+    /// The buffer is flushed first if the new line would bring it to `max_body_size` or
+    /// beyond, and again afterwards if the buffer reached that size anyway (which happens for
+    /// a single line that is at least `max_body_size` long).
+    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
+        let new_data = utils::format_influxdb_line(&data)?;
+
+        if self.data.len() + new_data.len() >= self.max_body_size {
+            self.flush().await?;
+        }
+
+        self.data.push_str(&new_data);
+
+        if self.data.len() >= self.max_body_size {
+            self.flush().await?;
+        }
+
+        Ok(())
+    }
+
+    /// Sends the buffered data to the server with a single `POST` request.
+    ///
+    /// Does nothing if the buffer is empty. Fails if the request cannot be built or sent, or
+    /// if the server answers with a non-success status. The buffer is emptied before the
+    /// request is sent, so the data is not kept around for a retry on failure.
+    pub async fn flush(&mut self) -> Result<(), Error> {
+        if self.data.is_empty() {
+            return Ok(());
+        }
+        let mut request = http::Request::builder().method("POST").uri(&self.writeuri);
+
+        if let Some(token) = &self.token {
+            request = request.header("Authorization", format!("Token {}", token));
+        }
+
+        // split_off(0) moves the content into the body and leaves an empty buffer behind
+        let request = request.body(Body::from(self.data.split_off(0)))?;
+
+        let res = self.client.request(request).await?;
+
+        let status = res.status();
+        if !status.is_success() {
+            bail!("got bad status: {}", status);
+        }
+        Ok(())
+    }
+
+    /// Processes data from the channel until all senders are gone, then sends what is left in
+    /// the buffer.
+    ///
+    /// The first error ends the future.
+    async fn finish(mut self) -> Result<(), Error> {
+        while let Some(data) = self.channel.recv().await {
+            self.add_data(data).await?;
+        }
+
+        self.flush().await?;
+
+        Ok(())
+    }
+}
+
+/// Percent-encodes `value` for use as a URI query parameter value.
+///
+/// Everything except ASCII letters, digits, `-`, `.`, `_` and `~` is encoded.
+fn encode_query_value(value: &str) -> String {
+    let mut encoded = String::with_capacity(value.len());
+    for byte in value.bytes() {
+        match byte {
+            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
+                encoded.push(char::from(byte))
+            }
+            _ => encoded.push_str(&format!("%{:02X}", byte)),
+        }
+    }
+    encoded
+}
diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs
new file mode 100644
index 0000000..26a715c
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/mod.rs
@@ -0,0 +1,7 @@
+mod http;
+pub use self::http::*;
+
+mod udp;
+pub use udp::*;
+
+pub mod utils;
diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs
new file mode 100644
index 0000000..c4187a6
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/udp.rs
@@ -0,0 +1,94 @@
+use std::sync::Arc;
+
+use anyhow::Error;
+use tokio::net::UdpSocket;
+use tokio::sync::mpsc;
+
+use crate::influxdb::utils;
+use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
+
+/// Metrics client that pushes data to an InfluxDB server over UDP.
+///
+/// Instances are only created inside [`InfluxDbUdp::new`], which hands out the future driving
+/// the client together with the [`MetricsChannel`] feeding it.
+pub struct InfluxDbUdp {
+    /// Target address as `host:port` (or `[host]:port`) string
+    address: String,
+    /// Socket connected to `address`; created on the first flush
+    conn: Option<tokio::net::UdpSocket>,
+    /// Whether the local socket is bound to the IPv6 (`[::]`) or IPv4 (`0.0.0.0`) wildcard
+    ipv6: bool,
+    /// Buffer size (in bytes) at which the collected data is sent
+    mtu: u16,
+    /// Formatted lines that were not sent yet
+    data: String,
+    /// Receiving end of the channel handed out by `new`
+    channel: mpsc::Receiver<Arc<MetricsData>>,
+}
+
+impl InfluxDbUdp {
+    /// Creates a client that sends InfluxDB lines via UDP.
+    ///
+    /// Returns the future that must be polled to actually send the data, together with the
+    /// channel that is used to queue data for it.
+    ///
+    /// * `host`, `port`: address of the server; a `host` containing a `:` is treated as IPv6
+    ///   address and wrapped in brackets if it does not start with one
+    /// * `mtu`: MTU of the connection, defaults to 1500; 50 bytes of it are reserved for the
+    ///   packet headers
+    pub fn new(host: &str, port: u16, mtu: Option<u16>) -> (MetricsServerFuture, MetricsChannel) {
+        let (tx, rx) = mpsc::channel(1024);
+        let ipv6 = host.contains(':');
+
+        // `starts_with` instead of slicing, so a multi-byte first character cannot panic
+        let address = if ipv6 && host.len() > 3 && !host.starts_with('[') {
+            format!("[{}]:{}", host, port)
+        } else {
+            format!("{}:{}", host, port)
+        };
+
+        let this = Self {
+            address,
+            ipv6,
+            conn: None,
+            // empty ipv6 udp package needs 48 bytes, subtract 50 for safety
+            // (saturating, so an MTU below 50 does not underflow)
+            mtu: mtu.unwrap_or(1500).saturating_sub(50),
+            data: String::new(),
+            channel: rx,
+        };
+
+        let future = Box::pin(this.finish());
+
+        let channel = MetricsChannel { channel: tx };
+
+        (future, channel)
+    }
+
+    /// Binds a local UDP socket of the matching address family and connects it to the
+    /// configured address.
+    async fn connect(&mut self) -> Result<tokio::net::UdpSocket, Error> {
+        let conn = if self.ipv6 {
+            UdpSocket::bind("[::]:0").await?
+        } else {
+            UdpSocket::bind("0.0.0.0:0").await?
+        };
+        let addr = self.address.clone();
+        conn.connect(addr).await?;
+        Ok(conn)
+    }
+
+    /// Formats `data` as InfluxDB line and appends it to the buffer.
+    ///
+    /// The buffer is flushed first if the new line would bring it to the usable MTU or beyond,
+    /// and again afterwards if the buffer reached that size anyway (which happens for a single
+    /// line that is at least that long).
+    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
+        let new_data = utils::format_influxdb_line(&data)?;
+
+        if self.data.len() + new_data.len() >= (self.mtu as usize) {
+            self.flush().await?;
+        }
+
+        self.data.push_str(&new_data);
+
+        if self.data.len() >= (self.mtu as usize) {
+            self.flush().await?;
+        }
+
+        Ok(())
+    }
+
+    /// Sends the buffered data as a single datagram, connecting the socket first if needed.
+    ///
+    /// Does nothing if the buffer is empty. The socket is only kept for the next flush if
+    /// sending succeeded.
+    async fn flush(&mut self) -> Result<(), Error> {
+        // nothing buffered: neither connect nor send an empty datagram
+        if self.data.is_empty() {
+            return Ok(());
+        }
+
+        let conn = match self.conn.take() {
+            Some(conn) => conn,
+            None => self.connect().await?,
+        };
+
+        // split_off(0) moves the content out and leaves an empty buffer behind
+        conn.send(self.data.split_off(0).as_bytes()).await?;
+        self.conn = Some(conn);
+        Ok(())
+    }
+
+    /// Processes data from the channel until all senders are gone, then sends what is left in
+    /// the buffer.
+    ///
+    /// The first error ends the future.
+    async fn finish(mut self) -> Result<(), Error> {
+        while let Some(data) = self.channel.recv().await {
+            self.add_data(data).await?;
+        }
+
+        self.flush().await?;
+
+        Ok(())
+    }
+}
diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs
new file mode 100644
index 0000000..3507e61
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/utils.rs
@@ -0,0 +1,50 @@
+use std::fmt::Write;
+
+use anyhow::{bail, Error};
+use serde_json::Value;
+
+use crate::MetricsData;
+
+/// Formats `data` as one line of the InfluxDB line protocol, including the trailing newline.
+///
+/// The line consists of the escaped measurement, the escaped tags, the values as fields and
+/// the timestamp in nanoseconds.
+///
+/// Fails if `data.values` is not an object, if one of the values is an object or an array, or
+/// if the timestamp does not fit into an `i64` once converted to nanoseconds.
+pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result<String, Error> {
+    let values = match data.values.as_object() {
+        Some(values) => values,
+        None => bail!("invalid data"),
+    };
+
+    let mut line = escape_measurement(&data.measurement);
+
+    for (key, value) in &data.tags {
+        write!(line, ",{}={}", escape_key(key), escape_key(value))?;
+    }
+
+    line.push(' ');
+
+    let mut first = true;
+    for (key, value) in values {
+        match value {
+            Value::Object(_) => bail!("objects not supported"),
+            Value::Array(_) => bail!("arrays not supported"),
+            _ => {}
+        }
+        if !first {
+            line.push(',');
+        }
+        first = false;
+        write!(line, "{}={}", escape_key(key), value)?;
+    }
+
+    // nanosecond precision; checked, so that an absurd timestamp is reported as error instead
+    // of overflowing
+    let timestamp = match data.ctime.checked_mul(1_000_000_000) {
+        Some(timestamp) => timestamp,
+        None => bail!("timestamp out of range: {}", data.ctime),
+    };
+    writeln!(line, " {}", timestamp)?;
+
+    Ok(line)
+}
+
+/// Escapes a tag key, tag value or field key for the InfluxDB line protocol.
+///
+/// Every comma, equals sign and space gets a backslash prepended; all other characters are
+/// copied unchanged.
+fn escape_key(key: &str) -> String {
+    let mut escaped = String::with_capacity(key.len());
+    for c in key.chars() {
+        if matches!(c, ',' | '=' | ' ') {
+            escaped.push('\\');
+        }
+        escaped.push(c);
+    }
+    escaped
+}
+
+/// Escapes a measurement name for the InfluxDB line protocol.
+///
+/// Every comma and space gets a backslash prepended; all other characters (including equals
+/// signs) are copied unchanged.
+fn escape_measurement(measurement: &str) -> String {
+    let mut escaped = String::with_capacity(measurement.len());
+    for c in measurement.chars() {
+        if matches!(c, ',' | ' ') {
+            escaped.push('\\');
+        }
+        escaped.push(c);
+    }
+    escaped
+}
diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs
new file mode 100644
index 0000000..ba018fb
--- /dev/null
+++ b/proxmox-metrics/src/lib.rs
@@ -0,0 +1,89 @@
+use std::collections::HashMap;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use anyhow::{format_err, Error};
+use serde::Serialize;
+use serde_json::Value;
+use tokio::sync::mpsc;
+
+pub mod influxdb;
+
+#[derive(Clone)]
+/// Structured data for the metric server
+pub struct MetricsData {
+    /// The category of measurements
+    pub measurement: String,
+    /// A list of tags to attach to the measurements
+    pub tags: HashMap<String, String>,
+    /// The actual values to send. Only plain (not-nested) objects are supported at the moment.
+    pub values: Value,
+    /// The time of the measurement in seconds (the InfluxDB formatter multiplies it by 10^9 to
+    /// get nanoseconds)
+    pub ctime: i64,
+}
+
+impl MetricsData {
+    /// Builds a `MetricsData` from borrowed parts.
+    ///
+    /// The `tags` pairs are copied into an owned map (for duplicate keys, the last pair wins)
+    /// and `values` is converted to a JSON value. Fails if that conversion fails.
+    pub fn new<V: Serialize>(
+        measurement: &str,
+        tags: &[(&str, &str)],
+        ctime: i64,
+        values: V,
+    ) -> Result<Self, Error> {
+        let tags = tags
+            .iter()
+            .map(|(key, value)| (key.to_string(), value.to_string()))
+            .collect::<HashMap<String, String>>();
+        let values = serde_json::to_value(values)?;
+
+        Ok(Self {
+            measurement: measurement.to_owned(),
+            tags,
+            values,
+            ctime,
+        })
+    }
+}
+
+/// Boxed future that must be polled to actually send the queued data to the metric server
+pub type MetricsServerFuture =
+    Pin<Box<dyn std::future::Future<Output = Result<(), Error>> + Send + 'static>>;
+
+#[derive(Clone)]
+/// A channel to send data to the metric server
+pub struct MetricsChannel {
+    /// Sending end of the channel; the receiving end is held by the metric server client
+    pub(crate) channel: mpsc::Sender<Arc<MetricsData>>,
+}
+
+impl MetricsChannel {
+    /// Hands `data` over to the metric server client.
+    ///
+    /// Data whose values are an empty object is dropped and reported as success. Fails if the
+    /// receiving side of the channel is closed.
+    pub async fn send_data(&self, data: Arc<MetricsData>) -> Result<(), Error> {
+        // an empty object carries nothing worth sending
+        let nothing_to_send = matches!(&data.values, Value::Object(fields) if fields.is_empty());
+        if nothing_to_send {
+            return Ok(());
+        }
+
+        match self.channel.send(data).await {
+            Ok(()) => Ok(()),
+            Err(_) => Err(format_err!("receiver side closed")),
+        }
+    }
+}
+
+/// Sends every entry of `values` to every channel in `channels`.
+///
+/// The channels are served concurrently; within one channel the values are sent in order and
+/// the first error stops sending to that channel. The returned list holds one result per
+/// channel, in the order of `channels`.
+pub async fn send_data_to_channels(
+    values: &[Arc<MetricsData>],
+    channels: &[MetricsChannel],
+) -> Vec<Result<(), Error>> {
+    let per_channel = channels.iter().map(move |channel| async move {
+        for data in values {
+            channel.send_data(Arc::clone(data)).await?;
+        }
+        Ok::<(), Error>(())
+    });
+
+    futures::future::join_all(per_channel).await
+}
-- 
2.30.2






More information about the pbs-devel mailing list