[pdm-devel] [PATCH proxmox-datacenter-manager 5/8] remote tasks: add background task for task polling, use new task cache
Max Carrara
m.carrara at proxmox.com
Fri Mar 21 14:39:11 CET 2025
On Thu Mar 20, 2025 at 6:39 PM CET, Thomas Lamprecht wrote:
> Am 14.03.25 um 15:12 schrieb Lukas Wagner:
> > This commit changes the remote task module as follows:
> >
> > - Add a new background task for regular polling of task data
> > Instead of triggering fetching of task data from the `get_tasks` function,
> > which is usually called by an API handler, we move the fetching to a
> > new background task. The task fetches the latest tasks from all remotes
> > and stores them in the task cache at regular intervals (10 minutes).
> > The `get_tasks` function itself only reads from the cache.
> > The main rationale for this change is that for large setups, fetching
> > tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
> > each with a >100ms connection - adds up to minutes quickly).
> > If we do this from within `get_tasks`, the API handler calling the
> > function is also blocked for the entire time.
> > The `get_tasks` API is called every couple of seconds by the UI to get
> > a list of running remote tasks, so this *must* be quick.
> >
> > - Tracked tasks are also polled in the same background task, but with
> > a short polling delay (10 seconds). Instead of polling the status of a specific
> > tracked task's UPID, we simply fetch *all* tasks since the tracked task started.
> > While this increases the amount of transmitted data a bit for tracked tasks
> > that run for a very long time, this new approach makes the whole
> > task tracking functionality much more elegant; it integrates better with
> > the 'regular' task fetching, which happens at long intervals.
> >
> > - Tasks are now stored in the new improved task cache implementation.
> > This should make retrieving tasks much quicker and avoids
> > unneeded disk IO.
>
> The higher-level description reads well to me and I plan to take a closer
> look, but while applying this – with a trivial merge conflict in the use
> statements resolved due to Wolfgang's client series being applied earlier –
> some smaller things stuck out a bit to me when skimming git log; a few
> are rather in the bike-shed area though, see below.
>
> >
> > Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> > Reviewed-by: Max Carrara <m.carrara at proxmox.com>
>
> While I certainly encourage working together, I'm not really a huge fan
> of blind review (tags) where one cannot see any review comments at all;
> it's a loss of information and can also result in some increased overhead.
Fair point! In that case, let me provide some more context here; it's
nothing really that major:
I mostly pointed out cases where `const`s would be beneficial instead of
having a magic number floating around and also suggested the use of a
type alias for unix timestamps; the latter was out of scope for this
series though (and mostly bikeshedding anyway). I also pointed out a typo
here and there in docstrings and suggested breaking up the original
`TaskCache::add_tasks` method (see patch 04; the version you see there is
the slimmed-down one).
That's pretty much it; everything else seemed quite fine to me.
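For reference, the type alias I had in mind was nothing fancy, roughly
along these lines (name and placement are just placeholders, nothing this
series would need to adopt):

    /// A UNIX timestamp in seconds since the epoch.
    pub type UnixEpochSecs = i64;

It would mostly serve as a bit of documentation for parameters like the
`since` argument of `fetch_tasks` below.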
>
> > ---
> > server/src/api/pve/lxc.rs | 10 +-
> > server/src/api/pve/mod.rs | 4 +-
> > server/src/api/pve/qemu.rs | 6 +-
> > server/src/api/remote_tasks.rs | 11 +-
> > server/src/bin/proxmox-datacenter-api.rs | 3 +-
> > server/src/remote_tasks/mod.rs | 788 +++++++++++------------
> > 6 files changed, 388 insertions(+), 434 deletions(-)
> >
>
> > diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
> > index 2062f2b7..48d54694 100644
> > --- a/server/src/remote_tasks/mod.rs
> > +++ b/server/src/remote_tasks/mod.rs
> > @@ -1,65 +1,106 @@
> > -use std::{
> > - collections::{HashMap, HashSet},
> > - fs::File,
> > - path::{Path, PathBuf},
> > - sync::{LazyLock, RwLock},
> > - time::Duration,
> > -};
> > +use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
> >
> > -use anyhow::Error;
> > +use anyhow::{format_err, Error};
> > +use nix::sys::stat::Mode;
> > use pdm_api_types::{
> > remotes::{Remote, RemoteType},
> > RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
> > };
> > use proxmox_sys::fs::CreateOptions;
> > use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
> > -use serde::{Deserialize, Serialize};
> > -use tokio::task::JoinHandle;
> > +use task_cache::{AddTasks, GetTasks, State, TaskCache, TaskCacheItem};
> > +use tokio::{sync::Semaphore, task::JoinSet};
> >
> > use crate::{api::pve, task_utils};
> >
> > mod task_cache;
> >
> > +const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
> > +
> > +const SECONDS_PER_MINUTE: u64 = 60;
> > +const MINUTES_PER_HOUR: u64 = 60;
> While technically correct, this reads like a rate and IMO makes it a bit
> harder to tell what is meant in the calculations below.
>
> I'd either use Duration from std, e.g. Duration::from_mins(10).as_secs()
> and Duration::from_hours(1).as_secs() respectively for REGULAR_REFRESH_S and
> CHECK_ROTATE_S below, or just 60 and 3600; those values are pretty much general
> knowledge, which might be part of the confusion potential I see in your
> variant, as I basically expect this to be something more elaborate, or
> why else would it go for this seemingly complex variant?
> That said, using what the std lib already provides is totally fine here.
>
> > +
> > +/// Tick rate for the remote task fetching task.
> > +/// This is also the rate at which we check on tracked tasks.
> > +const TICK_RATE_S: u64 = 10;
> > +
> > +/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
> > +/// task for this remote).
> > +const REGULAR_REFRESH_S: u64 = 10 * SECONDS_PER_MINUTE;
> IMO it has some tiny value to encode in the name whether it's a period or a
> rate, while I do not see much benefit in having "regular" in there (albeit
> I can see where you're coming from with using that in the name).
>
> const TASK_REFRESH_PERIOD_S: u64 = Duration::from_mins(10).as_secs();
>
> or
>
> const TASK_REFRESH_PERIOD_S: u64 = 10 * 60;
>
> or if I'm already bike shedding s/REFRESH/POLL/ to be slightly shorter:
>
> const TASK_POLL_PERIOD_S: u64 = 10 * 60;
>
>
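Agreed from my side as well, for what it's worth. Just to illustrate, the
block could end up looking roughly like this (names partly taken from your
suggestion, partly placeholders of mine; IIRC `Duration::from_mins()` is
still gated behind the unstable `duration_constructors` feature, so the
plain literals are probably the pragmatic choice):

    /// Tick rate for the remote task fetching task.
    /// This is also the rate at which we check on tracked tasks.
    const TICK_RATE_S: u64 = 10;

    /// Interval in seconds at which to fetch the newest tasks from remotes
    /// (if there is no tracked task for the remote).
    const TASK_POLL_PERIOD_S: u64 = 10 * 60;
    /// Number of cycles between two full polls.
    const TASK_POLL_CYCLES: u64 = TASK_POLL_PERIOD_S / TICK_RATE_S;

    /// Check once every hour whether the task archive should be rotated.
    const ROTATE_CHECK_PERIOD_S: u64 = 60 * 60;
    /// Number of cycles between two rotation checks.
    const ROTATE_CHECK_CYCLES: u64 = ROTATE_CHECK_PERIOD_S / TICK_RATE_S;

    /// Rotate once the most recent archive file is at least one day old.
    const ROTATE_AFTER_S: u64 = 24 * 60 * 60;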
> > +/// Number of cycles until a regular refresh.
> > +const REGULAR_REFRESH_CYCLES: u64 = REGULAR_REFRESH_S / TICK_RATE_S;
> > +
> > +/// Check if we want to rotate once every hour.
> > +const CHECK_ROTATE_S: u64 = SECONDS_PER_MINUTE * MINUTES_PER_HOUR;
> > +/// Number of cycles before we want to check if we should rotate the task archives.
> > +const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_S / TICK_RATE_S;
> > +
> > +/// Rotate once the most recent archive file is at least 24 hour old.
> > +const ROTATE_AFTER_S: u64 = 24 * MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
> Why no HOURS_PER_DAY? ;P
>
> > +
> > +/// Keep 7 days worth of tasks.
> > +const KEEP_OLD_FILES: u64 = 7;
> > +
> > +/// Maximum number of concurrent connections per remote.
> > +const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
> > +/// Maximum number of total concurrent connections. `CONNECTIONS_PER_PVE_REMOTE` is taken into
> > +/// consideration when accounting for the total number of connections.
> > +/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_PVE_REMOTE` is 5, we can connect
> > +/// to 4 PVE remotes in parallel.
> > +const MAX_CONNECTIONS: usize = 20;
> > +
> > +/// Maximum number of tasks to fetch from a single remote in one API call.
> > +const MAX_TASKS_TO_FETCH: u64 = 5000;
> > +
>
> > +
> > +/// Task which handles fetching remote tasks and task archive rotation.
> > +/// This function never returns.
> > +async fn remote_task_fetching_task() -> ! {
> > + let mut cycle = 0u64;
> > + let mut interval = tokio::time::interval(Duration::from_secs(TICK_RATE_S));
> > + interval.reset_at(task_utils::next_aligned_instant(TICK_RATE_S).into());
> > +
> > + // We don't really care about catching up on missed ticks, we just want
> > + // a steady tick rate.
> > + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
> > +
> > + if let Err(err) = init_cache().await {
> > + log::error!("error when initializing task cache: {err}");
> > + }
> > +
> > + loop {
> > + interval.tick().await;
> > + if let Err(err) = do_tick(cycle).await {
> > + log::error!("error when fetching remote tasks: {err}");
> > + }
> > +
> > + // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
> > + // better be safe and use .wrapping_add(1) :)
> > + cycle = cycle.wrapping_add(1);
> > + }
> > +}
> > +
> > +/// Initialize the remote task cache with initial archive files, in case there are not
> > +/// any archive files yet.
> > +///
> > +/// Creates `KEEP_OLD_FILES` archive files, with each archive file's cut-off time
> > +/// spaced `ROTATE_AFTER_S` seconds apart.
> > +/// This allows us to immediately backfill remote task history when setting up a new PDM instance
> > +/// without any prior task archive rotation.
> > +async fn init_cache() -> Result<(), Error> {
> > + tokio::task::spawn_blocking(|| {
> > + let cache = get_cache()?;
> > + cache.init(proxmox_time::epoch_i64(), KEEP_OLD_FILES, ROTATE_AFTER_S)?;
> > + Ok(())
> > + })
> > + .await?
> > +}
> > +
> > +/// Handle a single timer tick.
> > +/// Will handle archive file rotation, polling of tracked tasks, and fetching of remote tasks.
> > +async fn do_tick(cycle: u64) -> Result<(), Error> {
> > + let cache = get_cache()?;
> > +
> > + if should_check_for_cache_rotation(cycle) {
> > + log::debug!("checking if remote task archive should be rotated");
> > + if rotate_cache(cache.clone()).await? {
> > + log::info!("rotated remote task archive");
> > + }
> > + }
> > +
> > + let state = cache.read_state();
> > +
> > + let mut all_tasks = HashMap::new();
> > +
> > + let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
>
> Talking names would be nice; we have a rather generically named semaphore
> variable twice in this module, maybe use (respectively):
>
> total_connection_semaphore or total_connection_limit
> per_remote_semaphore or per_remote_limit
>
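Something like that would indeed read a bit nicer; sketch only, with the
names taken from your suggestion:

    // Limits the total number of concurrent outgoing connections.
    let total_connection_limit = Arc::new(Semaphore::new(MAX_CONNECTIONS));

and correspondingly in `fetch_tasks()`:

    // Limits the number of concurrent connections per PVE remote.
    let per_remote_limit = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));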
> > + let mut join_set = JoinSet::new();
> > +
> > + let remotes = remotes_to_check(cycle, &state).await?;
> > + for remote in remotes {
> > + let since = get_cutoff_timestamp(&remote, &state);
> > +
> > + let permit = if remote.ty == RemoteType::Pve {
> > + // Acquire multiple permits; for PVE remotes we want
> > + // to connect to multiple nodes in parallel.
> > + //
> > + // `.unwrap()` is safe, we never close the semaphore.
> > + Arc::clone(&semaphore)
> > + .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
> > + .await
> > + .unwrap()
> > + } else {
> > + // For PBS remotes we only have a single outgoing connection
> > + //
> > + // `.unwrap()` is safe, we never close the semaphore.
> > + Arc::clone(&semaphore).acquire_owned().await.unwrap()
> > + };
> > +
> > + join_set.spawn(async move {
> > + log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
> > + let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
> > + format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
> > + });
> > +
> > + drop(permit);
> > + tasks
> > + });
> > + }
> > +
> > + while let Some(res) = join_set.join_next().await {
> > + match res {
> > + Ok(Ok((remote, request))) => {
> > + all_tasks.insert(remote, request);
> > + }
> > + Ok(Err(err)) => log::error!("{err}"),
> > + Err(err) => log::error!("could not join task fetching future: {err}"),
> > + }
> > + }
> > +
> > + if !all_tasks.is_empty() {
> > + save_tasks(cache, all_tasks).await?;
> > + }
> > +
> > + Ok(())
> > +}
> > +
> > +/// Return list of remotes that are to be polled in this cycle.
>
> some higher level description of what this means and what the rationale is
> would be nice here.
>
> > +async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
> > + let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
> > +
> > + let all = cycle % REGULAR_REFRESH_CYCLES == 0;
> > +
> > + if all {
> > + Ok(config.sections.into_values().collect())
> > + } else {
> > + Ok(config
> > + .sections
> > + .into_iter()
> > + .filter_map(|(name, remote)| {
> > + if let Some(tracked) = state.tracked_tasks.get(&name) {
> > + if !tracked.is_empty() {
> > + Some(remote)
> > + } else {
> > + None
> > + }
> > + } else {
> > + None
> > + }
> > + })
> > + .collect())
> > + }
> > +}
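Perhaps something along these lines, just as a rough sketch (the doc
comment is mine, the body is the one from above, only condensed a little):

    /// Return the list of remotes that should be polled in this cycle.
    ///
    /// Every `REGULAR_REFRESH_CYCLES` ticks, all configured remotes are
    /// returned, so that new tasks are fetched at the regular interval.
    /// On the cycles in between, only remotes with currently tracked tasks
    /// are returned, so that tracked tasks are picked up at the short
    /// `TICK_RATE_S` interval without having to poll every remote on every
    /// tick.
    async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
        let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;

        if cycle % REGULAR_REFRESH_CYCLES == 0 {
            Ok(config.sections.into_values().collect())
        } else {
            Ok(config
                .sections
                .into_iter()
                .filter_map(|(name, remote)| {
                    state
                        .tracked_tasks
                        .get(&name)
                        .filter(|tracked| !tracked.is_empty())
                        .map(|_| remote)
                })
                .collect())
        }
    }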
>
> > /// Fetch tasks (active and finished) from a remote
> > -async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
> > +/// `since` is a UNIX timestamp (seconds).
> > +async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
> > let mut tasks = Vec::new();
> >
> > + let mut all_successful = true;
> > +
> > match remote.ty {
> > RemoteType::Pve => {
> > + let semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
> > + let mut join_set = JoinSet::new();
> > +
> > let client = pve::connect(remote)?;
> >
> > // N+1 requests - we could use /cluster/tasks, but that one
>
> btw. as I see it from the context, re-evaluating the limit of the broadcasted
> tasks would definitely be an option too; on remotes with more than a handful
> of nodes that might be quite a bit cheaper, and it has some benefits outside of the
> PDM use case. At least some quick experimentation on the actual impact of an
> increase there would be warranted, albeit out of scope for this series, as checking
> separately is already the status quo.
>
> > @@ -134,16 +377,53 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
> > let params = ListTasks {
> > // Include running tasks
> > source: Some(ListTasksSource::All),
> > - // TODO: How much task history do we want? Right now we just hard-code it
> > - // to 7 days.
> > - since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
> > + since: Some(since),
> > + // If `limit` is not provided, we only receive 50 tasks
> > + limit: Some(MAX_TASKS_TO_FETCH),
> > ..Default::default()
> > };
> >
> > - let list = client.get_task_list(&node.node, params).await?;
> > - let mapped = map_tasks(list, &remote.id)?;
> > + let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
> >
> > - tasks.extend(mapped);
> > + let r = remote.clone();
> > +
> > + join_set.spawn(async move {
> > + let client = pve::connect(&r)?;
> > + let task_list =
> > + client
> > + .get_task_list(&node.node, params)
> > + .await
> > + .map_err(|err| {
> > + format_err!("remote '{}', node '{}': {err}", r.id, node.node)
> > + })?;
> > +
> > + drop(permit);
> > +
> > + Ok::<Vec<_>, Error>(task_list)
> > + });
> > + }
> > +
> > + while let Some(res) = join_set.join_next().await {
> > + match res {
> > + Ok(Ok(list)) => {
> > + let mapped = list.into_iter().filter_map(|task| {
> > + match map_pve_task(task, &remote.id) {
> > + Ok(task) => Some(task),
> > + Err(err) => {
> > + log::error!("could not map task data, skipping: {err}");
> > + None
> > + }
> > + }
> > + });
> > +
> > + tasks.extend(mapped);
> > + }
> > + Ok(Err(err)) => {
> > + all_successful = false;
> > + log::error!("could not fetch tasks: {err:?}");
> > + }
> > + Err(err) => return Err(err.into()),
> > + }
> > }
> > }
> > RemoteType::Pbs => {
>
> > +/// Check if we are due for checking for cache rotation.
> > +fn should_check_for_cache_rotation(cycle: u64) -> bool {
> > + cycle % CHECK_ROTATE_CYCLES == 0
>
> This basically depends on the fn getting called initially with cycle
> still being zero, as otherwise we would never rotate if a PDM gets
> restarted/rebooted daily or more often, which can be a possibility for
> use cases where one just has the PDM running during their worktime,
> which is seldom longer than a day.
>
> I.e., it works fine as is, but I'd like to see that invariant stated
> a bit more explicitly somewhere.
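One way to spell that invariant out would be directly on the function, for
example (sketch only):

    /// Check if we are due for checking for cache rotation.
    ///
    /// Note: this relies on the fetching task starting at `cycle == 0`, so
    /// the very first tick after startup always performs a rotation check;
    /// otherwise a PDM instance that is restarted more often than once per
    /// `CHECK_ROTATE_S` would never rotate its task archive.
    fn should_check_for_cache_rotation(cycle: u64) -> bool {
        cycle % CHECK_ROTATE_CYCLES == 0
    }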
>
>
> _______________________________________________
> pdm-devel mailing list
> pdm-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel