[pdm-devel] [PATCH proxmox-datacenter-manager 5/8] remote tasks: add background task for task polling, use new task cache
Thomas Lamprecht
t.lamprecht at proxmox.com
Thu Mar 20 18:39:27 CET 2025
Am 14.03.25 um 15:12 schrieb Lukas Wagner:
> This commits changes the remote task module as follows:
>
> - Add a new background task for regular polling of task data
> Instead of triggering fetching of task data from the `get_tasks` function,
> which is usually called by an API handler, we move the fetching to a
> new background task. The task fetches the latest tasks from all remotes
> and stores them in the task cache in regular intervals (10 minutes).
> The `get_tasks` function itself only reads from the cache.
> The main rationale for this change is that for large setups, fetching
> tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
> each with a >100ms connection - adds up to minutes quickly).
> If we do this from within `get_tasks`, the API handler calling the
> function is also blocked for the entire time.
> The `get_tasks` API is called every couple of seconds by the UI the get
> a list of running remote tasks, so this *must* be quick.
>
> - Tracked tasks are also polled in the same background task, but with
> a short polling delay (10 seconds). Instead of polling the status specific
> tracked task UPID, we simply fetch *all* tasks since the tracked task started.
> While this increased the amount of transmitted data a bit for tracked tasks
> that run for a very long time, this new approach make the whole
> task tracking functionality much more elegant; it integrates better with
> the 'regular' task fetching which happens in long intervals.
>
> - Tasks are now stored in the new improved task cache implementation.
> This should make retrieving tasks much quicker and avoids
> unneeded disk IO.
The higher level description reads good to me and I plan to take a closer
look, but while applying this – with a trivial merge conflict in use
statements resolved due to Wolfgang's client series being applied earlier –
some smaller things stuck a bit out to me when skimming git log, a few
are rather in the bike shed area though, see below.
>
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> Reviewed-by: Max Carrara <m.carrara at proxmox.com>
While I certainly encourage working together I'm not a really huge fan
of blind review (tags) where one cannot see any review comments at all,
it's a loss in information and can also result in some increased overhead.
> ---
> server/src/api/pve/lxc.rs | 10 +-
> server/src/api/pve/mod.rs | 4 +-
> server/src/api/pve/qemu.rs | 6 +-
> server/src/api/remote_tasks.rs | 11 +-
> server/src/bin/proxmox-datacenter-api.rs | 3 +-
> server/src/remote_tasks/mod.rs | 788 +++++++++++------------
> 6 files changed, 388 insertions(+), 434 deletions(-)
>
> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
> index 2062f2b7..48d54694 100644
> --- a/server/src/remote_tasks/mod.rs
> +++ b/server/src/remote_tasks/mod.rs
> @@ -1,65 +1,106 @@
> -use std::{
> - collections::{HashMap, HashSet},
> - fs::File,
> - path::{Path, PathBuf},
> - sync::{LazyLock, RwLock},
> - time::Duration,
> -};
> +use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
>
> -use anyhow::Error;
> +use anyhow::{format_err, Error};
> +use nix::sys::stat::Mode;
> use pdm_api_types::{
> remotes::{Remote, RemoteType},
> RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
> };
> use proxmox_sys::fs::CreateOptions;
> use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
> -use serde::{Deserialize, Serialize};
> -use tokio::task::JoinHandle;
> +use task_cache::{AddTasks, GetTasks, State, TaskCache, TaskCacheItem};
> +use tokio::{sync::Semaphore, task::JoinSet};
>
> use crate::{api::pve, task_utils};
>
> mod task_cache;
>
> +const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
> +
> +const SECONDS_PER_MINUTE: u64 = 60;
> +const MINUTES_PER_HOUR: u64 = 60;
While technically correct this reads like a rate and makes it IMO a bit
harder to tell what is meant in below calculations.
I'd either use Duration from std, like e.g. Duration::from_mins(10).as_secs()
and Duration::from_hours(1).as_secs() respectively for REGULAR_REFRESH_S and
CHECK_ROTATE_S below or just 60 and 3600, those values are pretty much general
knowledge, which might be a part of the confusion potential I see in your
variant, as I basically expect that this has to be something more elaborate or
why would it go for this seemingly complex variant.
That said, using what the std lib already provides is totally fine here.
> +
> +/// Tick rate for the remote task fetching task.
> +/// This is also the rate at which we check on tracked tasks.
> +const TICK_RATE_S: u64 = 10;
> +
> +/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
> +/// task for this remote).
> +const REGULAR_REFRESH_S: u64 = 10 * SECONDS_PER_MINUTE;
IMO it has some tiny value to encode if it's a period or a rate in the
name, while I do not see much benefit in having "regular" in there (albeit
I can see where you come from to use that in the name).
const TASK_REFRESH_PERIOD_S: u64 = Duration::from_mins(10).as_secs();
or
const TASK_REFRESH_PERIOD_S: u64 = 60;
or if I'm already bike shedding s/REFRESH/POLL/ to be slightly shorter:
const TASK_POLL_PERIOD_S: u64 = 60;
> +/// Number of cycles until a regular refresh.
> +const REGULAR_REFRESH_CYCLES: u64 = REGULAR_REFRESH_S / TICK_RATE_S;
> +
> +/// Check if we want to rotate once every hour.
> +const CHECK_ROTATE_S: u64 = SECONDS_PER_MINUTE * MINUTES_PER_HOUR;
> +/// Number of cycles before we want to check if we should rotate the task archives.
> +const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_S / TICK_RATE_S;
> +
> +/// Rotate once the most recent archive file is at least 24 hour old.
> +const ROTATE_AFTER_S: u64 = 24 * MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
Why no HOURS_PER_DAY? ;P
> +
> +/// Keep 7 days worth of tasks.
> +const KEEP_OLD_FILES: u64 = 7;
> +
> +/// Maximum number of concurrent connections per remote.
> +const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
> +/// Maximum number of total concurrent connections. `CONNECTIONS_PER_REMOTE` is taken into
> +/// consideration when accounting for the total number of connections.
> +/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_REMOTE` is 5, we can connect
> +/// to 4 PVE remotes in parallel.
> +const MAX_CONNECTIONS: usize = 20;
> +
> +/// Maximum number of tasks to fetch from a single remote in one API call.
> +const MAX_TASKS_TO_FETCH: u64 = 5000;
> +
> +
> +/// Task which handles fetching remote tasks and task archive rotation.
> +/// This function never returns.
> +async fn remote_task_fetching_task() -> ! {
> + let mut cycle = 0u64;
> + let mut interval = tokio::time::interval(Duration::from_secs(TICK_RATE_S));
> + interval.reset_at(task_utils::next_aligned_instant(TICK_RATE_S).into());
> +
> + // We don't really care about catching up to missed tick, we just want
> + // a steady tick rate.
> + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
> +
> + if let Err(err) = init_cache().await {
> + log::error!("error when initialized task cache: {err}");
> + }
> +
> + loop {
> + interval.tick().await;
> + if let Err(err) = do_tick(cycle).await {
> + log::error!("error when fetching remote tasks: {err}");
> + }
> +
> + // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
> + // better be safe and use .wrapping_add(1) :)
> + cycle = cycle.wrapping_add(1);
> + }
> +}
> +
> +/// Initialize the remote task cache with initial archive files, in case there are not
> +/// any archive files yet.
> +///
> +/// Creates `KEEP_OLD_FILES` archive files, with each archive file's cut-off time
> +/// spaced `ROTATE_AFTER_S` seconds apart.
> +/// This allows us to immediately backfill remote task history when setting up a new PDM instance
> +/// without any prior task archive rotation.
> +async fn init_cache() -> Result<(), Error> {
> + tokio::task::spawn_blocking(|| {
> + let cache = get_cache()?;
> + cache.init(proxmox_time::epoch_i64(), KEEP_OLD_FILES, ROTATE_AFTER_S)?;
> + Ok(())
> + })
> + .await?
> +}
> +
> +/// Handle a single timer tick.
> +/// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks.
> +async fn do_tick(cycle: u64) -> Result<(), Error> {
> + let cache = get_cache()?;
> +
> + if should_check_for_cache_rotation(cycle) {
> + log::debug!("checking if remote task archive should be rotated");
> + if rotate_cache(cache.clone()).await? {
> + log::info!("rotated remote task archive");
> + }
> + }
> +
> + let state = cache.read_state();
> +
> + let mut all_tasks = HashMap::new();
> +
> + let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
Talking names would be nice, we got a rather generically named semaphore
variable twice in this module, maybe use (respectively):
total_connection_semaphore or total_connection_limit
per_remote_semaphore or per_remote_limit
> + let mut join_set = JoinSet::new();
> +
> + let remotes = remotes_to_check(cycle, &state).await?;
> + for remote in remotes {
> + let since = get_cutoff_timestamp(&remote, &state);
> +
> + let permit = if remote.ty == RemoteType::Pve {
> + // Acquire multiple permits, for PVE remotes we want
> + // to multiple nodes in parallel.
> + //
> + // `.unwrap()` is safe, we never close the semaphore.
> + Arc::clone(&semaphore)
> + .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
> + .await
> + .unwrap()
> + } else {
> + // For PBS remotes we only have a single outgoing connection
> + //
> + // `.unwrap()` is safe, we never close the semaphore.
> + Arc::clone(&semaphore).acquire_owned().await.unwrap()
> + };
> +
> + join_set.spawn(async move {
> + log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
> + let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
> + format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
> + });
> +
> + drop(permit);
> + tasks
> + });
> + }
> +
> + while let Some(res) = join_set.join_next().await {
> + match res {
> + Ok(Ok((remote, request))) => {
> + all_tasks.insert(remote, request);
> + }
> + Ok(Err(err)) => log::error!("{err}"),
> + Err(err) => log::error!("could not join task fetching future: {err}"),
> + }
> + }
> +
> + if !all_tasks.is_empty() {
> + save_tasks(cache, all_tasks).await?;
> + }
> +
> + Ok(())
> +}
> +
> +/// Return list of remotes that are to be polled in this cycle.
some higher level description of what this means and what the rationale is
would be nice here.
> +async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
> + let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
> +
> + let all = cycle % REGULAR_REFRESH_CYCLES == 0;
> +
> + if all {
> + Ok(config.sections.into_values().collect())
> + } else {
> + Ok(config
> + .sections
> + .into_iter()
> + .filter_map(|(name, remote)| {
> + if let Some(tracked) = state.tracked_tasks.get(&name) {
> + if !tracked.is_empty() {
> + Some(remote)
> + } else {
> + None
> + }
> + } else {
> + None
> + }
> + })
> + .collect())
> + }
> +}
> /// Fetch tasks (active and finished) from a remote
> -async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
> +/// `since` is a UNIX timestamp (seconds).
> +async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
> let mut tasks = Vec::new();
>
> + let mut all_successful = true;
> +
> match remote.ty {
> RemoteType::Pve => {
> + let semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
> + let mut join_set = JoinSet::new();
> +
> let client = pve::connect(remote)?;
>
> // N+1 requests - we could use /cluster/tasks, but that one
btw. as I see it from the context, re-evaluating the limit of the broadcasted
tasks would be definitively an option too, on remotes with more than a handful
of nodes that might be quite a bit cheaper and has some benefits outside of the
PDM use case. At least some quick experimentation about actual impact of an
increase there would be warranted, albeit out of scope of this series as checking
separately is already the status quo.
> @@ -134,16 +377,53 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
> let params = ListTasks {
> // Include running tasks
> source: Some(ListTasksSource::All),
> - // TODO: How much task history do we want? Right now we just hard-code it
> - // to 7 days.
> - since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
> + since: Some(since),
> + // If `limit` is not provided, we only receive 50 tasks
> + limit: Some(MAX_TASKS_TO_FETCH),
> ..Default::default()
> };
>
> - let list = client.get_task_list(&node.node, params).await?;
> - let mapped = map_tasks(list, &remote.id)?;
> + let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
>
> - tasks.extend(mapped);
> + let r = remote.clone();
> +
> + join_set.spawn(async move {
> + let client = pve::connect(&r)?;
> + let task_list =
> + client
> + .get_task_list(&node.node, params)
> + .await
> + .map_err(|err| {
> + format_err!("remote '{}', node '{}': {err}", r.id, node.node)
> + })?;
> +
> + drop(permit);
> +
> + Ok::<Vec<_>, Error>(task_list)
> + });
> + }
> +
> + while let Some(res) = join_set.join_next().await {
> + match res {
> + Ok(Ok(list)) => {
> + let mapped = list.into_iter().filter_map(|task| {
> + match map_pve_task(task, &remote.id) {
> + Ok(task) => Some(task),
> + Err(err) => {
> + log::error!("could not map task data, skipping: {err}");
> + None
> + }
> + }
> + });
> +
> + tasks.extend(mapped);
> + }
> + Ok(Err(err)) => {
> + all_successful = false;
> + log::error!("could not fetch tasks: {err:?}");
> + }
> + Err(err) => return Err(err.into()),
> + }
> }
> }
> RemoteType::Pbs => {
> +/// Check if we are due for checking for cache rotation.
> +fn should_check_for_cache_rotation(cycle: u64) -> bool {
> + cycle % CHECK_ROTATE_CYCLES == 0
This basically depends on the fn getting called initially with cycle
being still zereo, as otherewise we would never rotate if a PDM gets
restarted/rebooted daily or more often, which can be a possibility for
use cases where one just has the PDM running during their worktime,
which is seldomly longer than a day.
I.e., it works fine as is, but I'd like to see that invariant state
a bit explicitly somewhere.
More information about the pdm-devel
mailing list