[pdm-devel] [PATCH proxmox-datacenter-manager v6 2/6] remote tasks: add background task for task polling, use new task cache

Dominik Csapak d.csapak at proxmox.com
Wed Aug 20 10:25:29 CEST 2025


one small comment inline, aside from that LGTM

On 8/14/25 09:56, Lukas Wagner wrote:
> This commit changes the remote task module as follows:
> 
> - Add a new background task for regular polling of task data
> Instead of triggering fetching of task data from the `get_tasks` function,
> which is usually called by an API handler, we move the fetching to a
> new background task. The task fetches the latest tasks from all remotes
> and stores them in the task cache at regular intervals (10 minutes).
> The `get_tasks` function itself only reads from the cache.
> The main rationale for this change is that for large setups, fetching
> tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
> each with a >100ms connection - adds up to minutes quickly).
> If we do this from within `get_tasks`, the API handler calling the
> function is also blocked for the entire time.
> The `get_tasks` API is called every couple of seconds by the UI to get
> a list of running remote tasks, so this *must* be quick.
> 
> - Tracked tasks are also polled in the same background task, but with
> a short polling delay (10 seconds). If a tracked task finishes,
> an out-of-order fetch of tasks for the remote is performed to update
> the cache with all task data from the finished task.
> 
> - Only finished tasks are requested from the remotes. This avoids a
> foreign (as in, not started by PDM) running task appearing stuck in
> the running state until the next regular task cache refresh.
> The tracked task polling could be extended to also poll running foreign
> tasks, but this is an easy addition for the future.
> 
> - Tasks are now stored in the new improved task cache implementation.
> This should make retrieving tasks much quicker and avoid
> unneeded disk IO.
> 
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> ---
> 
> Notes:
>      Changes since v5:
>        - Incorporate review feedback from @Dominik (thx!)
>        - Change task tracking approach:
>          - Instead of using the oldest running task as a cutoff and
>          switching to a lower fetching interval if there is a tracked task,
>          we poll tracked tasks directly at a 10-second interval.
>          Once a tracked task finishes, we do a regular task fetch once
>          to get full task data (endtime, status).
>          This is a nicer approach for long-running tasks, since we do
>          not repeatedly request the same tasks.
>          - Use proxmox_product_config to get CreateOptions where
>          it makes sense.
>          - Use timestamps instead of cycle counts to keep track
>          of when we want to rotate the task archive or do a full
>          task fetch
>          - Be more clever about how we request the semaphores. Instead
>          of requesting upfront all semaphore permits that we could
>          potentially need to poll multiple nodes of a remote in parallel,
>          request them on demand.
>          - Keep track of per-node failures while fetching tasks and
>          feed this information to the cache implementation
>          so that it can maintain the per-node cutoff timestamp.
>          - Make documentation of public constants a bit easier
>          to understand.
>      
>      Changes since v4:
>        - Rebase onto latest master, adapting to changes in
>          the section config type
>      
>      Changes since v2:
>        - Adapt to new locking approach (only drops a `mut`)
>      
>      Changes since v1:
>      
>       - use const Duration instead of u64s for durations, using
>         Duration::as_secs() where needed
>       - Move the remote_task fetching task functions to
>         src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
>       - remote_tasks::get_tasks: wrap function body in a
>         tokio::task::spawn_blocking. using the TaskCache::get_tasks
>         iterator does disk IO and could block the executor
>       - Added some doc strings to make the purpose/workings of
>         some functions clearer
>       - Couple of variables have been renamed for more clarity
> 
>   server/src/api/pve/lxc.rs                     |  10 +-
>   server/src/api/pve/mod.rs                     |   4 +-
>   server/src/api/pve/qemu.rs                    |   6 +-
>   server/src/api/remote_tasks.rs                |  11 +-
>   server/src/bin/proxmox-datacenter-api/main.rs |   1 +
>   .../bin/proxmox-datacenter-api/tasks/mod.rs   |   1 +
>   .../tasks/remote_tasks.rs                     | 559 ++++++++++++++++
>   server/src/remote_tasks/mod.rs                | 625 ++++--------------
>   8 files changed, 706 insertions(+), 511 deletions(-)
>   create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> 
> diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs
> index f1c31425..83f9f4aa 100644
> --- a/server/src/api/pve/lxc.rs
> +++ b/server/src/api/pve/lxc.rs
> @@ -209,7 +209,7 @@ pub async fn lxc_start(
>   
>       let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?;
>   
> -    new_remote_upid(remote, upid)
> +    new_remote_upid(remote, upid).await
>   }
>   
>   #[api(
> @@ -242,7 +242,7 @@ pub async fn lxc_stop(
>   
>       let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?;
>   
> -    new_remote_upid(remote, upid)
> +    new_remote_upid(remote, upid).await
>   }
>   
>   #[api(
> @@ -277,7 +277,7 @@ pub async fn lxc_shutdown(
>           .shutdown_lxc_async(&node, vmid, Default::default())
>           .await?;
>   
> -    new_remote_upid(remote, upid)
> +    new_remote_upid(remote, upid).await
>   }
>   
>   #[api(
> @@ -357,7 +357,7 @@ pub async fn lxc_migrate(
>       };
>       let upid = pve.migrate_lxc(&node, vmid, params).await?;
>   
> -    new_remote_upid(remote, upid)
> +    new_remote_upid(remote, upid).await
>   }
>   
>   #[api(
> @@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate(
>       log::info!("migrating vm {vmid} of node {node:?}");
>       let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?;
>   
> -    new_remote_upid(source, upid)
> +    new_remote_upid(source, upid).await
>   }
> diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
> index dd7cf382..d472cf58 100644
> --- a/server/src/api/pve/mod.rs
> +++ b/server/src/api/pve/mod.rs
> @@ -76,9 +76,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES
>   const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS);
>   
>   // converts a remote + PveUpid into a RemoteUpid and starts tracking it
> -fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
> +async fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
>       let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
> -    remote_tasks::track_running_task(remote_upid.clone());
> +    remote_tasks::track_running_task(remote_upid.clone()).await?;
>       Ok(remote_upid)
>   }
>   
> diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs
> index 5a41a69e..54ede112 100644
> --- a/server/src/api/pve/qemu.rs
> +++ b/server/src/api/pve/qemu.rs
> @@ -216,7 +216,7 @@ pub async fn qemu_start(
>           .start_qemu_async(&node, vmid, Default::default())
>           .await?;
>   
> -    new_remote_upid(remote, upid)
> +    new_remote_upid(remote, upid).await
>   }
>   
>   #[api(
> @@ -377,7 +377,7 @@ pub async fn qemu_migrate(
>       };
>       let upid = pve.migrate_qemu(&node, vmid, params).await?;
>   
> -    new_remote_upid(remote, upid)
> +    new_remote_upid(remote, upid).await
>   }
>   
>   #[api(
> @@ -564,5 +564,5 @@ pub async fn qemu_remote_migrate(
>       log::info!("migrating vm {vmid} of node {node:?}");
>       let upid = source_conn.remote_migrate_qemu(&node, vmid, params).await?;
>   
> -    new_remote_upid(source, upid)
> +    new_remote_upid(source, upid).await
>   }
> diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
> index e629000c..05ce3666 100644
> --- a/server/src/api/remote_tasks.rs
> +++ b/server/src/api/remote_tasks.rs
> @@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
>       },
>       input: {
>           properties: {
> -            "max-age": {
> -                type: Integer,
> -                optional: true,
> -                // TODO: sensible default max-age
> -                default: 300,
> -                description: "Maximum age of cached task data",
> -            },
>               filters: {
>                   type: TaskFilters,
>                   flatten: true,
> @@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
>       },
>   )]
>   /// Get the list of tasks for all remotes.
> -async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
> -    let tasks = remote_tasks::get_tasks(max_age, filters).await?;
> +async fn list_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
> +    let tasks = remote_tasks::get_tasks(filters).await?;
>   
>       Ok(tasks)
>   }
> diff --git a/server/src/bin/proxmox-datacenter-api/main.rs b/server/src/bin/proxmox-datacenter-api/main.rs
> index db6b2585..42bc0e1e 100644
> --- a/server/src/bin/proxmox-datacenter-api/main.rs
> +++ b/server/src/bin/proxmox-datacenter-api/main.rs
> @@ -376,6 +376,7 @@ async fn run(debug: bool) -> Result<(), Error> {
>       metric_collection::start_task();
>       tasks::remote_node_mapping::start_task();
>       resource_cache::start_task();
> +    tasks::remote_tasks::start_task()?;
>   
>       server.await?;
>       log::info!("server shutting down, waiting for active workers to complete");
> diff --git a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
> index e6ead882..a6b1f439 100644
> --- a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
> +++ b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
> @@ -1 +1,2 @@
>   pub mod remote_node_mapping;
> +pub mod remote_tasks;
> diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> new file mode 100644
> index 00000000..4701a935
> --- /dev/null
> +++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
> @@ -0,0 +1,559 @@
> +use std::{
> +    collections::{HashMap, HashSet},
> +    sync::Arc,
> +    time::{Duration, Instant},
> +};
> +
> +use anyhow::{format_err, Error};
> +use nix::sys::stat::Mode;
> +use tokio::{sync::Semaphore, task::JoinSet};
> +
> +use pdm_api_types::{
> +    remotes::{Remote, RemoteType},
> +    RemoteUpid,
> +};
> +use proxmox_section_config::typed::SectionConfigData;
> +use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
> +
> +use server::{
> +    api::pve,
> +    remote_tasks::{
> +        self,
> +        task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
> +        KEEP_OLD_FILES, REMOTE_TASKS_DIR, ROTATE_AFTER,
> +    },
> +    task_utils,
> +};
> +
> +/// Tick interval for the remote task fetching task.
> +/// This is also the rate at which we check on tracked tasks.
> +const POLL_INTERVAL: Duration = Duration::from_secs(10);
> +
> +/// Interval at which to fetch the newest tasks from remotes (if there is no tracked
> +/// task for this remote).
> +const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600);
> +
> +/// Interval at which to check for task cache rotation.
> +const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
> +
> +/// Interval at which the task cache journal should be applied.
> +///
> +/// Choosing a value here is a trade-off between performance and avoiding unnecessary writes.
> +/// Letting the journal grow large avoids writes, but since the journal is not sorted, accessing
> +/// it will be slower than the task archive itself, as the entire journal must be loaded into
> +/// memory and then sorted by task starttime. Applying the journal more often might
> +/// lead to more writes, but should yield better performance.
> +const APPLY_JOURNAL_INTERVAL: Duration = Duration::from_secs(3600);
> +
> +/// Maximum number of concurrent connections per remote.
> +const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
> +
> +/// Maximum number of total concurrent connections.
> +const MAX_CONNECTIONS: usize = 20;
> +
> +/// Maximum number of tasks to fetch from a single remote in one API call.
> +const MAX_TASKS_TO_FETCH: u64 = 5000;
> +
> +/// (Ephemeral) Remote task fetching task state.
> +struct TaskState {
> +    /// Time at which we last checked for archive rotation.
> +    last_rotate_check: Instant,
> +    /// Time at which we last fetched tasks.
> +    last_fetch: Instant,
> +    /// Time at which we last applied the journal.
> +    last_journal_apply: Instant,
> +}
> +
> +impl TaskState {
> +    fn new() -> Self {
> +        let now = Instant::now();
> +
> +        Self {
> +            last_rotate_check: now - CHECK_ROTATE_INTERVAL,
> +            last_fetch: now - TASK_FETCH_INTERVAL,
> +            last_journal_apply: now - APPLY_JOURNAL_INTERVAL,
> +        }
> +    }
> +
> +    /// Reset the task archive rotation timestamp.
> +    fn reset_rotate_check(&mut self) {
> +        self.last_rotate_check = Instant::now();
> +    }
> +
> +    /// Reset the task fetch timestamp.
> +    fn reset_fetch(&mut self) {
> +        self.last_fetch = Instant::now();
> +    }
> +
> +    /// Reset the journal apply timestamp.
> +    fn reset_journal_apply(&mut self) {
> +        self.last_journal_apply = Instant::now();
> +    }
> +
> +    /// Should we check for archive rotation?
> +    fn is_due_for_rotate_check(&self) -> bool {
> +        Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL
> +    }
> +
> +    /// Should we fetch tasks?
> +    fn is_due_for_fetch(&self) -> bool {
> +        Instant::now().duration_since(self.last_fetch) > TASK_FETCH_INTERVAL
> +    }
> +
> +    /// Should we apply the task archive's journal?
> +    fn is_due_for_journal_apply(&self) -> bool {
> +        Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL
> +    }
> +}
> +
> +/// Start the remote task fetching task
> +pub fn start_task() -> Result<(), Error> {
> +    let dir_options =
> +        proxmox_product_config::default_create_options().perm(Mode::from_bits_truncate(0o0750));
> +
> +    proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(dir_options))?;
> +
> +    tokio::spawn(async move {
> +        let task_scheduler = std::pin::pin!(remote_task_fetching_task());
> +        let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
> +        futures::future::select(task_scheduler, abort_future).await;
> +    });
> +
> +    Ok(())
> +}
> +
> +/// Task which handles fetching remote tasks and task archive rotation.
> +/// This function never returns.
> +async fn remote_task_fetching_task() -> ! {
> +    let mut task_state = TaskState::new();
> +
> +    let mut interval = tokio::time::interval(POLL_INTERVAL);
> +    interval.reset_at(task_utils::next_aligned_instant(POLL_INTERVAL.as_secs()).into());
> +
> +    // We don't really care about catching up on missed ticks, we just want
> +    // a steady tick rate.
> +    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
> +
> +    if let Err(err) = init_cache().await {
> +        log::error!("error when initializing task cache: {err:#}");
> +    }
> +
> +    loop {
> +        interval.tick().await;
> +        if let Err(err) = do_tick(&mut task_state).await {
> +            log::error!("error when fetching remote tasks: {err:#}");
> +        }
> +    }
> +}
> +
> +/// Handle a single timer tick.
> +/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
> +async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
> +    let cache = remote_tasks::get_cache()?;
> +
> +    if task_state.is_due_for_rotate_check() {
> +        log::debug!("checking if remote task archive should be rotated");
> +        if rotate_cache(cache.clone()).await? {
> +            log::info!("rotated remote task archive");
> +        }
> +
> +        task_state.reset_rotate_check();
> +    }
> +
> +    if task_state.is_due_for_journal_apply() {
> +        apply_journal(cache.clone()).await?;
> +        task_state.reset_journal_apply();
> +    }
> +
> +    let (remote_config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
> +
> +    let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
> +
> +    let cache_state = cache.read_state();
> +    let poll_results = poll_tracked_tasks(
> +        &remote_config,
> +        cache_state.tracked_tasks(),
> +        Arc::clone(&total_connections_semaphore),
> +    )
> +    .await?;
> +
> +    // Get a list of remotes that we should poll in this cycle.
> +    let remotes = if task_state.is_due_for_fetch() {
> +        task_state.reset_fetch();
> +        get_all_remotes(&remote_config)
> +    } else {
> +        get_remotes_with_finished_tasks(&remote_config, &poll_results)
> +    };
> +
> +    let (all_tasks, update_state_for_remote) = fetch_remotes(
> +        remotes,
> +        Arc::new(cache_state),
> +        Arc::clone(&total_connections_semaphore),
> +    )
> +    .await;
> +
> +    if !all_tasks.is_empty() {
> +        update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
> +    }
> +
> +    Ok(())
> +}
> +
> +/// Initialize the remote task cache with initial archive files, in case there
> +/// aren't any yet.
> +///
> +/// This allows us to immediately backfill remote task history when setting up a new PDM instance
> +/// without any prior task archive rotation.
> +async fn init_cache() -> Result<(), Error> {
> +    tokio::task::spawn_blocking(|| {
> +        let cache = remote_tasks::get_cache()?;
> +        cache.write()?.init(proxmox_time::epoch_i64())?;
> +        Ok(())
> +    })
> +    .await?
> +}
> +
> +/// Fetch tasks from a list of remotes.
> +///
> +/// Returns a list of tasks and a map that shows whether we want to update the
> +/// cutoff timestamp in the statefile. We don't want to update the cutoff if
> +/// the connection to a remote failed or if we could not reach all nodes in a cluster.
> +async fn fetch_remotes(
> +    remotes: Vec<Remote>,
> +    cache_state: Arc<State>,
> +    total_connections_semaphore: Arc<Semaphore>,
> +) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
> +    let mut join_set = JoinSet::new();
> +
> +    for remote in remotes {
> +        let semaphore = Arc::clone(&total_connections_semaphore);
> +        let state_clone = Arc::clone(&cache_state);
> +
> +        join_set.spawn(async move {
> +            log::debug!("fetching remote tasks for '{}'", remote.id);
> +            fetch_tasks(&remote, state_clone, semaphore)
> +                .await
> +                .map_err(|err| {
> +                    format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
> +                })
> +        });
> +    }
> +
> +    let mut all_tasks = Vec::new();
> +    let mut update_state_for_remote = NodeFetchSuccessMap::default();
> +
> +    while let Some(res) = join_set.join_next().await {
> +        match res {
> +            Ok(Ok(FetchedTasks {
> +                tasks,
> +                node_results,
> +            })) => {
> +                all_tasks.extend(tasks);
> +                update_state_for_remote.merge(node_results);
> +            }
> +            Ok(Err(err)) => log::error!("{err:#}"),
> +            Err(err) => log::error!("could not join task fetching future: {err:#}"),
> +        }
> +    }
> +
> +    (all_tasks, update_state_for_remote)
> +}
> +
> +/// Return all remotes from the given config.
> +fn get_all_remotes(remote_config: &SectionConfigData<Remote>) -> Vec<Remote> {
> +    remote_config
> +        .into_iter()
> +        .map(|(_, section)| section)
> +        .cloned()
> +        .collect()
> +}
> +
> +/// Return all remotes that correspond to a list of finished tasks.
> +fn get_remotes_with_finished_tasks(
> +    remote_config: &SectionConfigData<Remote>,
> +    poll_results: &HashMap<RemoteUpid, PollResult>,
> +) -> Vec<Remote> {
> +    let remotes_with_finished_tasks: HashSet<&str> = poll_results
> +        .iter()
> +        .filter_map(|(upid, status)| (*status == PollResult::Finished).then_some(upid.remote()))
> +        .collect();
> +
> +    remote_config
> +        .into_iter()
> +        .filter_map(|(name, remote)| {
> +            remotes_with_finished_tasks
> +                .contains(&name)
> +                .then_some(remote)
> +        })
> +        .cloned()
> +        .collect()
> +}
> +
> +/// Rotate the task cache if necessary.
> +///
> +/// Returns Ok(true) if the cache's files were rotated.
> +async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
> +    tokio::task::spawn_blocking(move || cache.write()?.rotate(proxmox_time::epoch_i64())).await?
> +}
> +
> +/// Apply the task cache journal.
> +async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
> +    tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
> +}
> +
> +/// Fetched tasks from a single remote.
> +struct FetchedTasks {
> +    /// List of tasks.
> +    tasks: Vec<TaskCacheItem>,
> +    /// Records for each cluster node whether it was fetched successfully.
> +    node_results: NodeFetchSuccessMap,
> +}
> +
> +/// Fetch finished tasks from a remote.
> +async fn fetch_tasks(
> +    remote: &Remote,
> +    state: Arc<State>,
> +    total_connections_semaphore: Arc<Semaphore>,
> +) -> Result<FetchedTasks, Error> {
> +    let mut tasks = Vec::new();
> +
> +    let mut node_results = NodeFetchSuccessMap::default();
> +
> +    match remote.ty {
> +        RemoteType::Pve => {
> +            let client = pve::connect(remote)?;
> +
> +            let nodes = {
> +                // This permit *must* be dropped before we acquire the permits for the
> +                // per-node connections - otherwise we risk a deadlock.
> +                let _permit = total_connections_semaphore.acquire().await.unwrap();
> +                client.list_nodes().await?
> +            };
> +
> +            // This second semaphore is used to limit the number of concurrent connections
> +            // *per remote*, not in total.
> +            let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
> +            let mut join_set = JoinSet::new();
> +
> +            for node in nodes {
> +                let node_name = node.node.to_string();
> +
> +                let since = state
> +                    .cutoff_timestamp(&remote.id, &node_name)
> +                    .unwrap_or_else(|| {
> +                        proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
> +                    });
> +
> +                let params = ListTasks {
> +                    source: Some(ListTasksSource::Archive),
> +                    since: Some(since),
> +                    // If `limit` is not provided, we only receive 50 tasks
> +                    limit: Some(MAX_TASKS_TO_FETCH),
> +                    ..Default::default()
> +                };
> +
> +                let per_remote_permit = Arc::clone(&per_remote_semaphore)
> +                    .acquire_owned()
> +                    .await
> +                    .unwrap();
> +
> +                let total_connections_permit = Arc::clone(&total_connections_semaphore)
> +                    .acquire_owned()
> +                    .await
> +                    .unwrap();
> +
> +                let remote_clone = remote.clone();
> +
> +                join_set.spawn(async move {
> +                    let res = async {
> +                        let client = pve::connect(&remote_clone)?;
> +                        let task_list =
> +                            client
> +                                .get_task_list(&node.node, params)
> +                                .await
> +                                .map_err(|err| {
> +                                    format_err!(
> +                                        "remote '{}', node '{}': {err}",
> +                                        remote_clone.id,
> +                                        node.node
> +                                    )
> +                                })?;
> +                        Ok::<Vec<_>, Error>(task_list)
> +                    }
> +                    .await;
> +
> +                    drop(total_connections_permit);
> +                    drop(per_remote_permit);
> +
> +                    (node_name, res)
> +                });
> +            }
> +
> +            while let Some(result) = join_set.join_next().await {
> +                match result {
> +                    Ok((node_name, result)) => match result {
> +                        Ok(task_list) => {
> +                            let mapped =
> +                                task_list.into_iter().filter_map(|task| {
> +                                    match map_pve_task(task, &remote.id) {
> +                                        Ok(task) => Some(task),
> +                                        Err(err) => {
> +                                            log::error!(
> +                                                "could not map task data, skipping: {err:#}"
> +                                            );
> +                                            None
> +                                        }
> +                                    }
> +                                });
> +
> +                            tasks.extend(mapped);
> +                            node_results.set_node_success(remote.id.clone(), node_name);
> +                        }
> +                        Err(error) => {
> +                            log::error!("could not fetch tasks: {error:#}");
> +                            node_results.set_node_failure(remote.id.clone(), node_name);
> +                        }
> +                    },
> +                    Err(err) => return Err(err.into()),
> +                }

two things here:

* are we sure we want to cancel everything when one task-fetching task
failed? IOW, maybe we want Err(err) => log(err) instead of
return Err(err.into())? This should not happen, but if there is some panic
we still want to log and continue, maybe?

* if not, we could do

while let Some(result) = join_set.join_next().await {
    let (node_name, result) = result?;
    match result {
        Ok(task_list) => { /* ... */ }
        Err(err) => { /* ... */ }
    }
}
or maybe even better:

while let Some(Ok((node_name, result))) = join_set.join_next().await {
    match result {
        ...
    }
}

can't do those, of course, if we want to log & continue - the second
variant would also silently end the loop on the first join error.
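
if we do want to log & continue, a rough sketch (reusing the join_set and
node_results from the hunk above) could look like this:

while let Some(result) = join_set.join_next().await {
    match result {
        Ok((node_name, Ok(task_list))) => {
            // map the tasks and mark the node as successful, as above
        }
        Ok((node_name, Err(err))) => {
            log::error!("could not fetch tasks: {err:#}");
            node_results.set_node_failure(remote.id.clone(), node_name);
        }
        // a JoinError means the spawned future panicked or was cancelled;
        // log it and keep processing the remaining nodes
        Err(err) => log::error!("could not join task fetching future: {err:#}"),
    }
}
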
> +            }
> +        }
> +        RemoteType::Pbs => {
> +            // TODO: Add code for PBS
> +        }
> +    }
> +
> +    Ok(FetchedTasks {
> +        tasks,
> +        node_results,
> +    })
> +}
> +
> +#[derive(PartialEq, Debug)]
> +/// Outcome from polling a tracked task.
> +enum PollResult {
> +    /// Task is still running.
> +    Running,
> +    /// Task is finished, poll remote tasks to get final status/endtime.
> +    Finished,
> +    /// Should be dropped from the active file.
> +    RequestError,
> +    /// Remote does not exist any more -> remove immediately from tracked task list.
> +    RemoteGone,
> +}
> +
> +/// Poll all tracked tasks.
> +async fn poll_tracked_tasks(
> +    remote_config: &SectionConfigData<Remote>,
> +    tracked_tasks: impl Iterator<Item = &RemoteUpid>,
> +    total_connections_semaphore: Arc<Semaphore>,
> +) -> Result<HashMap<RemoteUpid, PollResult>, Error> {
> +    let mut join_set = JoinSet::new();
> +
> +    for task in tracked_tasks.cloned() {
> +        let permit = Arc::clone(&total_connections_semaphore)
> +            .acquire_owned()
> +            .await
> +            .unwrap();
> +
> +        let remote = remote_config.get(task.remote()).cloned();
> +
> +        join_set.spawn(async move {
> +            // Move permit into this async block.
> +            let _permit = permit;
> +
> +            match remote {
> +                Some(remote) => poll_single_tracked_task(remote, task).await,
> +                None => {
> +                    log::info!(
> +                        "remote {} does not exist any more, dropping tracked task",
> +                        task.remote()
> +                    );
> +                    (task, PollResult::RemoteGone)
> +                }
> +            }
> +        });
> +    }
> +
> +    let mut results = HashMap::new();
> +    while let Some(task_result) = join_set.join_next().await {
> +        let (upid, result) = task_result?;
> +        results.insert(upid, result);
> +    }
> +
> +    Ok(results)
> +}
> +
> +/// Poll a single tracked task.
> +async fn poll_single_tracked_task(remote: Remote, task: RemoteUpid) -> (RemoteUpid, PollResult) {
> +    match remote.ty {
> +        RemoteType::Pve => {
> +            log::debug!("polling tracked task {}", task);
> +
> +            let status = match server::api::pve::tasks::get_task_status(
> +                remote.id.clone(),
> +                task.clone(),
> +                false,
> +            )
> +            .await
> +            {
> +                Ok(status) => status,
> +                Err(err) => {
> +                    log::error!("could not get status from remote: {err:#}");
> +                    return (task, PollResult::RequestError);
> +                }
> +            };
> +
> +            let result = if status.exitstatus.is_some() {
> +                PollResult::Finished
> +            } else {
> +                PollResult::Running
> +            };
> +
> +            (task, result)
> +        }
> +        RemoteType::Pbs => {
> +            // TODO: Implement for PBS
> +            (task, PollResult::RequestError)
> +        }
> +    }
> +}
> +
> +/// Map a `ListTasksResponse` to `TaskCacheItem`
> +fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem, Error> {
> +    let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
> +
> +    Ok(TaskCacheItem {
> +        upid: remote_upid,
> +        starttime: task.starttime,
> +        endtime: task.endtime,
> +        status: task.status,
> +    })
> +}
> +
> +/// Update task cache with results from tracked task polling & regular task fetching.
> +async fn update_task_cache(
> +    cache: TaskCache,
> +    new_tasks: Vec<TaskCacheItem>,
> +    update_state_for_remote: NodeFetchSuccessMap,
> +    poll_results: HashMap<RemoteUpid, PollResult>,
> +) -> Result<(), Error> {
> +    tokio::task::spawn_blocking(move || {
> +        let drop_tracked = poll_results
> +            .into_iter()
> +            .filter_map(|(upid, result)| match result {
> +                PollResult::Running => None,
> +                PollResult::Finished | PollResult::RequestError | PollResult::RemoteGone => {
> +                    Some(upid)
> +                }
> +            })
> +            .collect();
> +
> +        cache
> +            .write()?
> +            .update(new_tasks, &update_state_for_remote, drop_tracked)?;
> +
> +        Ok(())
> +    })
> +    .await?
> +}
> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
> index 7c8e31ef..cec2cc1e 100644
> --- a/server/src/remote_tasks/mod.rs
> +++ b/server/src/remote_tasks/mod.rs
> @@ -1,515 +1,156 @@
> -use std::{
> -    collections::{HashMap, HashSet},
> -    fs::File,
> -    path::{Path, PathBuf},
> -    sync::{LazyLock, RwLock},
> -    time::Duration,
> -};
> +use std::path::Path;
>   
>   use anyhow::Error;
> -use pdm_api_types::{
> -    remotes::{Remote, RemoteType},
> -    RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
> -};
> -use proxmox_sys::fs::CreateOptions;
> -use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
> -use serde::{Deserialize, Serialize};
> -use tokio::task::JoinHandle;
>   
> -use crate::{api::pve, task_utils};
> +use pdm_api_types::{RemoteUpid, TaskFilters, TaskListItem, TaskStateType};
> +use pve_api_types::PveUpid;
>   
> -mod task_cache;
> +pub mod task_cache;
> +
> +use task_cache::{GetTasks, TaskCache, TaskCacheItem};
> +
> +/// Base directory for the remote task cache.
> +pub const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
> +
> +/// Maximum size at which the journal will be applied early when adding new tasks.
> +const JOURNAL_MAX_SIZE: u64 = 5 * 1024 * 1024;
> +
> +/// Rotate once the most recent archive file is at least 24 hours old.
> +pub const ROTATE_AFTER: u64 = 24 * 3600;
> +
> +/// Keep 7 days worth of tasks.
> +pub const KEEP_OLD_FILES: u32 = 7;
> +
> +/// Number of uncompressed archive files. These will be the most recent ones.
> +const NUMBER_OF_UNCOMPRESSED_FILES: u32 = 2;
>   
>   /// Get tasks for all remotes
>   // FIXME: filter for privileges
> -pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
> -    let (remotes, _) = pdm_config::remotes::config()?;
> +pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
> +    tokio::task::spawn_blocking(move || {
> +        let cache = get_cache()?.read()?;
>   
> -    let mut all_tasks = Vec::new();
> -
> -    let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
> -    let mut cache = TaskCache::new(cache_path)?;
> -
> -    // Force a refresh for all tasks of a remote if a task is finished.
> -    // Not super nice, but saves us from persisting finished tasks. Also,
> -    // the /nodes/<node>/tasks/<upid>/status endpoint does not return
> -    // a task's endtime, which is only returned by
> -    // /nodes/<node>/tasks...
> -    // Room for improvements in the future.
> -    invalidate_cache_for_finished_tasks(&mut cache);
> -
> -    for (remote_name, remote) in remotes.iter() {
> -        let now = proxmox_time::epoch_i64();
> -
> -        if let Some(tasks) = cache.get_tasks(remote_name, now, max_age) {
> -            // Data in cache is recent enough and has not been invalidated.
> -            all_tasks.extend(tasks);
> +        let which = if filters.running {
> +            GetTasks::Active
>           } else {
> -            let tasks = match fetch_tasks(remote).await {
> -                Ok(tasks) => tasks,
> -                Err(err) => {
> -                    // ignore errors for not reachable remotes
> -                    continue;
> +            GetTasks::All
> +        };
> +
> +        let returned_tasks = cache
> +            .get_tasks(which)?
> +            .skip(filters.start as usize)
> +            .take(filters.limit as usize)
> +            .filter_map(|task| {
> +                // TODO: Handle PBS tasks
> +                let pve_upid: Result<PveUpid, Error> = task.upid.upid.parse();
> +                match pve_upid {
> +                    Ok(pve_upid) => Some(TaskListItem {
> +                        upid: task.upid.to_string(),
> +                        node: pve_upid.node,
> +                        pid: pve_upid.pid as i64,
> +                        pstart: pve_upid.pstart,
> +                        starttime: pve_upid.starttime,
> +                        worker_type: pve_upid.worker_type,
> +                        worker_id: None,
> +                        user: pve_upid.auth_id,
> +                        endtime: task.endtime,
> +                        status: task.status,
> +                    }),
> +                    Err(err) => {
> +                        log::error!("could not parse UPID: {err:#}");
> +                        None
> +                    }
>                   }
> -            };
> -            cache.set_tasks(remote_name, tasks.clone(), now);
> -
> -            all_tasks.extend(tasks);
> -        }
> -    }
> -
> -    let mut returned_tasks = add_running_tasks(all_tasks)?;
> -    returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime));
> -    let returned_tasks = returned_tasks
> -        .into_iter()
> -        .filter(|item| {
> -            if filters.running && item.endtime.is_some() {
> -                return false;
> -            }
> -
> -            if let Some(until) = filters.until {
> -                if item.starttime > until {
> +            })
> +            .filter(|item| {
> +                if filters.running && item.endtime.is_some() {
>                       return false;
>                   }
> -            }
>   
> -            if let Some(since) = filters.since {
> -                if item.starttime < since {
> -                    return false;
> -                }
> -            }
> -
> -            if let Some(needle) = &filters.userfilter {
> -                if !item.user.contains(needle) {
> -                    return false;
> -                }
> -            }
> -
> -            if let Some(typefilter) = &filters.typefilter {
> -                if !item.worker_type.contains(typefilter) {
> -                    return false;
> -                }
> -            }
> -
> -            let state = item.status.as_ref().map(|status| tasktype(status));
> -
> -            match (state, &filters.statusfilter) {
> -                (Some(TaskStateType::OK), _) if filters.errors => return false,
> -                (Some(state), Some(filters)) => {
> -                    if !filters.contains(&state) {
> +                if let Some(until) = filters.until {
> +                    if item.starttime > until {
>                           return false;
>                       }
>                   }
> -                (None, Some(_)) => return false,
> -                _ => {}
> -            }
>   
> -            true
> -        })
> -        .skip(filters.start as usize)
> -        .take(filters.limit as usize)
> -        .collect();
> -
> -    // We don't need to wait for this task to finish
> -    tokio::task::spawn_blocking(move || {
> -        if let Err(e) = cache.save() {
> -            log::error!("could not save task cache: {e}");
> -        }
> -    });
> -
> -    Ok(returned_tasks)
> -}
> -
> -/// Fetch tasks (active and finished) from a remote
> -async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
> -    let mut tasks = Vec::new();
> -
> -    match remote.ty {
> -        RemoteType::Pve => {
> -            let client = pve::connect(remote)?;
> -
> -            // N+1 requests - we could use /cluster/tasks, but that one
> -            // only gives a limited task history
> -            for node in client.list_nodes().await? {
> -                let params = ListTasks {
> -                    // Include running tasks
> -                    source: Some(ListTasksSource::All),
> -                    // TODO: How much task history do we want? Right now we just hard-code it
> -                    // to 7 days.
> -                    since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
> -                    ..Default::default()
> -                };
> -
> -                let list = client.get_task_list(&node.node, params).await?;
> -                let mapped = map_tasks(list, &remote.id)?;
> -
> -                tasks.extend(mapped);
> -            }
> -        }
> -        RemoteType::Pbs => {
> -            // TODO: Add code for PBS
> -        }
> -    }
> -
> -    Ok(tasks)
> -}
> -
> -/// Convert a `Vec<ListTaskResponce>` to `Vec<TaskListItem>`
> -fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> {
> -    let mut mapped = Vec::new();
> -
> -    for task in tasks {
> -        let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
> -
> -        mapped.push(TaskListItem {
> -            upid: remote_upid.to_string(),
> -            node: task.node,
> -            pid: task.pid,
> -            pstart: task.pstart as u64,
> -            starttime: task.starttime,
> -            worker_type: task.ty,
> -            worker_id: Some(task.id),
> -            user: task.user,
> -            endtime: task.endtime,
> -            status: task.status,
> -        })
> -    }
> -
> -    Ok(mapped)
> -}
> -
> -/// Drops the cached task list of a remote for all finished tasks.
> -///
> -/// We use this to force a refresh so that we get the full task
> -/// info (including `endtime`) in the next API call.
> -fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
> -    let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
> -
> -    // If a task is finished, we force a refresh for the remote - otherwise
> -    // we don't get the 'endtime' for the task.
> -    for task in finished.drain() {
> -        cache.invalidate_cache_for_remote(task.remote());
> -    }
> -}
> -
> -/// Supplement the list of tasks that we received from the remote with
> -/// the tasks that were started by PDM and are currently running.
> -fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> {
> -    let mut returned_tasks = Vec::new();
> -
> -    let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned");
> -    for task in cached_tasks {
> -        let remote_upid = task.upid.parse()?;
> -
> -        if running_tasks.contains(&remote_upid) {
> -            if task.endtime.is_some() {
> -                // Task is finished but we still think it is running ->
> -                // Drop it from RUNNING_FOREIGN_TASKS
> -                running_tasks.remove(&remote_upid);
> -
> -                // No need to put it in FINISHED_TASKS, since we already
> -                // got its state recently enough (we know the status and endtime)
> -            }
> -        } else {
> -            returned_tasks.push(task);
> -        }
> -    }
> -
> -    for task in running_tasks.iter() {
> -        let upid: PveUpid = task.upid.parse()?;
> -        returned_tasks.push(TaskListItem {
> -            upid: task.to_string(),
> -            node: upid.node,
> -            pid: upid.pid as i64,
> -            pstart: upid.pstart,
> -            starttime: upid.starttime,
> -            worker_type: upid.worker_type,
> -            worker_id: upid.worker_id,
> -            user: upid.auth_id,
> -            endtime: None,
> -            status: None,
> -        });
> -    }
> -
> -    Ok(returned_tasks)
> -}
> -
> -/// A cache for fetched remote tasks.
> -struct TaskCache {
> -    /// Cache entries
> -    content: TaskCacheContent,
> -
> -    /// Entries that were added or updated - these will be persistet
> -    /// when `save` is called.
> -    new_or_updated: TaskCacheContent,
> -
> -    /// Cache entries were changed/removed.
> -    dirty: bool,
> -
> -    /// File-location at which the cached tasks are stored.
> -    cachefile_path: PathBuf,
> -}
> -
> -impl TaskCache {
> -    /// Create a new tasks cache instance by loading
> -    /// the cache from disk.
> -    fn new(cachefile_path: PathBuf) -> Result<Self, Error> {
> -        Ok(Self {
> -            content: Self::load_content()?,
> -            new_or_updated: Default::default(),
> -            dirty: false,
> -            cachefile_path,
> -        })
> -    }
> -
> -    /// Load the task cache contents from disk.
> -    fn load_content() -> Result<TaskCacheContent, Error> {
> -        let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
> -        let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?;
> -
> -        let content = if let Some(content) = content {
> -            serde_json::from_str(&content)?
> -        } else {
> -            Default::default()
> -        };
> -
> -        Ok(content)
> -    }
> -
> -    /// Get path for the cache's lockfile.
> -    fn lockfile_path(&self) -> PathBuf {
> -        let mut path = self.cachefile_path.clone();
> -        path.set_extension("lock");
> -        path
> -    }
> -
> -    /// Persist the task cache
> -    ///
> -    /// This method requests an exclusive lock for the task cache lockfile.
> -    fn save(&mut self) -> Result<(), Error> {
> -        // if we have not updated anything, we don't have to update the cache file
> -        if !self.dirty {
> -            return Ok(());
> -        }
> -
> -        let _guard = self.lock(Duration::from_secs(5))?;
> -
> -        // Read content again, in case somebody has changed it in the meanwhile
> -        let mut content = Self::load_content()?;
> -
> -        for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
> -            if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
> -                // Only update entry if nobody else has updated it in the meanwhile
> -                if existing_entry.timestamp < entry.timestamp {
> -                    *existing_entry = entry;
> -                }
> -            } else {
> -                content.remote_tasks.insert(remote_name, entry);
> -            }
> -        }
> -
> -        let bytes = serde_json::to_vec_pretty(&content)?;
> -
> -        let api_uid = pdm_config::api_user()?.uid;
> -        let api_gid = pdm_config::api_group()?.gid;
> -
> -        let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
> -
> -        proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?;
> -
> -        self.dirty = false;
> -
> -        Ok(())
> -    }
> -
> -    // Update task data for a given remote.
> -    fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) {
> -        self.dirty = true;
> -        self.new_or_updated
> -            .remote_tasks
> -            .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks });
> -    }
> -
> -    // Get task data for a given remote.
> -    fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> {
> -        if let Some(entry) = self.content.remote_tasks.get(remote) {
> -            if (entry.timestamp + max_age) < now {
> -                return None;
> -            }
> -
> -            Some(entry.tasks.clone())
> -        } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
> -            if (entry.timestamp + max_age) < now {
> -                return None;
> -            }
> -            Some(entry.tasks.clone())
> -        } else {
> -            None
> -        }
> -    }
> -
> -    // Invalidate cache for a given remote.
> -    fn invalidate_cache_for_remote(&mut self, remote: &str) {
> -        self.dirty = true;
> -        self.content.remote_tasks.remove(remote);
> -    }
> -
> -    // Lock the cache for modification.
> -    //
> -    // While the cache is locked, other users can still read the cache
> -    // without a lock, since the cache file is replaced atomically
> -    // when updating.
> -    fn lock(&self, duration: Duration) -> Result<File, Error> {
> -        let api_uid = pdm_config::api_user()?.uid;
> -        let api_gid = pdm_config::api_group()?.gid;
> -
> -        let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
> -        proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options)
> -    }
> -}
> -
> -#[derive(Serialize, Deserialize)]
> -/// Per-remote entry in the task cache.
> -struct TaskCacheEntry {
> -    timestamp: i64,
> -    tasks: Vec<TaskListItem>,
> -}
> -
> -#[derive(Default, Serialize, Deserialize)]
> -/// Content of the task cache file.
> -struct TaskCacheContent {
> -    remote_tasks: HashMap<String, TaskCacheEntry>,
> -}
> -
> -/// Interval at which tracked tasks are polled
> -const RUNNING_CHECK_INTERVAL_S: u64 = 10;
> -
> -/// Tasks which were started by PDM and are still running
> -static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
> -/// Tasks which were started by PDM and w
> -static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
> -
> -fn init() -> RwLock<HashSet<RemoteUpid>> {
> -    RwLock::new(HashSet::new())
> -}
> -
> -/// Insert a remote UPID into the running list
> -///
> -/// If it is the first entry in the list, a background task is started to track its state
> -///
> -/// Returns the [`JoinHandle`] if a task was started.
> -///
> -/// panics on a poisoned mutex
> -pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> {
> -    let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap();
> -
> -    // the call inserting the first task in the list needs to start the checking task
> -    let need_start_task = tasks.is_empty();
> -    tasks.insert(task);
> -
> -    if !need_start_task {
> -        return None;
> -    }
> -    drop(tasks);
> -
> -    Some(tokio::spawn(async move {
> -        loop {
> -            let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S);
> -            tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
> -
> -            let finished_tasks = get_finished_tasks().await;
> -
> -            // skip iteration if we still have tasks, just not finished ones
> -            if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() {
> -                continue;
> -            }
> -
> -            let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap();
> -            // we either have finished tasks, or the running task list was empty
> -            let mut set = RUNNING_FOREIGN_TASKS.write().unwrap();
> -
> -            for (upid, _status) in finished_tasks {
> -                if set.remove(&upid) {
> -                    finished.insert(upid);
> -                } else {
> -                    // someone else removed & persisted the task in the meantime
> -                }
> -            }
> -
> -            // if no task remains, end the current task
> -            // it will be restarted by the next caller that inserts one
> -            if set.is_empty() {
> -                return;
> -            }
> -        }
> -    }))
> -}
> -
> -/// Get a list of running foreign tasks
> -///
> -/// panics on a poisoned mutex
> -pub fn get_running_tasks() -> Vec<RemoteUpid> {
> -    RUNNING_FOREIGN_TASKS
> -        .read()
> -        .unwrap()
> -        .iter()
> -        .cloned()
> -        .collect()
> -}
> -
> -/// Checks all current saved UPIDs if they're still running, and if not,
> -/// returns their upids + status
> -///
> -/// panics on a poisoned mutex
> -pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> {
> -    let mut finished = Vec::new();
> -    let config = match pdm_config::remotes::config() {
> -        Ok((config, _)) => config,
> -        Err(err) => {
> -            log::error!("could not open remotes config: {err}");
> -            return Vec::new();
> -        }
> -    };
> -    for task in get_running_tasks() {
> -        match config.get(task.remote()) {
> -            Some(remote) => match remote.ty {
> -                RemoteType::Pve => {
> -                    let status = match crate::api::pve::tasks::get_task_status(
> -                        remote.id.clone(),
> -                        task.clone(),
> -                        false,
> -                    )
> -                    .await
> -                    {
> -                        Ok(status) => status,
> -                        Err(err) => {
> -                            log::error!("could not get status from remote: {err}");
> -                            finished.push((task, "could not get status".to_string()));
> -                            continue;
> -                        }
> -                    };
> -                    if let Some(status) = status.exitstatus {
> -                        finished.push((task, status.to_string()));
> +                if let Some(since) = filters.since {
> +                    if item.starttime < since {
> +                        return false;
>                       }
>                   }
> -                RemoteType::Pbs => {
> -                    let _client = match crate::pbs_client::connect(remote) {
> -                        Ok(client) => client,
> -                        Err(err) => {
> -                            log::error!("could not get status from remote: {err}");
> -                            finished.push((task, "could not get status".to_string()));
> -                            continue;
> -                        }
> -                    };
> -                    // FIXME implement get task status
> -                    finished.push((task, "unknown state".to_string()));
> -                }
> -            },
> -            None => finished.push((task, "unknown remote".to_string())),
> -        }
> -    }
>   
> -    finished
> +                if let Some(needle) = &filters.userfilter {
> +                    if !item.user.contains(needle) {
> +                        return false;
> +                    }
> +                }
> +
> +                if let Some(typefilter) = &filters.typefilter {
> +                    if !item.worker_type.contains(typefilter) {
> +                        return false;
> +                    }
> +                }
> +
> +                let state = item.status.as_ref().map(|status| tasktype(status));
> +
> +                match (state, &filters.statusfilter) {
> +                    (Some(TaskStateType::OK), _) if filters.errors => return false,
> +                    (Some(state), Some(filters)) => {
> +                        if !filters.contains(&state) {
> +                            return false;
> +                        }
> +                    }
> +                    (None, Some(_)) => return false,
> +                    _ => {}
> +                }
> +
> +                true
> +            })
> +            .collect();
> +
> +        Ok(returned_tasks)
> +    })
> +    .await?
> +}
> +
> +/// Insert a newly created task into the list of tracked tasks.
> +///
> +/// Any tracked task will be polled with a short interval until the task
> +/// has finished.
> +pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> {
> +    tokio::task::spawn_blocking(move || {
> +        let cache = get_cache()?.write()?;
> +        // TODO: Handle PBS tasks correctly.
> +        let pve_upid: pve_api_types::PveUpid = task.upid.parse()?;
> +        let task = TaskCacheItem {
> +            upid: task.clone(),
> +            starttime: pve_upid.starttime,
> +            status: None,
> +            endtime: None,
> +        };
> +        cache.add_tracked_task(task)
> +    })
> +    .await?
> +}
> +
> +/// Get a new [`TaskCache`] instance.
> +///
> +/// No heavy-weight operations are done here, it's fine to call this regularly as part of the
> +/// update loop.
> +pub fn get_cache() -> Result<TaskCache, Error> {
> +    let file_options = proxmox_product_config::default_create_options();
> +
> +    let cache_path = Path::new(REMOTE_TASKS_DIR);
> +    let cache = TaskCache::new(
> +        cache_path,
> +        file_options,
> +        KEEP_OLD_FILES,
> +        NUMBER_OF_UNCOMPRESSED_FILES,
> +        ROTATE_AFTER,
> +        JOURNAL_MAX_SIZE,
> +    )?;
> +
> +    Ok(cache)
>   }
>   
>   /// Parses a task status string into a TaskStateType




