[pdm-devel] [PATCH proxmox-datacenter-manager 07/25] metric collection: rework metric poll task

Lukas Wagner l.wagner at proxmox.com
Thu Feb 13 13:31:21 CET 2025


On  2025-02-12 16:57, Wolfgang Bumiller wrote:
> Looks good.
> One improvement/follow-up suggestion (JoinSet), some nitpicking &
> potential refactoring requests...
> 

Thanks for the review :)

I'll incorporate your suggestions in a v2. I already have some other minor changes queued
up for it (typos, punctuation, and minor code style improvements that Maximiliano pointed
out to me informally across the desk ;) )


>> +
>> +    /// Run the metric collection task.
>> +    ///
>> +    /// This function does never return.
>> +    #[tracing::instrument(skip_all, name = "metric_collection_task")]
>> +    pub(super) async fn run(&mut self) {
>> +        let mut timer = Self::setup_timer(self.settings.collection_interval_or_default());
>> +
>> +        log::debug!(
>> +            "metric collection starting up. collection interval set to {} seconds",
> 
> ^ nit: using punctuation in the middle but continuing in lower case and
> not ending with punctuation 🤪

Good point, fixed for v2.

> 
>> +            self.settings.collection_interval_or_default()
>> +        );
>> +
>> +        loop {
>> +            let old_settings = self.settings.clone();
>> +            tokio::select! {
>> +                _ = timer.tick() => {
>> +                    // Reload settings in case they have changed in the meanwhile
>> +                    self.settings = Self::get_settings_or_default();
>> +
>> +                    log::debug!("starting metric collection from all remotes - triggered by timer");
> 
> Not sure if it's worth moving this into a `self.on_tick()` but...
> 

Yeah, I think that makes sense, thanks.
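Probably something along these lines, i.e. just moving the body of the timer branch into
a method (name not final):

    async fn on_tick(&mut self) {
        log::debug!("starting metric collection from all remotes - triggered by timer");

        self.sleep_for_random_interval_offset().await;

        if let Some(remotes) = Self::load_remote_config() {
            let to_fetch = remotes.order.as_slice();
            self.fetch_remotes(&remotes, to_fetch).await;
        }
    }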

>> +                    self.sleep_for_random_interval_offset().await;
>> +
>> +                    if let Some(remotes) = Self::load_remote_config() {
>> +                        let to_fetch = remotes.order.as_slice();
>> +                        self.fetch_remotes(&remotes, to_fetch).await;
>> +                    }
>> +                }
>> +
>> +                val = self.control_message_rx.recv() => {
>> +                    // Reload settings in case they have changed in the meanwhile
>> +                    self.settings = Self::get_settings_or_default();
>> +                    match val {
> 
> ...but I think this part should be factored out into a
> `self.handle_control_message(msg).await`
> 
> It gets indented quite deeply and it just makes sense IMO :)

Same :)

> 
>> +                        Some(ControlMsg::CollectSingleRemote(remote)) => {
>> +                            if let Some(remotes) = Self::load_remote_config() {
> 
> ^ Also this...
> 
>> +                                log::debug!("starting metric collection for remote '{remote}'- triggered by control message");
>> +                                self.fetch_remotes(&remotes, &[remote]).await;
>> +                            }
>> +                        }
>> +                        Some(ControlMsg::CollectAllRemotes) => {
>> +                            if let Some(remotes) = Self::load_remote_config() {
> 
> ... is the same as this, so if this is factored out into a separate
> function, it could just early-out via a let-else there, as we don't need
> to call it when `val` is `None`, and then it's quite compact.

Ack - I refactored it in a separate new commit at the end of the v2 series; it wasn't
worth the effort to cleanly fold this into this commit.
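For reference, the factored-out handler ends up roughly like this (assuming ControlMsg
only has these two variants for now; the caller only invokes it for a received message):

    async fn handle_control_message(&mut self, message: ControlMsg) {
        // early-out if the remote config could not be loaded
        let Some(remotes) = Self::load_remote_config() else {
            return;
        };

        match message {
            ControlMsg::CollectSingleRemote(remote) => {
                log::debug!("starting metric collection for remote '{remote}' - triggered by control message");
                self.fetch_remotes(&remotes, &[remote]).await;
            }
            ControlMsg::CollectAllRemotes => {
                log::debug!("starting metric collection from all remotes - triggered by control message");
                self.fetch_remotes(&remotes, &remotes.order).await;
            }
        }
    }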

> 
>> +                                log::debug!("starting metric collection from all remotes - triggered by control message");
>> +                                self.fetch_remotes(&remotes, &remotes.order).await;
>> +                            }
>> +                        }
>> +                        _ => {},
>> +                    }
>> +                }
>> +            }
>> +
>> +            let interval = self.settings.collection_interval_or_default();
>> +
>> +            if old_settings.collection_interval_or_default() != interval {
>> +                log::info!(
>> +                    "metric collection interval changed to {} seconds, reloading timer",
>> +                    interval
>> +                );
>> +                timer = Self::setup_timer(interval);
>> +            }
>> +        }
>> +    }
>> +
>> +    async fn sleep_for_random_interval_offset(&self) {
>> +        let mut min = self.settings.min_interval_offset_or_default();
>> +        let max = self.settings.max_interval_offset_or_default();
>> +
>> +        if min > max {
>> +            log::warn!(
>> +                "min-interval-offset is larger than max-interval-offset ({min} > {max}) - \
>> +                capping it to max-interval-offset ({max})"
>> +            );
>> +            min = max;
>> +        }
>> +
>> +        let jitter = {
>> +            let mut rng = rand::thread_rng();
>> +            rng.gen_range(min..=max)
>> +        };
>> +        tokio::time::sleep(Duration::from_secs(jitter)).await;
>> +    }
>> +
> 
> ↑ and ↓ are crying out for a common fn with the variable name, min and
> max as parameters ;-)

I think the main thing that deterred me from doing that was that one function sleeps
for milliseconds and the other one for seconds. That was also before I added the 'min'
capping logic with the log message, which only went in shortly before sending the patches;
before that, the code dedup wasn't really worth it. I've changed it in v2, where we now have a
    async fn sleep_for_random_millis(param_base: &str, mut min: u64, max: u64)
function, with the call sites adapted.
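Roughly like this (the exact log wording might differ a bit from what ended up in v2):

    async fn sleep_for_random_millis(param_base: &str, mut min: u64, max: u64) {
        if min > max {
            log::warn!(
                "min-{param_base} is larger than max-{param_base} ({min} > {max}) - \
                capping it to max-{param_base} ({max})"
            );
            min = max;
        }

        let jitter = {
            let mut rng = rand::thread_rng();
            rng.gen_range(min..=max)
        };

        tokio::time::sleep(Duration::from_millis(jitter)).await;
    }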

Thanks!

> 
>> +    async fn sleep_for_random_connection_delay(settings: &CollectionSettings) {
>> +        let mut min = settings.min_connection_delay_or_default();
>> +        let max = settings.max_connection_delay_or_default();
>> +
>> +        if min > max {
>> +            log::warn!(
>> +                "min-collection-delay is larger than max-collection-delay ({min} > {max}) - \
>> +                capping it to max-collection-delay ({max})"
>> +            );
>> +            min = max;
>> +        }
>> +
>> +        let jitter = {
>> +            let mut rng = rand::thread_rng();
>> +            rng.gen_range(min..=max)
>> +        };
>> +
>> +        tokio::time::sleep(Duration::from_millis(jitter)).await;
>> +    }
>> +
>> +    fn get_settings_or_default() -> CollectionSettings {
>> +        // This function is a bit odd, but not sure if there is a much nicer
>> +        // way to do it. We want to fall back to defaults if
>> +        //   - the config file does not exist (no errors logged)
>> +        //   - if section type is wrong or the config failed to parse (log errors)
> 
> It gets a bit shorter if the `_impl` returns a result+option.
> The match below can just `return settings` in the first case, and have
> an `Ok(None) => (),`, the `Err` only has the log line and the default is
> moved to after the match.
> 
> Alternatively a `default()` helper could also shorten it
> (`Ok(default())` in the `_impl` and
> `get_settings_impl().unwrap_or_else(|err| { log(err); default() })` is
> also shorter :-)

Went for something like this for v2:

        get_settings_impl().unwrap_or_else(|err| {
            log::error!("... {err} ");
            CollectionSettings::new("default")
        })
 

>> +
>> +    /// Fetch metric data from a provided list of remotes concurrently.
>> +    /// The maximum number of concurrent connections is determined by
>> +    /// `max_concurrent_connections` in the [`CollectionSettings`]
>> +    /// instance in `self`.
>> +    async fn fetch_remotes(
>> +        &mut self,
>> +        config: &SectionConfigData<Remote>,
>> +        remotes_to_fetch: &[String],
>> +    ) {
>> +        let semaphore = Arc::new(Semaphore::new(
>> +            self.settings.max_concurrent_connections_or_default(),
>> +        ));
>> +        let mut handles = Vec::new();
> 
> Not sure how "robust" the spawning is wrt. ordering - and this doesn't
> need to happen in this patch (unless it's easy), but a follow-up could
> probably change this into a tokio `JoinSet`, that way the await-loop
> below can use `.join_next()`, so that for example if an early task
> happens to end up finishing last, the loop still collects the remaining
> logs in a more "temporally accurate" way...

Good point, I didn't know about `JoinSet`.

I'll do that in a follow-up or as a separate commit in v2. It requires a couple of changes
to how we do logging/error handling, since we can no longer track the remotes' names via
the handle Vec if we switch to a `JoinSet`.
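One option for the follow-up is to have each spawned task return the remote name alongside
its result, so log messages can still be attributed after `join_next()`. A rough sketch,
with a hypothetical fetch_single_remote standing in for the actual per-remote work:

    let mut set = tokio::task::JoinSet::new();

    for remote_name in remotes_to_fetch {
        // unwrap is okay here, acquire_* only fails if the semaphore was closed
        let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
        let name = remote_name.clone();

        set.spawn(async move {
            let _permit = permit;
            let result = fetch_single_remote(&name).await;
            // return the remote name together with the result so we can
            // still attribute log/error messages after join_next()
            (name, result)
        });
    }

    while let Some(joined) = set.join_next().await {
        match joined {
            Ok((name, Ok(()))) => log::debug!("finished metric collection for remote '{name}'"),
            Ok((name, Err(err))) => log::error!("failed to collect metrics for remote '{name}': {err}"),
            Err(err) => log::error!("metric collection task panicked: {err}"),
        }
    }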

> 
>> +
>> +        for remote_name in remotes_to_fetch {
>> +            let start_time = *self.most_recent_timestamps.get(remote_name).unwrap_or(&0);
>> +
>> +            // unwrap is okay here, acquire_* will only fail if `close` has been
>> +            // called on the semaphore.
>> +            let permit = semaphore.clone().acquire_owned().await.unwrap();
> 
> ^ Nit: where possible, I prefer `Arc::clone(&ptr)` just for the sake of
> not accidentally `.clone()`ing something large after later a refactor
> where the clones aren't in the patch context lines.

Thanks, I also usually prefer Arc::clone; I just missed it this time. Fixed for v2.
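For completeness, the adapted line then reads something like:

    let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();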


>>  
>>      tokio::spawn(async move {
>> -        let task_scheduler = pin!(metric_collection_task(tx));
>> +        let metric_collection_task_future = pin!(async move {
>> +            let mut task =
>> +                MetricCollectionTask::new(metric_data_tx, trigger_collection_rx).unwrap();
>> +            task.run().await;
> 
> ^ nit: could drop the let binding (then it IMO linewraps a bit nicer,
> too)

While attempting to fix this, I spotted the .unwrap() that I had left in there and
decided to change it to

            match MetricCollectionTask::new(metric_data_tx, trigger_collection_rx) {
                Ok(mut task) => task.run().await,
                Err(err) => log::error!("could not start metric collection task: {err}"),
            }

in v2, which gets rid of the unwrap and adds a bit more context to any potential error.


>> +/// Schedule metric collection for a given remote as soon as possible.
>> +///
>> +/// Has no effect if the tx end of the channel has not been initialized yet.
>> +/// Returns an error if the mpsc channel has been closed already.
>> +pub async fn trigger_metric_collection_for_remote(remote: &str) -> Result<(), Error> {
>> +    if let Some(sender) = CONTROL_MESSAGE_TX.get() {
> 
> ^ nit: This is the normal case (given that `CONTROL_MESSAGE_TX` gets
> initialized in `stark_task()` on startup, so IMO the function signature
> could already be `String` to allow the caller to move it (or `impl
> Into<String>` for this to be implicit) - both callers could currently
> move it ;-)
> 

Thanks, fixed for v2 :)
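With the `impl Into<String>` variant, the function now looks roughly like this (assuming
Error is anyhow::Error here, with anyhow's format_err in scope):

    pub async fn trigger_metric_collection_for_remote(remote: impl Into<String>) -> Result<(), Error> {
        if let Some(sender) = CONTROL_MESSAGE_TX.get() {
            sender
                .send(ControlMsg::CollectSingleRemote(remote.into()))
                .await
                .map_err(|_| format_err!("control message channel was closed"))?;
        }

        Ok(())
    }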


-- 
- Lukas




