[pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code

Wolfgang Bumiller w.bumiller at proxmox.com
Tue Dec 14 14:51:23 CET 2021


On Tue, Dec 14, 2021 at 01:24:06PM +0100, Dominik Csapak wrote:
> influxdb (udp + http(s)) only for now
> 
> general architecture looks as follows:
> 
> "new" returns a MetricsChannel and a Future
> the channels can be used to push data in (it flushes automatically if
> it would be over the configured size (mtu/max_body_size))
> 
> and the future must be polled to actually send data to the servers.
> 
> so most often it would look like this:
>   let (future, channel) = InfluxDbHttp::new(..params..)?;
>   let handle = tokio::spawn(future);
>   channel.send_data(...).await?;
>   handle.await?;
> 
> when all channels go out of scope, all remaining data in the channel
> will be read and sent to the server
> 
> Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
> ---
>  Cargo.toml                            |   1 +
>  proxmox-metrics/Cargo.toml            |  20 ++++
>  proxmox-metrics/debian/changelog      |   5 +
>  proxmox-metrics/debian/copyright      |  16 +++
>  proxmox-metrics/debian/debcargo.toml  |   7 ++
>  proxmox-metrics/src/influxdb/http.rs  | 143 ++++++++++++++++++++++++++
>  proxmox-metrics/src/influxdb/mod.rs   |   7 ++
>  proxmox-metrics/src/influxdb/udp.rs   | 107 +++++++++++++++++++
>  proxmox-metrics/src/influxdb/utils.rs |  51 +++++++++
>  proxmox-metrics/src/lib.rs            |  92 +++++++++++++++++
>  10 files changed, 449 insertions(+)
>  create mode 100644 proxmox-metrics/Cargo.toml
>  create mode 100644 proxmox-metrics/debian/changelog
>  create mode 100644 proxmox-metrics/debian/copyright
>  create mode 100644 proxmox-metrics/debian/debcargo.toml
>  create mode 100644 proxmox-metrics/src/influxdb/http.rs
>  create mode 100644 proxmox-metrics/src/influxdb/mod.rs
>  create mode 100644 proxmox-metrics/src/influxdb/udp.rs
>  create mode 100644 proxmox-metrics/src/influxdb/utils.rs
>  create mode 100644 proxmox-metrics/src/lib.rs
> 
> diff --git a/Cargo.toml b/Cargo.toml
> index 8f85e08..4a458d2 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -6,6 +6,7 @@ members = [
>      "proxmox-http",
>      "proxmox-io",
>      "proxmox-lang",
> +    "proxmox-metrics",
>      "proxmox-router",
>      "proxmox-schema",
>      "proxmox-serde",
> diff --git a/proxmox-metrics/Cargo.toml b/proxmox-metrics/Cargo.toml
> new file mode 100644
> index 0000000..9ac50fe
> --- /dev/null
> +++ b/proxmox-metrics/Cargo.toml
> @@ -0,0 +1,20 @@
> +[package]
> +name = "proxmox-metrics"
> +version = "0.1.0"
> +authors = ["Proxmox Support Team <support at proxmox.com>"]
> +edition = "2018"
> +license = "AGPL-3"
> +description = "Metrics Server export utilitites"
> +
> +exclude = [ "debian" ]
> +
> +[dependencies]
> +anyhow = "1.0"
> +tokio = { version = "1.0", features = [ "net", "sync" ] }
> +futures = "0.3"
> +serde = "1.0"
> +serde_json = "1.0"
> +http = "0.2"
> +hyper = "0.14"
> +openssl = "0.10"

Please sort the dependencies above alphabetically, and separate the
`proxmox-http` line below from that group with an empty line.

> +proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" }
> diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog
> new file mode 100644
> index 0000000..c02803b
> --- /dev/null
> +++ b/proxmox-metrics/debian/changelog
> @@ -0,0 +1,5 @@
> +rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium
> +
> +  * initial package
> +
> + -- Proxmox Support Team <support at proxmox.com>  Tue, 14 Dec 2021 08:56:54 +0100
> diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright
> new file mode 100644
> index 0000000..5661ef6
> --- /dev/null
> +++ b/proxmox-metrics/debian/copyright
> @@ -0,0 +1,16 @@
> +Copyright (C) 2021 Proxmox Server Solutions GmbH
> +
> +This software is written by Proxmox Server Solutions GmbH <support at proxmox.com>
> +
> +This program is free software: you can redistribute it and/or modify
> +it under the terms of the GNU Affero General Public License as published by
> +the Free Software Foundation, either version 3 of the License, or
> +(at your option) any later version.
> +
> +This program is distributed in the hope that it will be useful,
> +but WITHOUT ANY WARRANTY; without even the implied warranty of
> +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +GNU Affero General Public License for more details.
> +
> +You should have received a copy of the GNU Affero General Public License
> +along with this program.  If not, see <http://www.gnu.org/licenses/>.
> diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml
> new file mode 100644
> index 0000000..b7864cd
> --- /dev/null
> +++ b/proxmox-metrics/debian/debcargo.toml
> @@ -0,0 +1,7 @@
> +overlay = "."
> +crate_src_path = ".."
> +maintainer = "Proxmox Support Team <support at proxmox.com>"
> +
> +[source]
> +vcs_git = "git://git.proxmox.com/git/proxmox.git"
> +vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
> diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs
> new file mode 100644
> index 0000000..8f1157d
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/http.rs
> @@ -0,0 +1,143 @@
> +use std::sync::Arc;
> +
> +use anyhow::{bail, Error};
> +use futures::{future::FutureExt, select};
> +use hyper::Body;
> +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
> +use tokio::sync::mpsc;
> +
> +use proxmox_http::client::{SimpleHttp, SimpleHttpOptions};
> +
> +use crate::influxdb::utils;
> +use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
> +
> +pub struct InfluxDbHttp {
> +    client: SimpleHttp,
> +    _healthuri: http::Uri,
> +    writeuri: http::Uri,
> +    token: Option<String>,
> +    max_body_size: usize,
> +    data: String,
> +    data_channel: mpsc::Receiver<Arc<MetricsData>>,
> +    flush_channel: mpsc::Receiver<()>,
> +}
> +
> +impl InfluxDbHttp {
> +    pub fn new(
> +        https: bool,
> +        host: &str,
> +        port: u16,
> +        organization: &str,
> +        bucket: &str,
> +        token: Option<&str>,
> +        verify_tls: bool,
> +        max_body_size: usize,
> +    ) -> Result<(MetricsServerFuture, MetricsChannel), Error> {
> +        let (data_tx, data_rx) = mpsc::channel(1024);
> +        let (flush_tx, flush_rx) = mpsc::channel(1);
> +
> +        let client = if verify_tls {
> +            SimpleHttp::with_options(SimpleHttpOptions::default())
> +        } else {
> +            let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap();
> +            ssl_connector.set_verify(SslVerifyMode::NONE);
> +            SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default())
> +        };
> +
> +        let authority = proxmox_http::uri::build_authority(host, port)?;
> +
> +        let writeuri = http::uri::Builder::new()
> +            .scheme(if https { "https" } else { "http" })
> +            .authority(authority.clone())
> +            .path_and_query(format!(
> +                "/api/v2/write?org={}&bucket={}",
> +                organization, bucket
> +            ))
> +            .build()?;
> +
> +        let healthuri = http::uri::Builder::new()
> +            .scheme(if https { "https" } else { "http" })
> +            .authority(authority)
> +            .path_and_query("/health")
> +            .build()?;
> +
> +        let this = Self {
> +            client,
> +            writeuri,
> +            _healthuri: healthuri,
> +            token: token.map(String::from),
> +            max_body_size,
> +            data: String::new(),
> +            data_channel: data_rx,
> +            flush_channel: flush_rx,
> +        };
> +
> +        let future = Box::pin(this.finish());
> +        let channel = MetricsChannel {
> +            data_channel: data_tx,
> +            flush_channel: flush_tx,
> +        };
> +        Ok((future, channel))
> +    }
> +
> +    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
> +        let new_data = utils::format_influxdb_line(&data)?;
> +
> +        if self.data.len() + new_data.len() >= self.max_body_size {
> +            self.flush().await?;
> +        }
> +
> +        self.data.push_str(&new_data);
> +
> +        if self.data.len() >= self.max_body_size {
> +            self.flush().await?;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    pub async fn flush(&mut self) -> Result<(), Error> {
> +        if self.data.is_empty() {
> +            return Ok(());
> +        }
> +        let mut request = http::Request::builder().method("POST").uri(&self.writeuri);
> +
> +        if let Some(token) = &self.token {
> +            request = request.header("Authorization", format!("Token {}", token));
> +        }
> +
> +        let request = request.body(Body::from(self.data.split_off(0)))?;
> +
> +        let res = self.client.request(request).await?;
> +
> +        let status = res.status();
> +        if !status.is_success() {
> +            bail!("got bad status: {}", status);
> +        }
> +        Ok(())
> +    }
> +
> +    async fn finish(mut self) -> Result<(), Error> {
> +        loop {
> +            select! {

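I wonder, don't you want to receive data and flushes in order? With
two separate channels, `select!` may take the flush branch while data
sent before the flush is still queued in `data_channel`, so that data
would not be part of the flush.
Wouldn't a single channel over an
`enum MetricsValue { Flush, Data(MetricsData) }`
make more sense?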

> +                res = self.flush_channel.recv().fuse() => match res {
> +                    Some(_) => self.flush().await?,
> +                    None => break, // all senders gone
> +                },
> +                data = self.data_channel.recv().fuse() => match data {
> +                    Some(data) => self.add_data(data).await?,
> +                    None => break, // all senders gone
> +                },
> +            }
> +        }
> +
> +        // consume remaining data in channel
> +        while let Some(data) = self.data_channel.recv().await {
> +            self.add_data(data).await?;
> +        }
> +
> +        self.flush().await?;
> +
> +        Ok(())
> +    }
> +}
> diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs
> new file mode 100644
> index 0000000..26a715c
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/mod.rs
> @@ -0,0 +1,7 @@
> +mod http;
> +pub use self::http::*;
> +
> +mod udp;
> +pub use udp::*;
> +
> +pub mod utils;
> diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs
> new file mode 100644
> index 0000000..de2b0d5
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/udp.rs
> @@ -0,0 +1,107 @@
> +use std::sync::Arc;
> +
> +use anyhow::Error;
> +use futures::{future::FutureExt, select};
> +use tokio::net::UdpSocket;
> +use tokio::sync::mpsc;
> +
> +use crate::influxdb::utils;
> +use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
> +
> +pub struct InfluxDbUdp {
> +    address: String,
> +    conn: Option<tokio::net::UdpSocket>,
> +    mtu: u16,
> +    data: String,
> +    data_channel: mpsc::Receiver<Arc<MetricsData>>,
> +    flush_channel: mpsc::Receiver<()>,
> +}
> +
> +impl InfluxDbUdp {
> +    pub fn new(host: &str, port: u16, mtu: Option<u16>) -> (MetricsServerFuture, MetricsChannel) {
> +        let (data_tx, data_rx) = mpsc::channel(1024);
> +        let (flush_tx, flush_rx) = mpsc::channel(1);
> +
> +        let address = if host.len() > 3 && host.contains(':') && &host[0..1] != "[" {
> +            format!("[{}]:{}", host, port)

Here you handle IPv6 but...

> +        } else {
> +            format!("{}:{}", host, port)
> +        };
> +
> +        let this = Self {
> +            address,
> +            conn: None,
> +            mtu: mtu.unwrap_or(1500),
> +            data: String::new(),
> +            data_channel: data_rx,
> +            flush_channel: flush_rx,
> +        };
> +
> +        let future = Box::pin(this.finish());
> +
> +        let channel = MetricsChannel {
> +            data_channel: data_tx,
> +            flush_channel: flush_tx,
> +        };
> +
> +        (future, channel)
> +    }
> +
> +    async fn connect(&mut self) -> Result<tokio::net::UdpSocket, Error> {
> +        let conn = UdpSocket::bind("0.0.0.0:0").await?;

...here you're specifically binding to an IPv4 address, which will cause
Rust to issue a `socket(AF_INET, ...)` syscall rather than
`socket(AF_INET6, ...)`, so connecting to an IPv6 address will fail.

> +        let addr = self.address.clone();
> +        conn.connect(addr).await?;
> +        Ok(conn)
> +    }
> +
> +    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
> +        let new_data = utils::format_influxdb_line(&data)?;
> +
> +        if self.data.len() + new_data.len() >= (self.mtu as usize) {
> +            self.flush().await?;
> +        }
> +
> +        self.data.push_str(&new_data);

Is it possible for `new_data.len()` to be larger than the MTU? If so,
should this warn or something?

Otherwise the flush below would send a single datagram larger than the
MTU, which might become a problem.

> +
> +        if self.data.len() >= (self.mtu as usize) {
> +            self.flush().await?;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    async fn flush(&mut self) -> Result<(), Error> {
> +        let conn = match self.conn.take() {
> +            Some(conn) => conn,
> +            None => self.connect().await?,
> +        };
> +
> +        conn.send(self.data.split_off(0).as_bytes()).await?;
> +        self.conn = Some(conn);
> +        Ok(())
> +    }
> +
> +    async fn finish(mut self) -> Result<(), Error> {
> +        loop {
> +            select! {
> +                res = self.flush_channel.recv().fuse() => match res {
> +                    Some(_) => self.flush().await?,
> +                    None => break, // all senders gone
> +                },
> +                data = self.data_channel.recv().fuse() => match data {
> +                    Some(data) => self.add_data(data).await?,
> +                    None => break, // all senders gone
> +                },
> +            }
> +        }
> +
> +        // consume remaining data in channel
> +        while let Some(data) = self.data_channel.recv().await {
> +            self.add_data(data).await?;
> +        }
> +
> +        self.flush().await?;
> +
> +        Ok(())
> +    }
> +}
> diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs
> new file mode 100644
> index 0000000..bf391f9
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/utils.rs
> @@ -0,0 +1,51 @@
> +use anyhow::{bail, Error};
> +
> +use crate::MetricsData;
> +
> +pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result<String, Error> {
> +    if !data.values.is_object() {
> +        bail!("invalid data");
> +    }
> +
> +    let mut line = escape_measurement(&data.measurement);
> +    line.push(',');
> +
> +    let tags = data.tags.iter().map(|(key, value)| {
> +        format!("{}={}", escape_key(&key), escape_key(&value))
> +    });
> +    line.push_str(&tags.collect::<Vec<String>>().join(","));

I'm not too fond of the temporary `Vec` here and below. Maybe use
`line.extend()` with the ',' as part of the format string (",{}={}"), or
skip even the temporary format and just do

    for (key, value) in &data.tags {
        line.push(',');
        line.push_str(&escape_key(key));
        line.push('=');
        line.push_str(&escape_key(value));
    }

It's not really longer... Alternatively, more readable and without the
temporary `String` would be `write!(line, ",{}={}", ...)?` (with
`std::fmt::Write` in scope) etc.

> +
> +    line.push(' ');
> +
> +    let values = data.values.as_object().unwrap().iter().map(|(key, value)| {
> +        let value = if value.is_string() {
> +             escape_value(&value.to_string())

^ extra space? :P

> +        } else {
> +            value.to_string()
> +        };
> +        format!("{}={}", escape_key(&key), value)
> +    });
> +
> +    line.push_str(&values.collect::<Vec<String>>().join(","));
> +
> +    // nanosecond precision
> +    line.push_str(&format!(" {}\n", data.ctime*1_000_000_000));
> +    Ok(line)
> +}
> +
> +fn escape_key(key: &str) -> String {
> +    let key = key.replace(',', "\\,");
> +    let key = key.replace('=', "\\=");
> +    let key = key.replace(' ', "\\ ");
> +    key
> +}
> +
> +fn escape_measurement(measurement: &str) -> String {
> +    let measurement = measurement.replace(',', "\\,");
> +    let measurement = measurement.replace(' ', "\\ ");
> +    measurement
> +}
> +
> +fn escape_value(value: &str) -> String {
> +    format!("\"{}\"",value.replace('"', "\\\""))
> +}
> diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs
> new file mode 100644
> index 0000000..0a76faa
> --- /dev/null
> +++ b/proxmox-metrics/src/lib.rs
> @@ -0,0 +1,92 @@
> +use std::collections::HashMap;
> +use std::pin::Pin;
> +use std::sync::Arc;
> +
> +use anyhow::{bail, format_err, Error};
> +use serde::Serialize;
> +use serde_json::Value;
> +use tokio::sync::mpsc;
> +
> +pub mod influxdb;
> +
> +#[derive(Clone)]
> +/// Structured data for the metric server
> +pub struct MetricsData {
> +    /// The category of measurements
> +    pub measurement: String,
> +    /// A list of to attach to the measurements
> +    pub tags: HashMap<String, String>,
> +    /// The actual values to send. Only plain (not-nested) objects are supported at the moment.
> +    pub values: Value,
> +    /// The time of the measurement
> +    pub ctime: i64,
> +}
> +
> +impl MetricsData {
> +    /// Convenient helper to create from references
> +    pub fn new<V: Serialize>(measurement: &str, tags: &[(&str, &str)], ctime: i64, values: V) -> Result<Self, Error> {
> +        let mut new_tags = HashMap::new();
> +        for (key, value) in tags {
> +            new_tags.insert(key.to_string(), value.to_string());
> +        }
> +
> +        Ok(Self{
> +            measurement: measurement.to_string(),
> +            tags: new_tags,
> +            values: serde_json::to_value(values)?,
> +            ctime,
> +        })
> +    }
> +}
> +
> +pub type MetricsServerFuture =
> +    Pin<Box<dyn std::future::Future<Output = Result<(), Error>> + Send + 'static>>;
> +
> +#[derive(Clone)]
> +/// A channel to send data to the metric server
> +pub struct MetricsChannel {
> +    pub(crate) data_channel: mpsc::Sender<Arc<MetricsData>>,
> +    pub(crate) flush_channel: mpsc::Sender<()>,
> +}
> +
> +impl MetricsChannel {
> +    /// Queues the given data for the metric server. If the queue is full,
> +    /// flush and try again.
> +    pub async fn send_data(&self, data: Arc<MetricsData>) -> Result<(), Error> {
> +        if let Err(err) = self.data_channel.try_send(data) {
> +            match err {
> +                mpsc::error::TrySendError::Full(data) => {
> +                    self.flush_channel.send(()).await?;
> +                    self.data_channel
> +                        .send(data)
> +                        .await
> +                        .map_err(|_| format_err!("error sending data"))?;
> +                }
> +                mpsc::error::TrySendError::Closed(_) => {
> +                    bail!("channel closed");
> +                }
> +            }
> +        }
> +        Ok(())
> +    }
> +
> +    /// Flush data to the metric server
> +    pub async fn flush(&self) -> Result<(), Error> {
> +        self.flush_channel.send(()).await?;
> +        Ok(())
> +    }
> +}
> +
> +pub async fn send_data_to_channels(values: &[Arc<MetricsData>], channels: &[MetricsChannel]) -> Vec<Result<(), Error>> {
> +    let mut futures = Vec::with_capacity(channels.len());
> +    for channel in channels {
> +        futures.push(async move {
> +            for data in values.into_iter() {

`.into_iter()` shouldn't be necessary; that's how `for` loops are
defined, after all.

> +                channel.send_data(data.clone()).await?
> +            }
> +            Ok::<(), Error>(())
> +        });
> +    }
> +
> +    futures::future::join_all(futures).await
> +}
> -- 
> 2.30.2





More information about the pbs-devel mailing list