[pbs-devel] [PATCH proxmox v3 3/3] proxmox-metrics: implement metrics server client code
Wolfgang Bumiller
w.bumiller at proxmox.com
Wed Jan 12 15:36:58 CET 2022
On Fri, Dec 17, 2021 at 09:09:54AM +0100, Dominik Csapak wrote:
> influxdb (udp + http(s)) only for now
>
> general architecture looks as follows:
>
> "new" returns a MetricsChannel and a Future
> the channels can be used to push data in (it flushes automatically if
> it would be over the configured size (mtu/max_body_size))
>
> and the future must be polled to actually send data to the servers.
>
> so most often it would look like this:
> let (future, channel) = InfluxDbHttp::new(..params..)?;
> let handle = tokio::spawn(future);
> channel.send_data(...).await?;
> handle.await?;
Sounds okay, but we already depend on tokio here, so maybe we should
consider always spawning and putting the spawn-handle and channel
together into a single helper.
The reason I'd like to do this is because I feel your above example is
not ideal. I'd prefer to enforce control over *aborting* the spawned
task by wrapping it in `futures::future::abortable()` roughly like so:
// Sketch: bundle the spawned task's join/abort handles with the sending
// side of the metrics channel, so the task cannot outlive its owner.
struct Metrics {
join_handle: JoinHandle<_>,
abort_handle: AbortHandle,
channel: Sender<_>,
}
// convenience: lets callers use the sender's methods directly on `Metrics`
// NOTE(review): `deref` must return `&Self::Target` (i.e. `&Sender<_>`),
// not a bare `&Sender`.
impl Deref for Metrics {
type Target = Sender<_>;
fn deref(&self) -> &Sender { &self.channel }
}
// Dropping the helper aborts the spawned task.
impl Drop for Metrics {
fn drop(&mut self) {
self.abort_handle.abort();
}
}
impl Metrics {
// NOTE(review): the spawned task only finishes once every sender is
// dropped (its loop is `while let Some(data) = self.channel.recv().await`),
// but `self.channel` is still alive while this awaits the task -- the
// sender presumably has to be closed first, or this may never return.
// NOTE(review): since `Metrics` implements `Drop`, moving `join_handle` out
// of `self` likely won't compile; awaiting `&mut self.join_handle` may be
// needed. Also verify the method: tokio's `JoinHandle` is presumably awaited
// directly rather than via `.join()`.
pub async fn join(self) -> _ {
self.join_handle.join().await
}
}
>
> when all channels go out of scope, all remaining data in the channel
> will be read and sent to the server
>
> Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
> ---
> Cargo.toml | 1 +
> proxmox-metrics/Cargo.toml | 20 +++++
> proxmox-metrics/debian/changelog | 5 ++
> proxmox-metrics/debian/copyright | 16 ++++
> proxmox-metrics/debian/debcargo.toml | 7 ++
> proxmox-metrics/src/influxdb/http.rs | 122 ++++++++++++++++++++++++++
> proxmox-metrics/src/influxdb/mod.rs | 7 ++
> proxmox-metrics/src/influxdb/udp.rs | 94 ++++++++++++++++++++
> proxmox-metrics/src/influxdb/utils.rs | 50 +++++++++++
> proxmox-metrics/src/lib.rs | 89 +++++++++++++++++++
> 10 files changed, 411 insertions(+)
> create mode 100644 proxmox-metrics/Cargo.toml
> create mode 100644 proxmox-metrics/debian/changelog
> create mode 100644 proxmox-metrics/debian/copyright
> create mode 100644 proxmox-metrics/debian/debcargo.toml
> create mode 100644 proxmox-metrics/src/influxdb/http.rs
> create mode 100644 proxmox-metrics/src/influxdb/mod.rs
> create mode 100644 proxmox-metrics/src/influxdb/udp.rs
> create mode 100644 proxmox-metrics/src/influxdb/utils.rs
> create mode 100644 proxmox-metrics/src/lib.rs
>
> diff --git a/Cargo.toml b/Cargo.toml
> index 8f85e08..4a458d2 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -6,6 +6,7 @@ members = [
> "proxmox-http",
> "proxmox-io",
> "proxmox-lang",
> + "proxmox-metrics",
> "proxmox-router",
> "proxmox-schema",
> "proxmox-serde",
> diff --git a/proxmox-metrics/Cargo.toml b/proxmox-metrics/Cargo.toml
> new file mode 100644
> index 0000000..9ac50fe
> --- /dev/null
> +++ b/proxmox-metrics/Cargo.toml
> @@ -0,0 +1,20 @@
> +[package]
> +name = "proxmox-metrics"
> +version = "0.1.0"
> +authors = ["Proxmox Support Team <support at proxmox.com>"]
> +edition = "2018"
> +license = "AGPL-3"
> +description = "Metrics Server export utilities"
> +
> +exclude = [ "debian" ]
> +
> +[dependencies]
> +anyhow = "1.0"
> +tokio = { version = "1.0", features = [ "net", "sync" ] }
> +futures = "0.3"
> +serde = "1.0"
> +serde_json = "1.0"
> +http = "0.2"
> +hyper = "0.14"
> +openssl = "0.10"
> +proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" }
> diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog
> new file mode 100644
> index 0000000..c02803b
> --- /dev/null
> +++ b/proxmox-metrics/debian/changelog
> @@ -0,0 +1,5 @@
> +rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium
> +
> + * initial package
> +
> + -- Proxmox Support Team <support at proxmox.com> Tue, 14 Dec 2021 08:56:54 +0100
> diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright
> new file mode 100644
> index 0000000..5661ef6
> --- /dev/null
> +++ b/proxmox-metrics/debian/copyright
> @@ -0,0 +1,16 @@
> +Copyright (C) 2021 Proxmox Server Solutions GmbH
> +
> +This software is written by Proxmox Server Solutions GmbH <support at proxmox.com>
> +
> +This program is free software: you can redistribute it and/or modify
> +it under the terms of the GNU Affero General Public License as published by
> +the Free Software Foundation, either version 3 of the License, or
> +(at your option) any later version.
> +
> +This program is distributed in the hope that it will be useful,
> +but WITHOUT ANY WARRANTY; without even the implied warranty of
> +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> +GNU Affero General Public License for more details.
> +
> +You should have received a copy of the GNU Affero General Public License
> +along with this program. If not, see <http://www.gnu.org/licenses/>.
> diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml
> new file mode 100644
> index 0000000..b7864cd
> --- /dev/null
> +++ b/proxmox-metrics/debian/debcargo.toml
> @@ -0,0 +1,7 @@
> +overlay = "."
> +crate_src_path = ".."
> +maintainer = "Proxmox Support Team <support at proxmox.com>"
> +
> +[source]
> +vcs_git = "git://git.proxmox.com/git/proxmox.git"
> +vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
> diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs
> new file mode 100644
> index 0000000..df6c6a5
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/http.rs
> @@ -0,0 +1,122 @@
> +use std::sync::Arc;
> +
> +use anyhow::{bail, Error};
> +use hyper::Body;
> +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
> +use tokio::sync::mpsc;
> +
> +use proxmox_http::client::{SimpleHttp, SimpleHttpOptions};
> +
> +use crate::influxdb::utils;
> +use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
> +
> +/// Client pushing metrics to an InfluxDB server through its HTTP(S) v2
> +/// write API (`/api/v2/write`), batching data up to `max_body_size` bytes.
> +pub struct InfluxDbHttp {
> +    client: SimpleHttp,
> +    /// Health-check URI (`/health`); built but currently unused.
> +    _healthuri: http::Uri,
> +    /// Write endpoint including the `org` and `bucket` query parameters.
> +    writeuri: http::Uri,
> +    /// Optional API token, sent as `Authorization: Token <token>`.
> +    token: Option<String>,
> +    max_body_size: usize,
> +    /// Pending line-protocol data not yet sent.
> +    data: String,
> +    /// Receiving end of the queue fed by `MetricsChannel`.
> +    channel: mpsc::Receiver<Arc<MetricsData>>,
> +}
> +
> +impl InfluxDbHttp {
> +    /// Creates a client writing into `bucket` of `organization` on `host`:`port`.
> +    ///
> +    /// Returns the future that must be polled to actually send the data,
> +    /// together with the channel used to queue it. With `verify_tls == false`
> +    /// the server certificate is not verified.
> +    ///
> +    /// # Errors
> +    ///
> +    /// Fails if the OpenSSL connector cannot be created or a URI is invalid.
> +    pub fn new(
> +        https: bool,
> +        host: &str,
> +        port: u16,
> +        organization: &str,
> +        bucket: &str,
> +        token: Option<&str>,
> +        verify_tls: bool,
> +        max_body_size: usize,
> +    ) -> Result<(MetricsServerFuture, MetricsChannel), Error> {
> +        let (tx, rx) = mpsc::channel(1024);
> +
> +        let client = if verify_tls {
> +            SimpleHttp::with_options(SimpleHttpOptions::default())
> +        } else {
> +            // propagate OpenSSL setup failures instead of panicking
> +            let mut ssl_connector = SslConnector::builder(SslMethod::tls())?;
> +            ssl_connector.set_verify(SslVerifyMode::NONE);
> +            SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default())
> +        };
> +
> +        let authority = proxmox_http::uri::build_authority(host, port)?;
> +        let scheme = if https { "https" } else { "http" };
> +
> +        // NOTE(review): `organization` and `bucket` are inserted into the
> +        // query string unescaped; names containing e.g. `&` or spaces may
> +        // produce a wrong or invalid URI.
> +        let writeuri = http::uri::Builder::new()
> +            .scheme(scheme)
> +            .authority(authority.clone())
> +            .path_and_query(format!(
> +                "/api/v2/write?org={}&bucket={}",
> +                organization, bucket
> +            ))
> +            .build()?;
> +
> +        let healthuri = http::uri::Builder::new()
> +            .scheme(scheme)
> +            .authority(authority)
> +            .path_and_query("/health")
> +            .build()?;
> +
> +        let this = Self {
> +            client,
> +            writeuri,
> +            _healthuri: healthuri,
> +            token: token.map(String::from),
> +            max_body_size,
> +            data: String::new(),
> +            channel: rx,
> +        };
> +
> +        let future = Box::pin(this.finish());
> +        let channel = MetricsChannel { channel: tx };
> +        Ok((future, channel))
> +    }
> +
> +    /// Appends one formatted line, flushing first if the body would reach
> +    /// `max_body_size` with it, and again if the buffer has reached the limit.
> +    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
> +        let new_data = utils::format_influxdb_line(&data)?;
> +
> +        if self.data.len() + new_data.len() >= self.max_body_size {
> +            self.flush().await?;
> +        }
> +
> +        self.data.push_str(&new_data);
> +
> +        if self.data.len() >= self.max_body_size {
> +            self.flush().await?;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Sends the buffered data as a single POST request.
> +    ///
> +    /// Does nothing if the buffer is empty. Fails on transport errors or a
> +    /// non-success HTTP status; the buffered data is dropped in either case.
> +    pub async fn flush(&mut self) -> Result<(), Error> {
> +        if self.data.is_empty() {
> +            return Ok(());
> +        }
> +        let mut request = http::Request::builder().method("POST").uri(&self.writeuri);
> +
> +        if let Some(token) = &self.token {
> +            request = request.header("Authorization", format!("Token {}", token));
> +        }
> +
> +        // hand the buffered data over to the body, leaving `self.data` empty
> +        let request = request.body(Body::from(std::mem::take(&mut self.data)))?;
> +
> +        let res = self.client.request(request).await?;
> +
> +        let status = res.status();
> +        if !status.is_success() {
> +            bail!("got bad status: {}", status);
> +        }
> +        Ok(())
> +    }
> +
> +    /// Drives the client: forwards queued data until every sender of the
> +    /// channel has been dropped, then sends whatever is still buffered.
> +    async fn finish(mut self) -> Result<(), Error> {
> +        while let Some(data) = self.channel.recv().await {
> +            self.add_data(data).await?;
> +        }
> +
> +        self.flush().await?;
> +
> +        Ok(())
> +    }
> +}
> diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs
> new file mode 100644
> index 0000000..26a715c
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/mod.rs
> @@ -0,0 +1,7 @@
> +mod http;
> +mod udp;
> +pub mod utils;
> +
> +// `self::` makes explicit that the local submodules are meant; the crate
> +// also depends on an external crate named `http`.
> +pub use self::http::*;
> +pub use self::udp::*;
> diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs
> new file mode 100644
> index 0000000..c4187a6
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/udp.rs
> @@ -0,0 +1,94 @@
> +use std::sync::Arc;
> +
> +use anyhow::Error;
> +use tokio::net::UdpSocket;
> +use tokio::sync::mpsc;
> +
> +use crate::influxdb::utils;
> +use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
> +
> +/// Client pushing metrics to an InfluxDB server over UDP, batching lines
> +/// into datagrams bounded by the configured MTU budget.
> +pub struct InfluxDbUdp {
> +    /// `host:port` string the socket connects to, as built by `new`.
> +    address: String,
> +    /// Selects an IPv6 (`[::]:0`) or IPv4 (`0.0.0.0:0`) local bind address.
> +    ipv6: bool,
> +    /// Socket created on first flush and reused afterwards.
> +    conn: Option<tokio::net::UdpSocket>,
> +    /// Payload byte budget per datagram.
> +    mtu: u16,
> +    /// Pending line-protocol data not yet sent.
> +    data: String,
> +    /// Receiving end of the queue fed by `MetricsChannel`.
> +    channel: mpsc::Receiver<Arc<MetricsData>>,
> +}
> +
> +impl InfluxDbUdp {
> +    /// Creates a client sending to `host`:`port` over UDP.
> +    ///
> +    /// Returns the future that must be polled to actually send the data,
> +    /// together with the channel used to queue it. `mtu` defaults to 1500.
> +    pub fn new(host: &str, port: u16, mtu: Option<u16>) -> (MetricsServerFuture, MetricsChannel) {
^ Also this is a weird API, it would probably make more sense to just
have an `influx_db_udp()` function in this case, similar to how
`mpsc::channel()` is a function returning `(mpsc::Sender, mpsc::Receiver)`,
we'd have
metrics::influx_db_udp() -> Metrics;
metrics::influx_db_http() -> Metrics;
And yet another IPv6 thing...
Do we *really* want to specify host and port separately?
Why not have 1 string and use `ToSocketAddrs`?
> +        let (tx, rx) = mpsc::channel(1024);
> +        let ipv6 = host.contains(':');
The reason I'm asking is because the above works for ipv6 addresses...
but...
the schema you introduce in PBS 2/6 uses DNS_NAME_OR_IP_SCHEMA for the
host, so the hostname can be, you know, a *name*...
which means you don't actually *know* whether it's IPv4 or IPv6...
> +
> +        // Bracket IPv6 literals unless the caller already did. This also has
> +        // to cover short literals like `::1`, and `starts_with` avoids byte
> +        // slicing, which panics on a multi-byte first character.
> +        let address = if ipv6 && !host.starts_with('[') {
> +            format!("[{}]:{}", host, port)
> +        } else {
> +            format!("{}:{}", host, port)
> +        };
> +
> +        let this = Self {
> +            address,
> +            ipv6,
> +            conn: None,
> +            // empty ipv6 udp package needs 48 bytes, subtract 50 for safety;
> +            // saturate so a tiny configured MTU cannot underflow
> +            mtu: mtu.unwrap_or(1500).saturating_sub(50),
> +            data: String::new(),
> +            channel: rx,
> +        };
> +
> +        let future = Box::pin(this.finish());
> +
> +        let channel = MetricsChannel { channel: tx };
> +
> +        (future, channel)
> +    }
> +
> +    /// Binds a local socket of the selected address family and connects it
> +    /// to `self.address`.
> +    async fn connect(&mut self) -> Result<tokio::net::UdpSocket, Error> {
Meaning this should actually iterate through the results of
tokio::net::lookup_host()
and do both bind() and connect() for each result one after the other
until they succeed...
> +        let conn = if self.ipv6 {
> +            UdpSocket::bind("[::]:0").await?
> +        } else {
> +            UdpSocket::bind("0.0.0.0:0").await?
> +        };
> +        let addr = self.address.clone();
> +        conn.connect(addr).await?;
> +        Ok(conn)
> +    }
> +
> +    /// Appends one formatted line, flushing first if it would not fit into
> +    /// the current datagram, and again if the buffer has reached the budget.
> +    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
> +        let new_data = utils::format_influxdb_line(&data)?;
> +        let mtu = usize::from(self.mtu);
> +
> +        if self.data.len() + new_data.len() >= mtu {
> +            self.flush().await?;
> +        }
> +
> +        self.data.push_str(&new_data);
> +
> +        if self.data.len() >= mtu {
> +            self.flush().await?;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Sends the buffered data as a single datagram, connecting first if no
> +    /// socket exists yet. Does nothing if the buffer is empty.
> +    async fn flush(&mut self) -> Result<(), Error> {
> +        // don't connect and send an empty datagram, e.g. when the channel is
> +        // closed without any data having been queued
> +        if self.data.is_empty() {
> +            return Ok(());
> +        }
> +
> +        let conn = match self.conn.take() {
> +            Some(conn) => conn,
> +            None => self.connect().await?,
> +        };
> +
> +        conn.send(std::mem::take(&mut self.data).as_bytes()).await?;
> +        self.conn = Some(conn);
> +        Ok(())
> +    }
> +
> +    /// Drives the client: forwards queued data until every sender of the
> +    /// channel has been dropped, then sends whatever is still buffered.
> +    async fn finish(mut self) -> Result<(), Error> {
> +        while let Some(data) = self.channel.recv().await {
> +            self.add_data(data).await?;
> +        }
> +
> +        self.flush().await?;
> +
> +        Ok(())
> +    }
> +}
> diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs
> new file mode 100644
> index 0000000..3507e61
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/utils.rs
> @@ -0,0 +1,50 @@
> +use std::fmt::Write;
> +
> +use anyhow::{bail, Error};
> +use serde_json::Value;
> +
> +use crate::MetricsData;
> +
> +/// Renders `data` as one InfluxDB line-protocol line (newline-terminated):
> +/// `measurement[,tag=value...] field=value[,field=value...] timestamp`.
> +///
> +/// Tags follow the `HashMap` iteration order. Field values are written as
> +/// their JSON text, and `ctime` is scaled by 10^9 to a nanosecond timestamp.
> +///
> +/// # Errors
> +///
> +/// Fails if `data.values` is not a JSON object, or if any field value is
> +/// itself an object or an array.
> +pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result<String, Error> {
> +    let fields = match data.values.as_object() {
> +        Some(fields) => fields,
> +        None => bail!("invalid data"),
> +    };
> +
> +    let mut line = escape_measurement(&data.measurement);
> +
> +    for (tag, tag_value) in &data.tags {
> +        write!(line, ",{}={}", escape_key(tag), escape_key(tag_value))?;
> +    }
> +
> +    line.push(' ');
> +
> +    for (index, (field, field_value)) in fields.iter().enumerate() {
> +        if field_value.is_object() {
> +            bail!("objects not supported");
> +        }
> +        if field_value.is_array() {
> +            bail!("arrays not supported");
> +        }
> +        if index > 0 {
> +            line.push(',');
> +        }
> +        // `Value`'s `Display` output is the same text as `to_string()`
> +        write!(line, "{}={}", escape_key(field), field_value)?;
> +    }
> +
> +    // nanosecond precision
> +    writeln!(line, " {}", data.ctime * 1_000_000_000)?;
> +
> +    Ok(line)
> +}
> +
> +/// Backslash-escapes `,`, `=` and spaces, as used for tag keys, tag values
> +/// and field keys in the line protocol.
> +fn escape_key(key: &str) -> String {
> +    let mut escaped = String::with_capacity(key.len());
> +    for ch in key.chars() {
> +        if matches!(ch, ',' | '=' | ' ') {
> +            escaped.push('\\');
> +        }
> +        escaped.push(ch);
> +    }
> +    escaped
> +}
> +
> +/// Backslash-escapes `,` and spaces in a measurement name.
> +fn escape_measurement(measurement: &str) -> String {
> +    measurement
> +        .chars()
> +        .fold(String::with_capacity(measurement.len()), |mut escaped, ch| {
> +            if ch == ',' || ch == ' ' {
> +                escaped.push('\\');
> +            }
> +            escaped.push(ch);
> +            escaped
> +        })
> +}
> diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs
> new file mode 100644
> index 0000000..ba018fb
> --- /dev/null
> +++ b/proxmox-metrics/src/lib.rs
> @@ -0,0 +1,89 @@
> +use std::collections::HashMap;
> +use std::pin::Pin;
> +use std::sync::Arc;
> +
> +use anyhow::{format_err, Error};
> +use serde::Serialize;
> +use serde_json::Value;
> +use tokio::sync::mpsc;
> +
> +pub mod influxdb;
> +
> +/// Structured data for the metric server
> +#[derive(Clone, Debug)]
> +pub struct MetricsData {
> +    /// The category of measurements
> +    pub measurement: String,
> +    /// A list of tags to attach to the measurements
> +    pub tags: HashMap<String, String>,
> +    /// The actual values to send. Only plain (not-nested) objects are supported at the moment.
> +    pub values: Value,
> +    /// The time of the measurement, in seconds (presumably a UNIX timestamp)
> +    pub ctime: i64,
> +}
> +
> +impl MetricsData {
> +    /// Builds a `MetricsData` from borrowed parts, copying the measurement
> +    /// name and tags into owned storage and serializing `values` to JSON.
> +    ///
> +    /// If a tag key appears more than once, the last value wins.
> +    ///
> +    /// # Errors
> +    ///
> +    /// Fails if `values` cannot be serialized by `serde_json`.
> +    pub fn new<V: Serialize>(
> +        measurement: &str,
> +        tags: &[(&str, &str)],
> +        ctime: i64,
> +        values: V,
> +    ) -> Result<Self, Error> {
> +        let owned_tags: HashMap<String, String> = tags
> +            .iter()
> +            .map(|(tag, tag_value)| (tag.to_string(), tag_value.to_string()))
> +            .collect();
> +        let json_values = serde_json::to_value(values)?;
> +
> +        Ok(Self {
> +            measurement: measurement.to_string(),
> +            tags: owned_tags,
> +            values: json_values,
> +            ctime,
> +        })
> +    }
> +}
> +
> +/// Boxed, `Send`able future driving one metrics server client; it has to be
> +/// polled (e.g. spawned) for queued data to actually be sent.
> +pub type MetricsServerFuture = Pin<
> +    Box<dyn std::future::Future<Output = Result<(), Error>> + Send + 'static>,
> +>;
> +
> +/// Cloneable handle used to queue [`MetricsData`] for a metric server.
> +#[derive(Clone)]
> +pub struct MetricsChannel {
> +    /// Sending half of the queue drained by the server future.
> +    pub(crate) channel: mpsc::Sender<Arc<MetricsData>>,
> +}
> +
> +impl MetricsChannel {
> +    /// Queues `data` for the metric server.
> +    ///
> +    /// Entries whose `values` is an empty JSON object are skipped, since there
> +    /// is nothing to send. Fails if the receiving side has been dropped.
> +    pub async fn send_data(&self, data: Arc<MetricsData>) -> Result<(), Error> {
> +        let nothing_to_send = matches!(&data.values, Value::Object(map) if map.is_empty());
> +        if nothing_to_send {
> +            return Ok(());
> +        }
> +
> +        self.channel
> +            .send(data)
> +            .await
> +            .map_err(|_| format_err!("receiver side closed"))
> +    }
> +}
> +
> +/// Sends every entry of `values`, in order, to each of `channels`.
> +///
> +/// All channels are fed concurrently. The result holds one entry per channel,
> +/// in the order of `channels`; an error means that channel stopped at its
> +/// first failed send.
> +pub async fn send_data_to_channels(
> +    values: &[Arc<MetricsData>],
> +    channels: &[MetricsChannel],
> +) -> Vec<Result<(), Error>> {
> +    let per_channel_sends = channels.iter().map(|channel| async move {
> +        for entry in values.iter() {
> +            channel.send_data(Arc::clone(entry)).await?;
> +        }
> +        Ok::<(), Error>(())
> +    });
> +
> +    futures::future::join_all(per_channel_sends).await
> +}
> --
> 2.30.2
More information about the pbs-devel
mailing list