[pdm-devel] [PATCH proxmox-datacenter-manager 06/25] metric collection: save metric data to RRD in separate task
Lukas Wagner
l.wagner at proxmox.com
Wed Feb 12 15:32:06 CET 2025
On 2025-02-12 14:59, Wolfgang Bumiller wrote:
> On Tue, Feb 11, 2025 at 01:05:22PM +0100, Lukas Wagner wrote:
>> This is a preparation for concurrent metric collection. While the RRD
>> cache appears to be safe for concurrent access (it uses an RwLock to
>> protect the data), delegating all RRD writes to a separate tokio task
>> makes it IMO easier to reason about and understand. Furthermore, it
>> decouples the 'metric fetching' and 'metric storing' parts, making it
>> much easier to write tests for both parts independently.
>>
>> For better code separation, this also splits out the new task into
>> a new submodule.
>>
>> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
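The fetch/store decoupling described in the commit message can be sketched as a plain producer/consumer channel. This is a minimal, std-only sketch: `std::sync::mpsc` stands in for tokio's MPSC channel, and `StoreRequest`, `fetch_and_send` and `store_all` are hypothetical names, not the actual PDM code. Because the fetching side only talks to a `Sender`, a test can check what it sent without touching any RRD files.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Hypothetical, simplified stand-in for `RrdStoreRequest`.
enum StoreRequest {
    Pve { remote: String, timestamps: Vec<i64> },
    Pbs { remote: String, timestamps: Vec<i64> },
}

/// "Fetching" side: hands the data off and never writes to storage itself.
/// Returns the most recent timestamp it saw (0 if there was no data).
fn fetch_and_send(remote: &str, is_pve: bool, data: Vec<i64>, tx: &Sender<StoreRequest>) -> i64 {
    let most_recent = data.iter().copied().max().unwrap_or(0);
    let remote = remote.to_string();
    let req = if is_pve {
        StoreRequest::Pve { remote, timestamps: data }
    } else {
        StoreRequest::Pbs { remote, timestamps: data }
    };
    // Sending only fails if the receiver is gone; this sketch ignores that case.
    let _ = tx.send(req);
    most_recent
}

/// "Storing" side: the single consumer that owns all writes. Instead of
/// writing RRD files, it records "<type>/<remote>/<timestamp>" keys.
fn store_all(rx: Receiver<StoreRequest>) -> Vec<String> {
    let mut written = Vec::new();
    // The loop ends once every Sender has been dropped.
    for req in rx {
        match req {
            StoreRequest::Pve { remote, timestamps } => {
                for ts in timestamps {
                    written.push(format!("pve/{remote}/{ts}"));
                }
            }
            StoreRequest::Pbs { remote, timestamps } => {
                for ts in timestamps {
                    written.push(format!("pbs/{remote}/{ts}"));
                }
            }
        }
    }
    written
}
```

The consumer runs on its own thread here; dropping the last `Sender` is what lets it finish, similar to how `receiver.recv().await` returns `None` in the tokio version.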
>> ---
>> server/src/metric_collection/mod.rs | 102 +++++++----------------
>> server/src/metric_collection/rrd_task.rs | 92 ++++++++++++++++++++
>> 2 files changed, 124 insertions(+), 70 deletions(-)
>> create mode 100644 server/src/metric_collection/rrd_task.rs
>>
>> diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
>> index 5540f93..06ade5f 100644
>> --- a/server/src/metric_collection/mod.rs
>> +++ b/server/src/metric_collection/mod.rs
[ .. ]
>
> Might be more readable using `Iterator::max()`:
>
> metrics.data.iter().map(|e| e.timestamp).max().unwrap_or(0);
>
> (while this changes throughout the series, there's at least 1 instance
> of this pattern remaining at the end AFAICT)
Right, that's a tiny bit nicer to read.
In the final code there is one occurrence left in the test code; I'll change it there.
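For comparison, a small sketch of both forms, assuming a hypothetical `DataPoint` struct in place of `ClusterMetricsData`/`MetricDataPoint` (only the `timestamp` field matters). Both return 0 for empty input. They would only differ if every timestamp were negative (the fold clamps to 0, `max()` does not), which does not happen for Unix timestamps.

```rust
/// Hypothetical stand-in for a metric data point; only `timestamp` is used.
struct DataPoint {
    timestamp: i64,
}

/// Form used in the patch: fold with 0 as the starting accumulator.
fn most_recent_fold(data: &[DataPoint]) -> i64 {
    data.iter().fold(0, |acc, x| acc.max(x.timestamp))
}

/// Suggested form: map to timestamps, take the maximum, default to 0.
fn most_recent_max(data: &[DataPoint]) -> i64 {
    data.iter().map(|e| e.timestamp).max().unwrap_or(0)
}
```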
>
>>
>> - for data_point in metrics.data {
>> - most_recent_timestamp =
>> - most_recent_timestamp.max(data_point.timestamp);
>> - store_metric_pve(&remote_name_clone, &data_point);
>> - }
>> + sender
>> + .send(RrdStoreRequest::Pve {
>> + remote: remote_name_clone,
>> + metrics,
>> + })
>> + .await?;
>>
>> - most_recent_timestamp
>> - })
>> - .await
>> + Ok::<i64, Error>(most_recent)
>
> ^ This...
>
>> }
>> RemoteType::Pbs => {
>> let client = connection::make_pbs_client(remote)?;
>> let metrics = client.metrics(Some(true), Some(start_time)).await?;
>>
>> - // Involves some blocking file IO
>> - tokio::task::spawn_blocking(move || {
>> - let mut most_recent_timestamp = 0;
>> + let most_recent =
>> + metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
>>
>> - for data_point in metrics.data {
>> - most_recent_timestamp =
>> - most_recent_timestamp.max(data_point.timestamp);
>> - store_metric_pbs(&remote_name_clone, &data_point);
>> - }
>> + sender
>> + .send(RrdStoreRequest::Pbs {
>> + remote: remote_name_clone,
>> + metrics,
>> + })
>> + .await?;
>>
>> - most_recent_timestamp
>> - })
>> - .await
>> + Ok::<i64, Error>(most_recent)
>
> ^ ... and this are now the only values for a single `let` binding which
> now looks like:
>
> let x = big_always_Ok_code?; // `?` op on always-Ok()
> Ok(x) // and re-OK-wrap
>
> Could just drop the `Ok` and `?`.
>
This changes in a later commit that reworks the overall error handling, so it
is no longer relevant in the final code.
Anyway, thanks for pointing it out.
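The simplification suggested above, sketched on a hypothetical `fetch()` helper (not from the patch) that stands in for the fallible client calls. Since a `?` inside a match arm already returns from the enclosing function on error, the arms can evaluate to the plain value, and both the `Ok(..)` wrapping and the outer `?` go away.

```rust
use std::num::ParseIntError;

/// Hypothetical fallible step, standing in for fetching metrics.
fn fetch(input: &str) -> Result<i64, ParseIntError> {
    input.trim().parse::<i64>()
}

/// Before: each arm re-wraps its value in `Ok`, only for `?` to unwrap it.
fn before(kind: u8, input: &str) -> Result<i64, ParseIntError> {
    let most_recent = match kind {
        0 => {
            let v = fetch(input)?;
            Ok::<i64, ParseIntError>(v)
        }
        _ => {
            let v = fetch(input)?;
            Ok::<i64, ParseIntError>(v * 2)
        }
    }?;
    Ok(most_recent)
}

/// After: the arms yield the plain value; errors still return early via `?`.
fn after(kind: u8, input: &str) -> Result<i64, ParseIntError> {
    let most_recent = match kind {
        0 => fetch(input)?,
        _ => fetch(input)? * 2,
    };
    Ok(most_recent)
}
```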
>> }
>> }?;
>>
>> @@ -106,45 +110,3 @@ async fn metric_collection_task() -> Result<(), Error> {
>> }
>> }
>> }
>> -
>> -fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
>> - let name = format!(
>> - "pve/{remote_name}/{id}/{metric}",
>> - id = data_point.id,
>> - metric = data_point.metric,
>> - );
>> -
>> - let data_source_type = match data_point.ty {
>> - ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
>> - ClusterMetricsDataType::Counter => DataSourceType::Counter,
>> - ClusterMetricsDataType::Derive => DataSourceType::Derive,
>> - };
>> -
>> - rrd_cache::update_value(
>> - &name,
>> - data_point.value,
>> - data_point.timestamp,
>> - data_source_type,
>> - );
>> -}
>> -
>> -fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
>> - let name = format!(
>> - "pbs/{remote_name}/{id}/{metric}",
>> - id = data_point.id,
>> - metric = data_point.metric,
>> - );
>> -
>> - let data_source_type = match data_point.ty {
>> - MetricDataType::Gauge => DataSourceType::Gauge,
>> - MetricDataType::Counter => DataSourceType::Counter,
>> - MetricDataType::Derive => DataSourceType::Derive,
>> - };
>> -
>> - rrd_cache::update_value(
>> - &name,
>> - data_point.value,
>> - data_point.timestamp,
>> - data_source_type,
>> - );
>> -}
>> diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
>> new file mode 100644
>> index 0000000..fe19580
>> --- /dev/null
>> +++ b/server/src/metric_collection/rrd_task.rs
>> @@ -0,0 +1,92 @@
>> +use anyhow::Error;
>> +use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
>> +use proxmox_rrd::rrd::DataSourceType;
>> +use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
>> +use tokio::sync::mpsc::Receiver;
>> +
>> +use super::rrd_cache;
>> +
>> +/// Store request for the RRD task.
>> +pub(super) enum RrdStoreRequest {
>> + /// Store PVE metrics.
>> + Pve {
>> + /// Remote name
>> + remote: String,
>> + /// Metric data
>> + metrics: ClusterMetrics,
>> + },
>> + /// Store PBS metrics.
>> + Pbs {
>> + /// Remote name
>> + remote: String,
>> + /// Metric data
>> + metrics: Metrics,
>> + },
>> +}
>> +
>> +/// Task which stores received metrics in the RRD. Metric data is fed into
>> +/// this task via an MPSC channel.
>> +pub(super) async fn store_in_rrd_task(
>> + mut receiver: Receiver<RrdStoreRequest>,
>> +) -> Result<(), Error> {
>> + while let Some(msg) = receiver.recv().await {
>> + // Involves some blocking file IO
>> + tokio::task::spawn_blocking(move || match msg {
>> + RrdStoreRequest::Pve { remote, metrics } => {
>> + for data_point in metrics.data {
>> + store_metric_pve(&remote, &data_point);
>> + }
>> + }
>> + RrdStoreRequest::Pbs { remote, metrics } => {
>> + for data_point in metrics.data {
>> + store_metric_pbs(&remote, &data_point);
>> + }
>> + }
>> + })
>> + .await?;
>
> ^ This error won't be shown anywhere as this is the `spawn()`ed task's
> entry point, and the error will stop the task for the rest of the
> daemon's lifetime.
Good point, I'll drop the `?` and log the error, if it occurs.
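A std-only sketch of the fix agreed on here. It assumes std threads in place of `tokio::task::spawn_blocking` (whose `JoinHandle` likewise yields an error, for example when the closure panics) and `eprintln!` in place of the daemon's logger; `Request`, `store` and `store_loop` are hypothetical names. A failed store is reported and the loop keeps serving later requests instead of ending the task.

```rust
use std::sync::mpsc::Receiver;
use std::thread;

/// Hypothetical request: a remote name and the values to store.
struct Request {
    remote: String,
    values: Vec<i64>,
}

/// Hypothetical blocking store step; panics on negative values to
/// simulate a failure inside the blocking closure.
fn store(remote: &str, values: &[i64]) -> usize {
    for v in values {
        assert!(*v >= 0, "invalid value {v} for remote {remote}");
    }
    values.len()
}

/// Runs until all senders are dropped and returns the number of requests
/// stored successfully. A failed request is logged, never propagated.
fn store_loop(receiver: Receiver<Request>) -> usize {
    let mut stored = 0;
    for msg in receiver {
        let remote = msg.remote.clone();
        // Blocking work runs on a separate thread, like spawn_blocking.
        let handle = thread::spawn(move || store(&msg.remote, &msg.values));
        match handle.join() {
            Ok(_) => stored += 1,
            // Log and keep going instead of leaving the loop with `?`.
            Err(_) => eprintln!("storing metrics for remote '{remote}' failed, continuing"),
        }
    }
    stored
}
```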
>
> In general I prefer "detached" tasks to always return `()`.
>
> Perhaps we should have a variant of `spawn_task_aborted_on_shutdown()`
> that enforces the `()` return type in its generic bounds, as well as a
> version that simply logs any returned error, with a warning in its docs
> that the task will then be over for the rest of the daemon's lifetime.
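The helper idea could look roughly like this. It is only a sketch: `spawn_detached` and `spawn_detached_logged` are hypothetical names, and to stay self-contained without tokio, each future is driven on its own std thread by a minimal `block_on` rather than spawned on the runtime (so abort-on-shutdown is not modelled). The point is the generic bounds: the first helper only accepts futures with `Output = ()`, the second accepts `Result<(), E>` and logs the error, after which the task is over.

```rust
use std::fmt::Display;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, JoinHandle, Thread};

/// Waker that unparks the thread which is polling the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: polls `fut` to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Parking may wake spuriously; the loop simply polls again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical helper: the bound rejects futures returning anything but
/// `()`, so an error cannot be silently dropped at the task entry point.
fn spawn_detached<F>(fut: F) -> JoinHandle<()>
where
    F: Future<Output = ()> + Send + 'static,
{
    thread::spawn(move || block_on(fut))
}

/// Hypothetical variant for fallible tasks: a returned error is logged,
/// and the task is finished afterwards.
fn spawn_detached_logged<F, E>(name: &'static str, fut: F) -> JoinHandle<()>
where
    F: Future<Output = Result<(), E>> + Send + 'static,
    E: Display + Send + 'static,
{
    spawn_detached(async move {
        if let Err(err) = fut.await {
            eprintln!("task '{name}' failed and will not run again: {err}");
        }
    })
}
```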
>
>> + }
>> +
>> + Ok(())
>> +}
>> +
>> +fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
>> + let name = format!(
>> + "pve/{remote_name}/{id}/{metric}",
>> + id = data_point.id,
>> + metric = data_point.metric,
>> + );
>> +
>> + let data_source_type = match data_point.ty {
>> + ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
>> + ClusterMetricsDataType::Counter => DataSourceType::Counter,
>> + ClusterMetricsDataType::Derive => DataSourceType::Derive,
>> + };
>> +
>> + rrd_cache::update_value(
>> + &name,
>> + data_point.value,
>> + data_point.timestamp,
>> + data_source_type,
>> + );
>> +}
>> +
>> +fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
>> + let name = format!(
>> + "pbs/{remote_name}/{id}/{metric}",
>> + id = data_point.id,
>> + metric = data_point.metric,
>> + );
>> +
>> + let data_source_type = match data_point.ty {
>> + MetricDataType::Gauge => DataSourceType::Gauge,
>> + MetricDataType::Counter => DataSourceType::Counter,
>> + MetricDataType::Derive => DataSourceType::Derive,
>> + };
>> +
>> + rrd_cache::update_value(
>> + &name,
>> + data_point.value,
>> + data_point.timestamp,
>> + data_source_type,
>> + );
>> +}
>> --
>> 2.39.5
--
- Lukas