[pdm-devel] [RFC proxmox-datacenter-manager 2/2] remote task fetching: use ParallelFetcher helper

Dominik Csapak d.csapak at proxmox.com
Fri Aug 29 11:15:46 CEST 2025


Some minor comments inline; the rest LGTM.

On 8/28/25 4:42 PM, Lukas Wagner wrote:
> This allows us to simplify the fetching logic quite a bit.
> 
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> ---
>   .../tasks/remote_tasks.rs                     | 239 ++++++------------
>   1 file changed, 77 insertions(+), 162 deletions(-)
> 
> diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> index 04c51dac..967e633c 100644
> --- a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> +++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> @@ -17,6 +17,7 @@ use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
>   
>   use server::{
>       api::pve,
> +    parallel_fetcher::{NodeResults, ParallelFetcher},
>       remote_tasks::{
>           self,
>           task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
> @@ -185,12 +186,7 @@ async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
>           get_remotes_with_finished_tasks(&remote_config, &poll_results)
>       };
>   
> -    let (all_tasks, update_state_for_remote) = fetch_remotes(
> -        remotes,
> -        Arc::new(cache_state),
> -        Arc::clone(&total_connections_semaphore),
> -    )
> -    .await;
> +    let (all_tasks, update_state_for_remote) = fetch_remotes(remotes, Arc::new(cache_state)).await;
>   
>       if !all_tasks.is_empty() {
>           update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
> @@ -221,42 +217,90 @@ async fn init_cache() -> Result<(), Error> {
>   async fn fetch_remotes(
>       remotes: Vec<Remote>,
>       cache_state: Arc<State>,
> -    total_connections_semaphore: Arc<Semaphore>,
>   ) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
> -    let mut join_set = JoinSet::new();
> +    let fetcher = ParallelFetcher {
> +        max_connections: MAX_CONNECTIONS,
> +        max_connections_per_remote: CONNECTIONS_PER_PVE_REMOTE,
> +        context: cache_state,
> +    };
>   
> -    for remote in remotes {
> -        let semaphore = Arc::clone(&total_connections_semaphore);
> -        let state_clone = Arc::clone(&cache_state);
> -
> -        join_set.spawn(async move {
> -            log::debug!("fetching remote tasks for '{}'", remote.id);
> -            fetch_tasks(&remote, state_clone, semaphore)
> -                .await
> -                .map_err(|err| {
> -                    format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
> -                })
> -        });
> -    }
> +    let fetch_results = fetcher
> +        .do_for_all_remote_nodes(remotes.into_iter(), fetch_tasks_from_single_node)
> +        .await;
>   
>       let mut all_tasks = Vec::new();
> -    let mut update_state_for_remote = NodeFetchSuccessMap::default();
> +    let mut node_success_map = NodeFetchSuccessMap::default();
>   
> -    while let Some(res) = join_set.join_next().await {
> -        match res {
> -            Ok(Ok(FetchedTasks {
> -                tasks,
> -                node_results,
> -            })) => {
> -                all_tasks.extend(tasks);
> -                update_state_for_remote.merge(node_results);
> +    for (remote_name, result) in fetch_results.remote_results {
> +        match result {
> +            Ok(remote_result) => {
> +                for (node_name, node_result) in remote_result.node_results {
> +                    match node_result {
> +                        Ok(NodeResults { data, .. }) => {
> +                            all_tasks.extend(data);
> +                            node_success_map.set_node_success(remote_name.clone(), node_name);
> +                        }
> +                        Err(err) => {
> +                            log::error!("could not fetch tasks from remote '{remote_name}', node {node_name}: {err:#}");
> +                        }
> +                    }
> +                }
> +            }
> +            Err(err) => {
> +                log::error!("could not fetch tasks from remote '{remote_name}': {err:#}");
>               }
> -            Ok(Err(err)) => log::error!("{err:#}"),
> -            Err(err) => log::error!("could not join task fetching future: {err:#}"),
>           }
>       }
>   
> -    (all_tasks, update_state_for_remote)
> +    (all_tasks, node_success_map)
> +}
> +
> +async fn fetch_tasks_from_single_node(
> +    context: Arc<State>,
> +    remote: Remote,
> +    node: String,
> +) -> Result<Vec<TaskCacheItem>, Error> {
> +    match remote.ty {
> +        RemoteType::Pve => {
> +            let since = context
> +                .cutoff_timestamp(&remote.id, &node)
> +                .unwrap_or_else(|| {
> +                    proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
> +                });
> +
> +            let params = ListTasks {
> +                source: Some(ListTasksSource::Archive),
> +                since: Some(since),
> +                // If `limit` is not provided, we only receive 50 tasks
> +                limit: Some(MAX_TASKS_TO_FETCH),
> +                ..Default::default()
> +            };
> +
> +            let remote_clone = remote.clone();
> +            let client = pve::connect(&remote_clone)?;

This clone can simply be omitted by using `remote` directly (i.e. `pve::connect(&remote)` and `remote.id` in the error message).

> +            let task_list = client.get_task_list(&node, params).await.map_err(|err| {
> +                format_err!("remote '{}', node '{}': {err}", remote_clone.id, node)
> +            })?;
> +
> +            let task_list = task_list
> +                .into_iter()
> +                .map(|task| map_pve_task(task, &remote.id))
> +                .filter_map(|task| match task {
> +                    Ok(task) => Some(task),
> +                    Err(err) => {
> +                        log::error!("could not map PVE task: {err:#}");
> +                        None
> +                    }
> +                })
> +                .collect();

two things here:

You could use just one `filter_map` that calls `map_pve_task` inside, instead of a `map` followed by a `filter_map`.

And you can simply append `.into_iter().filter_map(..)` to the original
`get_task_list` call; there is no need for the extra `let task_list = task_list....`

> +
> +            Ok(task_list)
> +        }
> +        RemoteType::Pbs => {
> +            // TODO: Support PBS.
> +            Ok(vec![])
> +        }
> +    }
>   }
>   
>   /// Return all remotes from the given config.
> @@ -301,135 +345,6 @@ async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
>       tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
>   }
>   
> -/// Fetched tasks from a single remote.
> -struct FetchedTasks {
> -    /// List of tasks.
> -    tasks: Vec<TaskCacheItem>,
> -    /// Contains whether a cluster node was fetched successfully.
> -    node_results: NodeFetchSuccessMap,
> -}
> -
> -/// Fetch tasks (active and finished) from a remote.
> -async fn fetch_tasks(
> -    remote: &Remote,
> -    state: Arc<State>,
> -    total_connections_semaphore: Arc<Semaphore>,
> -) -> Result<FetchedTasks, Error> {
> -    let mut tasks = Vec::new();
> -
> -    let mut node_results = NodeFetchSuccessMap::default();
> -
> -    match remote.ty {
> -        RemoteType::Pve => {
> -            let client = pve::connect(remote)?;
> -
> -            let nodes = {
> -                // This permit *must* be dropped before we acquire the permits for the
> -                // per-node connections - otherwise we risk a deadlock.
> -                let _permit = total_connections_semaphore.acquire().await.unwrap();
> -                client.list_nodes().await?
> -            };
> -
> -            // This second semaphore is used to limit the number of concurrent connections
> -            // *per remote*, not in total.
> -            let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
> -            let mut join_set = JoinSet::new();
> -
> -            for node in nodes {
> -                let node_name = node.node.to_string();
> -
> -                let since = state
> -                    .cutoff_timestamp(&remote.id, &node_name)
> -                    .unwrap_or_else(|| {
> -                        proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
> -                    });
> -
> -                let params = ListTasks {
> -                    source: Some(ListTasksSource::Archive),
> -                    since: Some(since),
> -                    // If `limit` is not provided, we only receive 50 tasks
> -                    limit: Some(MAX_TASKS_TO_FETCH),
> -                    ..Default::default()
> -                };
> -
> -                let per_remote_permit = Arc::clone(&per_remote_semaphore)
> -                    .acquire_owned()
> -                    .await
> -                    .unwrap();
> -
> -                let total_connections_permit = Arc::clone(&total_connections_semaphore)
> -                    .acquire_owned()
> -                    .await
> -                    .unwrap();
> -
> -                let remote_clone = remote.clone();
> -
> -                join_set.spawn(async move {
> -                    let res = async {
> -                        let client = pve::connect(&remote_clone)?;
> -                        let task_list =
> -                            client
> -                                .get_task_list(&node.node, params)
> -                                .await
> -                                .map_err(|err| {
> -                                    format_err!(
> -                                        "remote '{}', node '{}': {err}",
> -                                        remote_clone.id,
> -                                        node.node
> -                                    )
> -                                })?;
> -                        Ok::<Vec<_>, Error>(task_list)
> -                    }
> -                    .await;
> -
> -                    drop(total_connections_permit);
> -                    drop(per_remote_permit);
> -
> -                    (node_name, res)
> -                });
> -            }
> -
> -            while let Some(result) = join_set.join_next().await {
> -                match result {
> -                    Ok((node_name, result)) => match result {
> -                        Ok(task_list) => {
> -                            let mapped =
> -                                task_list.into_iter().filter_map(|task| {
> -                                    match map_pve_task(task, &remote.id) {
> -                                        Ok(task) => Some(task),
> -                                        Err(err) => {
> -                                            log::error!(
> -                                                "could not map task data, skipping: {err:#}"
> -                                            );
> -                                            None
> -                                        }
> -                                    }
> -                                });
> -
> -                            tasks.extend(mapped);
> -                            node_results.set_node_success(remote.id.clone(), node_name);
> -                        }
> -                        Err(error) => {
> -                            log::error!("could not fetch tasks: {error:#}");
> -                        }
> -                    },
> -                    Err(error) => {
> -                        log::error!("could not join task fetching task: {error:#}");
> -                    }
> -                }
> -            }
> -        }
> -        RemoteType::Pbs => {
> -            // TODO: Add code for PBS
> -        }
> -    }
> -
> -    Ok(FetchedTasks {
> -        tasks,
> -        node_results,
> -    })
> -}
> -
>   #[derive(PartialEq, Debug)]
>   /// Outcome from polling a tracked task.
>   enum PollResult {





More information about the pdm-devel mailing list