[pdm-devel] [PATCH proxmox-datacenter-manager 06/25] metric collection: save metric data to RRD in separate task

Lukas Wagner l.wagner at proxmox.com
Wed Feb 12 15:32:06 CET 2025



On 2025-02-12 14:59, Wolfgang Bumiller wrote:
> On Tue, Feb 11, 2025 at 01:05:22PM +0100, Lukas Wagner wrote:
>> This is a preparation for concurrent metric collection. While the RRD
>> cache appears to be safe to concurrent access (it uses an RwLock to
>> protect the data), delegating all RRD writes to a separate tokio task
>> makes it IMO easier to reason about and understand. Furthermore, it
>> decouples the 'metric fetching' and 'metric
>> storing' parts, making it much easier to write tests for both parts
>> independently.
>>
>> For better code separation, this also splits out the new task into
>> a new submodule.
>>
>> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
>> ---
>>  server/src/metric_collection/mod.rs      | 102 +++++++----------------
>>  server/src/metric_collection/rrd_task.rs |  92 ++++++++++++++++++++
>>  2 files changed, 124 insertions(+), 70 deletions(-)
>>  create mode 100644 server/src/metric_collection/rrd_task.rs
>>
>> diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
>> index 5540f93..06ade5f 100644
>> --- a/server/src/metric_collection/mod.rs
>> +++ b/server/src/metric_collection/mod.rs
[ .. ]
> 
> Might be more readable using `Iterator::max()`:
> 
>     metrics.data.iter().map(|e| e.timestamp).max().unwrap_or(0);
> 
> (while this changes throughout the series, there's at least 1 instance
> of this pattern remaining at the end AFAICT)

Right, that's a tiny bit nicer to read.
In the final code there is one occurrence left in the test code; I'll change it there.
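For reference, here is a minimal sketch of the two forms side by side. `DataPoint` is a hypothetical stand-in for the PVE/PBS data point types, since only the `timestamp` field matters here. Both forms agree on regular timestamps and on an empty list:

```rust
/// Hypothetical stand-in for a PVE/PBS metric data point; only `timestamp` matters here.
struct DataPoint {
    timestamp: i64,
}

/// Current form: fold with a running maximum, starting at 0.
fn most_recent_fold(data: &[DataPoint]) -> i64 {
    data.iter().fold(0, |acc, x| acc.max(x.timestamp))
}

/// Suggested form: map to timestamps, take the maximum, fall back to 0 if empty.
fn most_recent_max(data: &[DataPoint]) -> i64 {
    data.iter().map(|e| e.timestamp).max().unwrap_or(0)
}

fn main() {
    let data = vec![
        DataPoint { timestamp: 1_739_368_326 },
        DataPoint { timestamp: 1_739_368_386 },
        DataPoint { timestamp: 1_739_368_356 },
    ];
    assert_eq!(most_recent_fold(&data), most_recent_max(&data));
    assert_eq!(most_recent_max(&data), 1_739_368_386);

    // Both return 0 when there is no data at all.
    assert_eq!(most_recent_fold(&[]), 0);
    assert_eq!(most_recent_max(&[]), 0);
}
```

There is one subtle difference. `fold(0, ...)` never returns less than 0, while `max().unwrap_or(0)` returns a negative maximum if every value is negative. For Unix timestamps, this does not matter.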
> 
>>  
>> -                            for data_point in metrics.data {
>> -                                most_recent_timestamp =
>> -                                    most_recent_timestamp.max(data_point.timestamp);
>> -                                store_metric_pve(&remote_name_clone, &data_point);
>> -                            }
>> +                        sender
>> +                            .send(RrdStoreRequest::Pve {
>> +                                remote: remote_name_clone,
>> +                                metrics,
>> +                            })
>> +                            .await?;
>>  
>> -                            most_recent_timestamp
>> -                        })
>> -                        .await
>> +                        Ok::<i64, Error>(most_recent)
> 
> ^ This...
> 
>>                      }
>>                      RemoteType::Pbs => {
>>                          let client = connection::make_pbs_client(remote)?;
>>                          let metrics = client.metrics(Some(true), Some(start_time)).await?;
>>  
>> -                        // Involves some blocking file IO
>> -                        tokio::task::spawn_blocking(move || {
>> -                            let mut most_recent_timestamp = 0;
>> +                        let most_recent =
>> +                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
>>  
>> -                            for data_point in metrics.data {
>> -                                most_recent_timestamp =
>> -                                    most_recent_timestamp.max(data_point.timestamp);
>> -                                store_metric_pbs(&remote_name_clone, &data_point);
>> -                            }
>> +                        sender
>> +                            .send(RrdStoreRequest::Pbs {
>> +                                remote: remote_name_clone,
>> +                                metrics,
>> +                            })
>> +                            .await?;
>>  
>> -                            most_recent_timestamp
>> -                        })
>> -                        .await
>> +                        Ok::<i64, Error>(most_recent)
> 
> ^ ... and this are now the only values for a single `let` binding which
> now looks like:
> 
>     let x = big_always_Ok_code?; // `?` op on always-Ok()
>     Ok(x) // and re-OK-wrap
> 
> Could just drop the `Ok` and `?`.
> 

This changes in a later commit that reworks the overall error handling, so
it is not relevant for the final code.
Anyway, thanks for pointing it out.
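For the record, here is a reduced sketch of the simplification described above. `RemoteType` is trimmed down, and `fetch` is a hypothetical stand-in for the fallible client calls and the channel send. Real errors still leave the function through the inner `?`, so the outer `?` and the re-wrapping `Ok` are only a no-op round trip:

```rust
/// Simplified remote type, mirroring the two arms in the patch.
#[derive(Clone, Copy)]
enum RemoteType {
    Pve,
    Pbs,
}

/// Hypothetical fallible step standing in for the client calls and `sender.send(..).await`.
fn fetch(ok: bool) -> Result<Vec<i64>, String> {
    if ok {
        Ok(vec![10, 30, 20])
    } else {
        Err("fetch failed".to_string())
    }
}

/// Shape of the patch: every arm ends in an always-`Ok` value, which is then
/// unwrapped with `?` and re-wrapped with `Ok`.
fn collect_before(ty: RemoteType, ok: bool) -> Result<i64, String> {
    let most_recent = match ty {
        RemoteType::Pve => {
            let data = fetch(ok)?; // real errors already return early here
            Ok::<i64, String>(data.iter().copied().max().unwrap_or(0))
        }
        RemoteType::Pbs => {
            let data = fetch(ok)?;
            Ok::<i64, String>(data.iter().copied().max().unwrap_or(0))
        }
    }?; // `?` on a value that can never be `Err`
    Ok(most_recent)
}

/// Suggested shape: the arms evaluate to the plain value, with no extra `?`.
fn collect_after(ty: RemoteType, ok: bool) -> Result<i64, String> {
    let most_recent = match ty {
        RemoteType::Pve => {
            let data = fetch(ok)?;
            data.iter().copied().max().unwrap_or(0)
        }
        RemoteType::Pbs => {
            let data = fetch(ok)?;
            data.iter().copied().max().unwrap_or(0)
        }
    };
    Ok(most_recent)
}

fn main() {
    for ty in [RemoteType::Pve, RemoteType::Pbs] {
        // Same result on success and on failure.
        assert_eq!(collect_before(ty, true), collect_after(ty, true));
        assert_eq!(collect_before(ty, false), collect_after(ty, false));
    }
}
```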

>>                      }
>>                  }?;
>>  
>> @@ -106,45 +110,3 @@ async fn metric_collection_task() -> Result<(), Error> {
>>          }
>>      }
>>  }
>> -
>> -fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
>> -    let name = format!(
>> -        "pve/{remote_name}/{id}/{metric}",
>> -        id = data_point.id,
>> -        metric = data_point.metric,
>> -    );
>> -
>> -    let data_source_type = match data_point.ty {
>> -        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
>> -        ClusterMetricsDataType::Counter => DataSourceType::Counter,
>> -        ClusterMetricsDataType::Derive => DataSourceType::Derive,
>> -    };
>> -
>> -    rrd_cache::update_value(
>> -        &name,
>> -        data_point.value,
>> -        data_point.timestamp,
>> -        data_source_type,
>> -    );
>> -}
>> -
>> -fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
>> -    let name = format!(
>> -        "pbs/{remote_name}/{id}/{metric}",
>> -        id = data_point.id,
>> -        metric = data_point.metric,
>> -    );
>> -
>> -    let data_source_type = match data_point.ty {
>> -        MetricDataType::Gauge => DataSourceType::Gauge,
>> -        MetricDataType::Counter => DataSourceType::Counter,
>> -        MetricDataType::Derive => DataSourceType::Derive,
>> -    };
>> -
>> -    rrd_cache::update_value(
>> -        &name,
>> -        data_point.value,
>> -        data_point.timestamp,
>> -        data_source_type,
>> -    );
>> -}
>> diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
>> new file mode 100644
>> index 0000000..fe19580
>> --- /dev/null
>> +++ b/server/src/metric_collection/rrd_task.rs
>> @@ -0,0 +1,92 @@
>> +use anyhow::Error;
>> +use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
>> +use proxmox_rrd::rrd::DataSourceType;
>> +use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
>> +use tokio::sync::mpsc::Receiver;
>> +
>> +use super::rrd_cache;
>> +
>> +/// Store request for the RRD task.
>> +pub(super) enum RrdStoreRequest {
>> +    /// Store PVE metrics.
>> +    Pve {
>> +        /// Remote name
>> +        remote: String,
>> +        /// Metric data
>> +        metrics: ClusterMetrics,
>> +    },
>> +    /// Store PBS metrics.
>> +    Pbs {
>> +        /// Remote name
>> +        remote: String,
>> +        /// Metric data
>> +        metrics: Metrics,
>> +    },
>> +}
>> +
>> +/// Task which stores received metrics in the RRD. Metric data is fed into
>> +/// this task via a MPSC channel.
>> +pub(super) async fn store_in_rrd_task(
>> +    mut receiver: Receiver<RrdStoreRequest>,
>> +) -> Result<(), Error> {
>> +    while let Some(msg) = receiver.recv().await {
>> +        // Involves some blocking file IO
>> +        tokio::task::spawn_blocking(move || match msg {
>> +            RrdStoreRequest::Pve { remote, metrics } => {
>> +                for data_point in metrics.data {
>> +                    store_metric_pve(&remote, &data_point);
>> +                }
>> +            }
>> +            RrdStoreRequest::Pbs { remote, metrics } => {
>> +                for data_point in metrics.data {
>> +                    store_metric_pbs(&remote, &data_point);
>> +                }
>> +            }
>> +        })
>> +        .await?;
> 
> ^ This error won't be shown anywhere as this is the `spawn()`ed task's
> entry point, and the error will stop the task for the rest of the
> daemon's lifetime.

Good point, I'll drop the `?` and log the error, if it occurs.
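Here is a minimal, std-only sketch of that approach. `std::sync::mpsc` and `std::thread` stand in for tokio's channel and task spawning, and `eprintln!` stands in for the logging macro. `StoreRequest` and `store` are hypothetical simplifications of `RrdStoreRequest` and the RRD write, and their failure replaces the `JoinError` from `spawn_blocking`. A failed request is logged, and the loop keeps running instead of ending the task:

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical, simplified store request; the real `RrdStoreRequest` carries
/// a remote name plus PVE or PBS metrics.
struct StoreRequest {
    name: String,
    value: f64,
}

/// Hypothetical storage step that can fail (stands in for the blocking RRD write).
fn store(req: StoreRequest) -> Result<String, String> {
    if req.value.is_finite() {
        Ok(req.name)
    } else {
        Err(format!("invalid value for {}", req.name))
    }
}

/// Receiver loop that never propagates errors: a failed request is logged and
/// the loop waits for the next one. It returns the stored names only so the
/// example can be checked.
fn store_task(receiver: Receiver<StoreRequest>) -> Vec<String> {
    let mut stored = Vec::new();
    // `recv()` returns `Err` once all senders are dropped, which ends the loop.
    while let Ok(req) = receiver.recv() {
        match store(req) {
            Ok(name) => stored.push(name),
            Err(err) => eprintln!("could not store metrics: {err}"),
        }
    }
    stored
}

fn main() {
    let (sender, receiver) = channel();
    let task = thread::spawn(move || store_task(receiver));

    for (name, value) in [("pve/a/cpu", 0.5), ("pve/a/mem", f64::NAN), ("pbs/b/cpu", 0.25)] {
        let req = StoreRequest { name: name.to_string(), value };
        sender.send(req).unwrap();
    }
    drop(sender);

    // The invalid value was logged, but the later request was still stored.
    let stored = task.join().unwrap();
    assert_eq!(stored, vec!["pve/a/cpu", "pbs/b/cpu"]);
}
```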

> 
> In general I prefer "detached" tasks to always return `()`.
> 
> Perhaps we should have a helper for the
> `spawn_task_aborted_on_shutdown()` which enforces the `()` return type
> in its generic bounds as well as a version which simply logs errors if
> they are returned where we also warn in the docs that the task will then
> be over for the rest of the daemon's lifetime.
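Here is a rough sketch of the two helpers described above. It uses `std::thread` in place of tokio so that it is self-contained. Both function names are hypothetical and are not existing proxmox APIs. The first helper enforces a `()` return type through its bound. The second logs a returned error, after which the task is over:

```rust
use std::fmt::Display;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical helper: the `FnOnce()` bound (implicitly returning `()`)
/// rejects tasks that return a `Result`, so errors cannot be silently dropped.
fn spawn_detached<F>(task: F) -> JoinHandle<()>
where
    F: FnOnce() + Send + 'static,
{
    thread::spawn(task)
}

/// Hypothetical helper: runs a fallible task and logs the error if one is
/// returned. Once that happens, the task is over for the rest of the process.
fn spawn_detached_logged<F, E>(name: &'static str, task: F) -> JoinHandle<()>
where
    F: FnOnce() -> Result<(), E> + Send + 'static,
    E: Display + 'static,
{
    thread::spawn(move || {
        if let Err(err) = task() {
            eprintln!("task '{name}' failed: {err}");
        }
    })
}

fn main() {
    let runs = Arc::new(AtomicUsize::new(0));

    let r = Arc::clone(&runs);
    spawn_detached(move || {
        r.fetch_add(1, Ordering::SeqCst);
    })
    .join()
    .unwrap();

    // Would not compile: the closure returns `Result`, not `()`.
    // spawn_detached(|| Err::<(), String>("boom".into()));

    spawn_detached_logged("failing-task", || Err::<(), String>("boom".to_string()))
        .join()
        .unwrap();

    assert_eq!(runs.load(Ordering::SeqCst), 1);
}
```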
> 
>> +    }
>> +
>> +    Ok(())
>> +}
>> +
>> +fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
>> +    let name = format!(
>> +        "pve/{remote_name}/{id}/{metric}",
>> +        id = data_point.id,
>> +        metric = data_point.metric,
>> +    );
>> +
>> +    let data_source_type = match data_point.ty {
>> +        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
>> +        ClusterMetricsDataType::Counter => DataSourceType::Counter,
>> +        ClusterMetricsDataType::Derive => DataSourceType::Derive,
>> +    };
>> +
>> +    rrd_cache::update_value(
>> +        &name,
>> +        data_point.value,
>> +        data_point.timestamp,
>> +        data_source_type,
>> +    );
>> +}
>> +
>> +fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
>> +    let name = format!(
>> +        "pbs/{remote_name}/{id}/{metric}",
>> +        id = data_point.id,
>> +        metric = data_point.metric,
>> +    );
>> +
>> +    let data_source_type = match data_point.ty {
>> +        MetricDataType::Gauge => DataSourceType::Gauge,
>> +        MetricDataType::Counter => DataSourceType::Counter,
>> +        MetricDataType::Derive => DataSourceType::Derive,
>> +    };
>> +
>> +    rrd_cache::update_value(
>> +        &name,
>> +        data_point.value,
>> +        data_point.timestamp,
>> +        data_source_type,
>> +    );
>> +}
>> -- 
>> 2.39.5

-- 
- Lukas