[pdm-devel] [PATCH proxmox-datacenter-manager 5/8] remote tasks: add background task for task polling, use new task cache
Lukas Wagner
l.wagner at proxmox.com
Fri Mar 21 14:33:20 CET 2025
On 2025-03-20 18:39, Thomas Lamprecht wrote:
>> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
>> Reviewed-by: Max Carrara <m.carrara at proxmox.com>
>
> While I certainly encourage working together I'm not a really huge fan
> of blind review (tags) where one cannot see any review comments at all,
> it's a loss in information and can also result in some increased overhead.
As briefly stated in the cover letter, I asked Max for a quick review of my approach
before I posted this to the mailing list. Apart from some tiny code-style / doc improvements,
which I quickly folded into the existing commits, there was nothing major that
he brought up and hence he was okay with me including his R-b's right away.
In my opinion this reduced some friction and noise on the list.
That being said, I understand your position, because as of now it is unclear *what* has
been changed *why*.
I'll make sure that there is some evidence of the review on the list for cases like these
in the future.
>
>> ---
>> server/src/api/pve/lxc.rs | 10 +-
>> server/src/api/pve/mod.rs | 4 +-
>> server/src/api/pve/qemu.rs | 6 +-
>> server/src/api/remote_tasks.rs | 11 +-
>> server/src/bin/proxmox-datacenter-api.rs | 3 +-
>> server/src/remote_tasks/mod.rs | 788 +++++++++++------------
>> 6 files changed, 388 insertions(+), 434 deletions(-)
>>
>
>> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
>> index 2062f2b7..48d54694 100644
>> --- a/server/src/remote_tasks/mod.rs
>> +++ b/server/src/remote_tasks/mod.rs
>> @@ -1,65 +1,106 @@
>> -use std::{
>> - collections::{HashMap, HashSet},
>> - fs::File,
>> - path::{Path, PathBuf},
>> - sync::{LazyLock, RwLock},
>> - time::Duration,
>> -};
>> +use std::{collections::HashMap, path::Path, sync::Arc, time::Duration};
>>
>> -use anyhow::Error;
>> +use anyhow::{format_err, Error};
>> +use nix::sys::stat::Mode;
>> use pdm_api_types::{
>> remotes::{Remote, RemoteType},
>> RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
>> };
>> use proxmox_sys::fs::CreateOptions;
>> use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
>> -use serde::{Deserialize, Serialize};
>> -use tokio::task::JoinHandle;
>> +use task_cache::{AddTasks, GetTasks, State, TaskCache, TaskCacheItem};
>> +use tokio::{sync::Semaphore, task::JoinSet};
>>
>> use crate::{api::pve, task_utils};
>>
>> mod task_cache;
>>
>> +const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
>> +
>> +const SECONDS_PER_MINUTE: u64 = 60;
>> +const MINUTES_PER_HOUR: u64 = 60;
> While technically correct this reads like a rate and makes it IMO a bit
> harder to tell what is meant in below calculations.
>
> I'd either use Duration from std, like e.g. Duration::from_mins(10).as_secs()
> and Duration::from_hours(1).as_secs() respectively for REGULAR_REFRESH_S and
> CHECK_ROTATE_S below or just 60 and 3600, those values are pretty much general
> knowledge, which might be a part of the confusion potential I see in your
> variant, as I basically expect that this has to be something more elaborate or
> why would it go for this seemingly complex variant.
> That said, using what the std lib already provides is totally fine here.
>
Right, const-Rust is a bit of a blind-spot for me, I tend to forget about what you can do with it by now.
The `Duration::from_mins(...).as_secs() ` seems good, I'll change it accordingly.
>> +
>> +/// Tick rate for the remote task fetching task.
>> +/// This is also the rate at which we check on tracked tasks.
>> +const TICK_RATE_S: u64 = 10;
>> +
>> +/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
>> +/// task for this remote).
>> +const REGULAR_REFRESH_S: u64 = 10 * SECONDS_PER_MINUTE;
> IMO it has some tiny value to encode if it's a period or a rate in the
> name, while I do not see much benefit in having "regular" in there (albeit
> I can see where you come from to use that in the name).
>
> const TASK_REFRESH_PERIOD_S: u64 = Duration::from_mins(10).as_secs();
>
> or
>
> const TASK_REFRESH_PERIOD_S: u64 = 60;
>
> or if I'm already bike shedding s/REFRESH/POLL/ to be slightly shorter:
>
> const TASK_POLL_PERIOD_S: u64 = 60;
I think I'll use this one, thx!
>
>
>> +/// Number of cycles until a regular refresh.
>> +const REGULAR_REFRESH_CYCLES: u64 = REGULAR_REFRESH_S / TICK_RATE_S;
>> +
>> +/// Check if we want to rotate once every hour.
>> +const CHECK_ROTATE_S: u64 = SECONDS_PER_MINUTE * MINUTES_PER_HOUR;
>> +/// Number of cycles before we want to check if we should rotate the task archives.
>> +const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_S / TICK_RATE_S;
>> +
>> +/// Rotate once the most recent archive file is at least 24 hour old.
>> +const ROTATE_AFTER_S: u64 = 24 * MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
> Why no HOURS_PER_DAY? ;P
>
>> +
>> +/// Keep 7 days worth of tasks.
>> +const KEEP_OLD_FILES: u64 = 7;
>> +
>> +/// Maximum number of concurrent connections per remote.
>> +const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
>> +/// Maximum number of total concurrent connections. `CONNECTIONS_PER_REMOTE` is taken into
>> +/// consideration when accounting for the total number of connections.
>> +/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_REMOTE` is 5, we can connect
>> +/// to 4 PVE remotes in parallel.
>> +const MAX_CONNECTIONS: usize = 20;
>> +
>> +/// Maximum number of tasks to fetch from a single remote in one API call.
>> +const MAX_TASKS_TO_FETCH: u64 = 5000;
>> +
>
>> +
>> +/// Task which handles fetching remote tasks and task archive rotation.
>> +/// This function never returns.
>> +async fn remote_task_fetching_task() -> ! {
>> + let mut cycle = 0u64;
>> + let mut interval = tokio::time::interval(Duration::from_secs(TICK_RATE_S));
>> + interval.reset_at(task_utils::next_aligned_instant(TICK_RATE_S).into());
>> +
>> + // We don't really care about catching up to missed tick, we just want
>> + // a steady tick rate.
>> + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
>> +
>> + if let Err(err) = init_cache().await {
>> + log::error!("error when initialized task cache: {err}");
>> + }
>> +
>> + loop {
>> + interval.tick().await;
>> + if let Err(err) = do_tick(cycle).await {
>> + log::error!("error when fetching remote tasks: {err}");
>> + }
>> +
>> + // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
>> + // better be safe and use .wrapping_add(1) :)
>> + cycle = cycle.wrapping_add(1);
>> + }
>> +}
>> +
>> +/// Initialize the remote task cache with initial archive files, in case there are not
>> +/// any archive files yet.
>> +///
>> +/// Creates `KEEP_OLD_FILES` archive files, with each archive file's cut-off time
>> +/// spaced `ROTATE_AFTER_S` seconds apart.
>> +/// This allows us to immediately backfill remote task history when setting up a new PDM instance
>> +/// without any prior task archive rotation.
>> +async fn init_cache() -> Result<(), Error> {
>> + tokio::task::spawn_blocking(|| {
>> + let cache = get_cache()?;
>> + cache.init(proxmox_time::epoch_i64(), KEEP_OLD_FILES, ROTATE_AFTER_S)?;
>> + Ok(())
>> + })
>> + .await?
>> +}
>> +
>> +/// Handle a single timer tick.
>> +/// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks.
>> +async fn do_tick(cycle: u64) -> Result<(), Error> {
>> + let cache = get_cache()?;
>> +
>> + if should_check_for_cache_rotation(cycle) {
>> + log::debug!("checking if remote task archive should be rotated");
>> + if rotate_cache(cache.clone()).await? {
>> + log::info!("rotated remote task archive");
>> + }
>> + }
>> +
>> + let state = cache.read_state();
>> +
>> + let mut all_tasks = HashMap::new();
>> +
>> + let semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
>
> Talking names would be nice, we got a rather generically named semaphore
> variable twice in this module, maybe use (respectively):
>
> total_connection_semaphore or total_connection_limit
> per_remote_semaphore or per_remote_limit
Good point, I will do that!
>
>> + let mut join_set = JoinSet::new();
>> +
>> + let remotes = remotes_to_check(cycle, &state).await?;
>> + for remote in remotes {
>> + let since = get_cutoff_timestamp(&remote, &state);
>> +
>> + let permit = if remote.ty == RemoteType::Pve {
>> + // Acquire multiple permits, for PVE remotes we want
>> + // to multiple nodes in parallel.
>> + //
>> + // `.unwrap()` is safe, we never close the semaphore.
>> + Arc::clone(&semaphore)
>> + .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
>> + .await
>> + .unwrap()
>> + } else {
>> + // For PBS remotes we only have a single outgoing connection
>> + //
>> + // `.unwrap()` is safe, we never close the semaphore.
>> + Arc::clone(&semaphore).acquire_owned().await.unwrap()
>> + };
>> +
>> + join_set.spawn(async move {
>> + log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
>> + let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
>> + format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
>> + });
>> +
>> + drop(permit);
>> + tasks
>> + });
>> + }
>> +
>> + while let Some(res) = join_set.join_next().await {
>> + match res {
>> + Ok(Ok((remote, request))) => {
>> + all_tasks.insert(remote, request);
>> + }
>> + Ok(Err(err)) => log::error!("{err}"),
>> + Err(err) => log::error!("could not join task fetching future: {err}"),
>> + }
>> + }
>> +
>> + if !all_tasks.is_empty() {
>> + save_tasks(cache, all_tasks).await?;
>> + }
>> +
>> + Ok(())
>> +}
>> +
>> +/// Return list of remotes that are to be polled in this cycle.
>
> some higher level description of what this means and what the rationale is
> would be nice here.
Will do, thx!
>
>> +async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
>> + let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
>> +
>> + let all = cycle % REGULAR_REFRESH_CYCLES == 0;
>> +
>> + if all {
>> + Ok(config.sections.into_values().collect())
>> + } else {
>> + Ok(config
>> + .sections
>> + .into_iter()
>> + .filter_map(|(name, remote)| {
>> + if let Some(tracked) = state.tracked_tasks.get(&name) {
>> + if !tracked.is_empty() {
>> + Some(remote)
>> + } else {
>> + None
>> + }
>> + } else {
>> + None
>> + }
>> + })
>> + .collect())
>> + }
>> +}
>
>> /// Fetch tasks (active and finished) from a remote
>> -async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
>> +/// `since` is a UNIX timestamp (seconds).
>> +async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
>> let mut tasks = Vec::new();
>>
>> + let mut all_successful = true;
>> +
>> match remote.ty {
>> RemoteType::Pve => {
>> + let semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
>> + let mut join_set = JoinSet::new();
>> +
>> let client = pve::connect(remote)?;
>>
>> // N+1 requests - we could use /cluster/tasks, but that one
>
> btw. as I see it from the context, re-evaluating the limit of the broadcasted
> tasks would be definitively an option too, on remotes with more than a handful
> of nodes that might be quite a bit cheaper and has some benefits outside of the
> PDM use case. At least some quick experimentation about actual impact of an
> increase there would be warranted, albeit out of scope of this series as checking
> separately is already the status quo.
In addition to limited task history, the /cluster/tasks endpoint also does not support filtering (limit, since, etc.),
so we might have to modify that as well. But as you said, that's out of scope for this series
and can be improved later on.
>
>> @@ -134,16 +377,53 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
>> let params = ListTasks {
>> // Include running tasks
>> source: Some(ListTasksSource::All),
>> - // TODO: How much task history do we want? Right now we just hard-code it
>> - // to 7 days.
>> - since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
>> + since: Some(since),
>> + // If `limit` is not provided, we only receive 50 tasks
>> + limit: Some(MAX_TASKS_TO_FETCH),
>> ..Default::default()
>> };
>>
>> - let list = client.get_task_list(&node.node, params).await?;
>> - let mapped = map_tasks(list, &remote.id)?;
>> + let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
>>
>> - tasks.extend(mapped);
>> + let r = remote.clone();
>> +
>> + join_set.spawn(async move {
>> + let client = pve::connect(&r)?;
>> + let task_list =
>> + client
>> + .get_task_list(&node.node, params)
>> + .await
>> + .map_err(|err| {
>> + format_err!("remote '{}', node '{}': {err}", r.id, node.node)
>> + })?;
>> +
>> + drop(permit);
>> +
>> + Ok::<Vec<_>, Error>(task_list)
>> + });
>> + }
>> +
>> + while let Some(res) = join_set.join_next().await {
>> + match res {
>> + Ok(Ok(list)) => {
>> + let mapped = list.into_iter().filter_map(|task| {
>> + match map_pve_task(task, &remote.id) {
>> + Ok(task) => Some(task),
>> + Err(err) => {
>> + log::error!("could not map task data, skipping: {err}");
>> + None
>> + }
>> + }
>> + });
>> +
>> + tasks.extend(mapped);
>> + }
>> + Ok(Err(err)) => {
>> + all_successful = false;
>> + log::error!("could not fetch tasks: {err:?}");
>> + }
>> + Err(err) => return Err(err.into()),
>> + }
>> }
>> }
>> RemoteType::Pbs => {
>
>> +/// Check if we are due for checking for cache rotation.
>> +fn should_check_for_cache_rotation(cycle: u64) -> bool {
>> + cycle % CHECK_ROTATE_CYCLES == 0
>
> This basically depends on the fn getting called initially with cycle
> being still zereo, as otherewise we would never rotate if a PDM gets
> restarted/rebooted daily or more often, which can be a possibility for
> use cases where one just has the PDM running during their worktime,
> which is seldomly longer than a day.
>
> I.e., it works fine as is, but I'd like to see that invariant state
> a bit explicitly somewhere.
Ack!
--
- Lukas
More information about the pdm-devel
mailing list