[pdm-devel] [PATCH proxmox-datacenter-manager v5 2/6] remote tasks: add background task for task polling, use new task cache

Lukas Wagner l.wagner at proxmox.com
Thu Jul 3 10:05:47 CEST 2025


On  2025-05-14 17:27, Dominik Csapak wrote:
>> +
>> +/// Tick interval for the remote task fetching task.
>> +/// This is also the rate at which we check on tracked tasks.
>> +const TASK_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
>> +
>> +/// Interval in seconds at which to fetch the newest tasks from remotes (if there is no tracked
>> +/// task for this remote).
>> +const REGULAR_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
> 
> imho those two are confusingly documented, if 'REGULAR_..' is used
> for the normal interval, why does 'TASK_REFRE..' say its used too for that?
> 

I'll make sure to phrase it a bit more clear. Thanks!

>> +/// Number of cycles until a regular refresh.
>> +const REGULAR_REFRESH_CYCLES: u64 =
>> +    REGULAR_REFRESH_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
>> +
>> +/// Check if we want to rotate once every hour.
> 
> commas are important, one can read this sentence in two ways:
> 
> Check if we want to rotate, once every hour
> or
> Check if want to (rotate once every hour) (brackets for clarity)
> 
> IMHO a better way to write that is:
> 
> Check once every hour if we want to rotate.

Ack

> 
>> +const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
>> +/// Number of cycles before we want to check if we should rotate the task archives.
>> +const CHECK_ROTATE_CYCLES: u64 = CHECK_ROTATE_INTERVAL.as_secs() / TASK_REFRESH_INTERVAL.as_secs();
>> +
>> +/// Rotate once the most recent archive file is at least 24 hour old.
>> +const ROTATE_AFTER: Duration = Duration::from_secs(24 * 3600);
>> +
>> +/// Keep 7 days worth of tasks.
>> +const KEEP_OLD_FILES: u64 = 7;
>> +
>> +/// Maximum number of concurrent connections per remote.
>> +const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
>> +/// Maximum number of total concurrent connections. `CONNECTIONS_PER_PVE_REMOTE` is taken into
>> +/// consideration when accounting for the total number of connections.
>> +/// For instance, if `MAX_CONNECTIONS` is 20 and `CONNECTIONS_PER_PVE_REMOTE` is 5, we can connect
>> +/// to 4 PVE remotes in parallel.
>> +const MAX_CONNECTIONS: usize = 20;
>> +
>> +/// Maximum number of tasks to fetch from a single remote in one API call.
>> +const MAX_TASKS_TO_FETCH: u64 = 5000;
>> +
>> +/// Start the remote task fetching task
>> +pub fn start_task() -> Result<(), Error> {
>> +    let api_uid = pdm_config::api_user()?.uid;
>> +    let api_gid = pdm_config::api_group()?.gid;
>> +    let file_options = CreateOptions::new()
>> +        .owner(api_uid)
>> +        .group(api_gid)
>> +        .perm(Mode::from_bits_truncate(0o0750));
>> +    proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(file_options))?;
> 
> this should probably use the proxmox_product_config crate
> 

Ack, I'll check that out

>> +
>> +    tokio::spawn(async move {
>> +        let task_scheduler = std::pin::pin!(remote_task_fetching_task());
>> +        let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
>> +        futures::future::select(task_scheduler, abort_future).await;
>> +    });
>> +
>> +    Ok(())
>> +}
>> +
>> +/// Task which handles fetching remote tasks and task archive rotation.
>> +/// This function never returns.
>> +async fn remote_task_fetching_task() -> ! {
>> +    let mut cycle = 0u64;
>> +    let mut interval = tokio::time::interval(TASK_REFRESH_INTERVAL);
>> +    interval.reset_at(task_utils::next_aligned_instant(TASK_REFRESH_INTERVAL.as_secs()).into());
>> +
>> +    // We don't really care about catching up to missed tick, we just want
>> +    // a steady tick rate.
>> +    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
>> +
>> +    if let Err(err) = init_cache().await {
>> +        log::error!("error when initialized task cache: {err}");
>> +    }
>> +
>> +    loop {
>> +        interval.tick().await;
>> +        if let Err(err) = do_tick(cycle).await {
>> +            log::error!("error when fetching remote tasks: {err}");
>> +        }
>> +
>> +        // At a rate of one tick every 10s we wrap around in *only* 5 trillion years,
>> +        // better be safe and use .wrapping_add(1) :)
>> +        cycle = cycle.wrapping_add(1);
>> +    }
> 
> you do the cycle check here manually, but this can be bad, e.g.
> 
> if one cycle takes a long time (say 1 minute instead of a 10 seconds) and that
> regularly, you fetch the remotes not every 10 minutes (as the comment above would indicate)
> but only every hour
> 
> I guess you wanted to be on the safe side and not being too overly aggressive with the polling,
> but having the fetch/rotation interval be that dependent on the cycle duration seems
> also not very good to me.

Yeah, the idea was to combine the regular polling and the tracked task polling into a single task.
But I agree, it'd be probably better to use timestamps instead of cycle counts for keeping
track of what should be done when.

Thanks

> 
>> +}
>> +
>> +/// Handle a single timer tick.
>> +/// Will handle archive file rotation, polling of tracked tasks and fetching or remote tasks.
>> +async fn do_tick(cycle: u64) -> Result<(), Error> {
>> +    let cache = remote_tasks::get_cache()?;
>> +
>> +    if should_check_for_cache_rotation(cycle) {
>> +        log::debug!("checking if remote task archive should be rotated");
>> +        if rotate_cache(cache.clone()).await? {
>> +            log::info!("rotated remote task archive");
>> +        }
>> +    }
>> +
>> +    let state = cache.read_state();
>> +
>> +    let mut all_tasks = HashMap::new();
>> +
>> +    let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
>> +    let mut join_set = JoinSet::new();
>> +
>> +    // Get a list of remotes that we should poll in this cycle.
>> +    let remotes = remotes_to_check(cycle, &state).await?;
>> +    for remote in remotes {
>> +        let since = get_cutoff_timestamp(&remote, &state);
>> +
>> +        let permit = if remote.ty == RemoteType::Pve {
>> +            // Acquire multiple permits, for PVE remotes we want
>> +            // to multiple nodes in parallel.
>> +            //
>> +            // `.unwrap()` is safe, we never close the semaphore.
>> +            Arc::clone(&total_connections_semaphore)
>> +                .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
>> +                .await
>> +                .unwrap()
> 
> would it be possible to acquire the connection semaphores dynamicall inside the
> `fetch_tasks` call up to the maximum?
> 
> that way, we could e.g. connect to 20 remotes with one host in parallel
> instead of always having maximum of 4 ?
> (not sure about the tokio semaphore possibilities here)
> 
> I'd still limit it to CONNECTIONS_PER_PVE_REMOTE for each remote,
> but in case one remote has less nodes, we could utilize the connection count
> for more remotes, doing more work in parallel.

IIRC there was some problem with allocating these on demand, I think there was some potential
for a deadlock - though I can't come up with the 'why' right now. I'll check again and
add some comment if I remember the reason again.

> 
>> +        } else {
>> +            // For PBS remotes we only have a single outgoing connection
>> +            //
>> +            // `.unwrap()` is safe, we never close the semaphore.
>> +            Arc::clone(&total_connections_semaphore)
>> +                .acquire_owned()
>> +                .await
>> +                .unwrap()
>> +        };
>> +
>> +        join_set.spawn(async move {
>> +            log::debug!("fetching remote tasks for '{}' since {since}", remote.id);
>> +            let tasks = fetch_tasks(&remote, since).await.map_err(|err| {
>> +                format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
>> +            });
>> +
>> +            drop(permit);
>> +            tasks
>> +        });
>> +    }
>> +
>> +
>> +/// Return list of remotes that are to be polled in this cycle.
>> +///
>> +/// If `cycle` is a multiple of `REGULAR_REFRESH_CYCLES`, the function will
>> +/// return all remotes from the remote config. This ensures that
>> +/// all remotes are polled at regular intervals.
>> +/// In any other case we only return remotes which currently have a tracked
>> +/// task.
>> +/// On daemon startup (when cycle is 0) we return all remotes to ensure
>> +/// that we get an up-to-date task list from all remotes.
>> +async fn remotes_to_check(cycle: u64, state: &State) -> Result<Vec<Remote>, Error> {
>> +    let (config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
>> +
>> +    let all = cycle % REGULAR_REFRESH_CYCLES == 0;
>> +
>> +    if all {
>> +        Ok(config.into_iter().map(|(_, section)| section).collect())
>> +    } else {
>> +        Ok(config
>> +            .into_iter()
>> +            .filter_map(|(name, remote)| {
>> +                if let Some(tracked) = state.tracked_tasks.get(&name) {
>> +                    if !tracked.is_empty() {
>> +                        Some(remote)
>> +                    } else {
>> +                        None
>> +                    }
>> +                } else {
>> +                    None
>> +                }
>> +            })
> 
> i think this could be more succinctly written as:
> 
> state
>     .tracked_tasks
>     .get(&name)
>     .and_then(|tracked| (!tracked.is_empty()).then_some(remote))
> 

Thanks, will do that.

>> +            .collect())
>> +    }
>> +}
>> +
>> +/// Get the timestamp from which on we should fetch tasks for a given remote.
>> +/// The returned timestamp is a UNIX timestamp (in seconds).
>> +fn get_cutoff_timestamp(remote: &Remote, state: &State) -> i64 {
>> +    let oldest_active = state.oldest_active_task.get(&remote.id).copied();
>> +    let youngest_archived = state.most_recent_archive_starttime.get(&remote.id).copied();
>> +
>> +    match (oldest_active, youngest_archived) {
>> +        (None, None) => 0,
>> +        (None, Some(youngest_archived)) => youngest_archived,
>> +        (Some(oldest_active), None) => oldest_active,
>> +        (Some(oldest_active), Some(youngest_active)) => oldest_active.min(youngest_active),
>> +    }
>> +}
>> +
>> +/// Rotate the task cache if necessary.
>> +///
>> +/// Returns Ok(true) the cache's files were rotated.
>> +async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
>> +    tokio::task::spawn_blocking(move || {
>> +        cache.rotate(
>> +            proxmox_time::epoch_i64(),
>> +            ROTATE_AFTER.as_secs(),
>> +            KEEP_OLD_FILES,
>> +        )
>> +    })
>> +    .await?
>> +}
> 
> in pbs, we start a worker task for the log rotation, maybe we want here too ?
> 

Hmmm, do we have any guidelines when something should be a worker task and when not? 
I'm not opposed to it, but I'm just curious where to draw the line.
Also, being a worker task also implies that it can be cancelled, right? Does that make sense for
something like this?

>> +
>> +/// Fetch tasks (active and finished) from a remote
>> +/// `since` is a UNIX timestamp (seconds).
>> +async fn fetch_tasks(remote: &Remote, since: i64) -> Result<(String, AddTasks), Error> {
>> +    let mut tasks = Vec::new();
>> +
>> +    let mut all_successful = true;
>> +
>> +    match remote.ty {
>> +        RemoteType::Pve => {
>> +            let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
>> +            let mut join_set = JoinSet::new();


-- 
- Lukas





More information about the pdm-devel mailing list