[pdm-devel] [PATCH proxmox-datacenter-manager v5 2/6] remote tasks: add background task for task polling, use new task cache

Lukas Wagner l.wagner at proxmox.com
Wed Jul 9 13:22:41 CEST 2025



On 2025-07-03 10:05, Lukas Wagner wrote:
>>> +}
>>> +
>>> +/// Handle a single timer tick.
>>> +/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
>>> +async fn do_tick(cycle: u64) -> Result<(), Error> {
>>> +    let cache = remote_tasks::get_cache()?;
>>> +
>>> +    if should_check_for_cache_rotation(cycle) {
>>> +        log::debug!("checking if remote task archive should be rotated");
>>> +        if rotate_cache(cache.clone()).await? {
>>> +            log::info!("rotated remote task archive");
>>> +        }
>>> +    }
>>> +
>>> +    let state = cache.read_state();
>>> +
>>> +    let mut all_tasks = HashMap::new();
>>> +
>>> +    let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
>>> +    let mut join_set = JoinSet::new();
>>> +
>>> +    // Get a list of remotes that we should poll in this cycle.
>>> +    let remotes = remotes_to_check(cycle, &state).await?;
>>> +    for remote in remotes {
>>> +        let since = get_cutoff_timestamp(&remote, &state);
>>> +
>>> +        let permit = if remote.ty == RemoteType::Pve {
>>> +            // Acquire multiple permits, for PVE remotes we want
>>> +            // to poll multiple nodes in parallel.
>>> +            //
>>> +            // `.unwrap()` is safe, we never close the semaphore.
>>> +            Arc::clone(&total_connections_semaphore)
>>> +                .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
>>> +                .await
>>> +                .unwrap()
>>
>> would it be possible to acquire the connection semaphores dynamically inside the
>> `fetch_tasks` call up to the maximum?
>>
>> that way, we could e.g. connect to 20 remotes with one host in parallel
>> instead of always having a maximum of 4?
>> (not sure about the tokio semaphore possibilities here)
>>
>> I'd still limit it to CONNECTIONS_PER_PVE_REMOTE for each remote,
>> but in case one remote has less nodes, we could utilize the connection count
>> for more remotes, doing more work in parallel.
> 
> IIRC there was some problem with allocating these on demand, I think there was some potential
> for a deadlock - though I can't come up with the 'why' right now. I'll check again and
> add some comment if I remember the reason again.
> 


Revisiting this again right now.

The problem is that for PVE remotes, fetching the tasks is a two-step process. First, we have to
get a list of all nodes, and second, we have to connect to each node to get a list of that node's
tasks. The 'get list of nodes' step should be guarded by a semaphore, because if you have a huge
number of remotes, you don't want to connect to all of them at the same time.

This means that if we acquire the permits on demand, we'd have to do something like the following (pseudo code):

let semaphore = Semaphore::new(MAX_CONNECTIONS);

for remote in remotes {
    spawn(async {
        // every remote first takes a single permit to fetch its node list ...
        let single_permit = semaphore.acquire().await;
        let nodes = list_nodes().await;

        // ... and only then requests the additional permits for the per-node connections
        let remaining_permits = semaphore
            .acquire_many(min(nodes.len(), CONNECTIONS_PER_PVE_REMOTE) - 1)
            .await;

        for node in nodes {
            // Fetch tasks from node concurrently
        }

        drop(single_permit);
        drop(remaining_permits);
    })
}

Since the body of the spawned task runs for many remotes at once, the semaphore's permits
can already be exhausted by the concurrent calls to the first acquire, leading to a deadlock
when the additional permits are requested: if there are at least MAX_CONNECTIONS remotes,
every permit may end up held as a 'single_permit' while all tasks block in acquire_many,
and none of them can ever proceed.

This is why, for now, we need to request all permits in advance. I'll add a comment
to the code to document this.

Once we are based on trixie, we could use SemaphorePermit::split() [1] and then release the
permits we don't actually need.

[1] https://docs.rs/tokio/1.46.1/tokio/sync/struct.SemaphorePermit.html#method.split
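
In the polling code that could then look roughly like this (untested sketch, assuming the owned
permit type (OwnedSemaphorePermit) offers the same split() as in [1]; list_nodes() stands in for
the actual node listing):

// Still acquire the full per-remote budget up front, so there is no deadlock risk ...
let mut permits = Arc::clone(&total_connections_semaphore)
    .acquire_many_owned(CONNECTIONS_PER_PVE_REMOTE as u32)
    .await
    .unwrap();

let nodes = list_nodes().await?;

// ... and once the node count is known, hand back whatever this remote does not
// actually need. Dropping the split-off permits returns them to the semaphore,
// so other remotes can use them right away.
let needed = nodes.len().min(CONNECTIONS_PER_PVE_REMOTE);
if let Some(excess) = permits.split(CONNECTIONS_PER_PVE_REMOTE - needed) {
    drop(excess);
}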


-- 
- Lukas
