[pdm-devel] [PATCH proxmox-datacenter-manager v7 2/7] remote tasks: add background task for task polling, use new task cache

Lukas Wagner l.wagner at proxmox.com
Wed Aug 20 14:43:24 CEST 2025


This commit changes the remote task module as follows:

- Add a new background task for regular polling of task data
Instead of triggering the fetching of task data from the `get_tasks`
function, which is usually called by an API handler, we move the fetching
to a new background task. The task fetches the latest tasks from all
remotes and stores them in the task cache at regular intervals (10 minutes).
The `get_tasks` function itself only reads from the cache (a condensed
sketch of the polling loop follows after this list).
The main rationale for this change is that for large setups, fetching
tasks from all remotes can take a *long* time (e.g. hundreds of remotes,
each with >100ms connection latency quickly adds up to minutes).
If we did this from within `get_tasks`, the API handler calling the
function would be blocked for the entire time.
The `get_tasks` API is called every couple of seconds by the UI to get
a list of running remote tasks, so this *must* be quick.

- Tracked tasks are also polled in the same background task, but with
a short polling delay (10 seconds). If a tracked task finishes,
an out-of-order fetch of the remote's tasks is performed to update
the cache with the full task data of the finished task.

- Only finished tasks are requested from the remotes. This avoids a
foreign (as in, not started by PDM) running task appearing stuck in
the running state until the next regular task cache refresh.
The tracked task polling could be extended to also cover running
foreign tasks, but this is an easy addition for the future.

- Tasks are now stored in the new, improved task cache implementation.
This should make retrieving tasks much quicker and avoid
unneeded disk IO.
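
Illustratively, the two-interval scheme boils down to something like the
following. This is only a condensed sketch: the real loop in
remote_tasks.rs additionally handles archive rotation, journal application
and error reporting, and `poll_tracked_tasks`/`fetch_all_remotes` are
placeholder stubs here, not functions from this patch.

    use std::time::{Duration, Instant};

    const POLL_INTERVAL: Duration = Duration::from_secs(10);
    const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600);

    // Placeholder stubs; the real functions talk to the remotes and the cache.
    async fn poll_tracked_tasks() { /* check the status of PDM-started tasks */ }
    async fn fetch_all_remotes() { /* fetch finished tasks from all remotes */ }

    #[tokio::main]
    async fn main() {
        let mut interval = tokio::time::interval(POLL_INTERVAL);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        // `None` means "never fetched", so the very first tick does a full fetch.
        let mut last_fetch: Option<Instant> = None;

        loop {
            interval.tick().await;

            // Tracked tasks are polled on every tick (every 10 seconds).
            poll_tracked_tasks().await;

            // A full fetch from all remotes only happens every 10 minutes.
            if last_fetch.map_or(true, |t| t.elapsed() >= TASK_FETCH_INTERVAL) {
                fetch_all_remotes().await;
                last_fetch = Some(Instant::now());
            }
        }
    }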

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
Reviewed-by: Dominik Csapak <d.csapak at proxmox.com>
---

Notes:
    Changes since v5:
      - Incorporate review feedback from @Dominik (thx!)
      - Change task tracking approach:
        - Instead of using the oldest running task as a cutoff and
        switching to a shorter fetching interval if there is a tracked task,
        we poll tracked tasks directly at a 10-second interval.
        Once a tracked task finishes, we do a regular task fetch once
        to get the full task data (endtime, status).
        This is a nicer approach for long-running tasks, since we do
        not repeatedly request the same tasks.
        - Use proxmox_product_config to get CreateOptions where
        it makes sense.
        - Use timestamps instead of cycle counts to keep track
        of when we want to rotate the task archive or do a full
        task fetch.
        - Be more clever about how we request the semaphores. Instead
        of requesting up front all semaphores that we could potentially
        need to poll multiple nodes of a remote in parallel, request
        them on demand (a rough sketch follows after this list).
        - Keep track of per-node failures while fetching tasks and
        feed this information to the cache implementation
        so that it can maintain the per-node cutoff timestamp.
        - Make documentation of public constants a bit easier
        to understand.
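    
      A rough, hedged sketch of the on-demand permit acquisition mentioned
      above. The two constants mirror the ones added in this patch, while
      `fetch_node` and the node names are hypothetical stand-ins for the
      real per-node task list requests:
    
        use std::sync::Arc;
        use tokio::{sync::Semaphore, task::JoinSet};
    
        const MAX_CONNECTIONS: usize = 20;            // total budget across all remotes
        const CONNECTIONS_PER_PVE_REMOTE: usize = 5;  // budget for a single remote
    
        async fn fetch_node(node: &str) {
            // Hypothetical stand-in for the real per-node request.
            println!("fetching tasks from {node}");
        }
    
        #[tokio::main]
        async fn main() {
            let total = Arc::new(Semaphore::new(MAX_CONNECTIONS));
            let per_remote = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
    
            let mut join_set = JoinSet::new();
            for node in ["node1", "node2", "node3"] {
                // Permits are acquired right before spawning the request instead
                // of reserving the whole budget for the remote up front.
                let per_remote_permit = Arc::clone(&per_remote).acquire_owned().await.unwrap();
                let total_permit = Arc::clone(&total).acquire_owned().await.unwrap();
    
                join_set.spawn(async move {
                    // Both permits are released when this task finishes.
                    let _permits = (per_remote_permit, total_permit);
                    fetch_node(node).await;
                });
            }
            while join_set.join_next().await.is_some() {}
        }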
    
    Changes since v4:
      - Rebase onto latest master, adapting to changes in
        the section config type
    
    Changes since v2:
      - Adapt to new locking approach (only drops a `mut`)
    
    Changes since v1:
    
     - use const Duration instead of u64s for durations, using
       Duration::as_secs() where needed
     - Move the remote_task fetching task functions to
       src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
     - remote_tasks::get_tasks: wrap the function body in a
       tokio::task::spawn_blocking, since using the TaskCache::get_tasks
       iterator does disk IO and could block the executor
       (see the short sketch at the end of these notes)
     - Added some doc strings to make the purpose/workings of
       some functions clearer
     - Couple of variables have been renamed for more clarity
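    
     A minimal sketch of the spawn_blocking pattern mentioned in the
     get_tasks note above. `read_tasks_from_disk` is a hypothetical
     stand-in for iterating the on-disk task archive, not an API from
     this patch:
    
        use anyhow::Error;
    
        // Hypothetical stand-in for the blocking, disk-bound cache iteration.
        fn read_tasks_from_disk() -> Result<Vec<String>, Error> {
            Ok(vec!["UPID:example".to_string()])
        }
    
        // The async handler offloads the disk-bound work to the blocking thread
        // pool so the tokio executor threads are never blocked on IO.
        async fn get_tasks() -> Result<Vec<String>, Error> {
            tokio::task::spawn_blocking(read_tasks_from_disk).await?
        }
    
        #[tokio::main]
        async fn main() -> Result<(), Error> {
            println!("{} cached tasks", get_tasks().await?.len());
            Ok(())
        }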

 server/src/api/pve/lxc.rs                     |  10 +-
 server/src/api/pve/mod.rs                     |   4 +-
 server/src/api/pve/qemu.rs                    |   6 +-
 server/src/api/remote_tasks.rs                |  11 +-
 server/src/bin/proxmox-datacenter-api/main.rs |   1 +
 .../bin/proxmox-datacenter-api/tasks/mod.rs   |   1 +
 .../tasks/remote_tasks.rs                     | 559 ++++++++++++++++
 server/src/remote_tasks/mod.rs                | 625 ++++--------------
 8 files changed, 706 insertions(+), 511 deletions(-)
 create mode 100644 server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs

diff --git a/server/src/api/pve/lxc.rs b/server/src/api/pve/lxc.rs
index f1c31425..83f9f4aa 100644
--- a/server/src/api/pve/lxc.rs
+++ b/server/src/api/pve/lxc.rs
@@ -209,7 +209,7 @@ pub async fn lxc_start(
 
     let upid = pve.start_lxc_async(&node, vmid, Default::default()).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -242,7 +242,7 @@ pub async fn lxc_stop(
 
     let upid = pve.stop_lxc_async(&node, vmid, Default::default()).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -277,7 +277,7 @@ pub async fn lxc_shutdown(
         .shutdown_lxc_async(&node, vmid, Default::default())
         .await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -357,7 +357,7 @@ pub async fn lxc_migrate(
     };
     let upid = pve.migrate_lxc(&node, vmid, params).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -518,5 +518,5 @@ pub async fn lxc_remote_migrate(
     log::info!("migrating vm {vmid} of node {node:?}");
     let upid = source_conn.remote_migrate_lxc(&node, vmid, params).await?;
 
-    new_remote_upid(source, upid)
+    new_remote_upid(source, upid).await
 }
diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index dd7cf382..d472cf58 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -76,9 +76,9 @@ const RESOURCES_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_RESOURCES
 const STATUS_ROUTER: Router = Router::new().get(&API_METHOD_CLUSTER_STATUS);
 
 // converts a remote + PveUpid into a RemoteUpid and starts tracking it
-fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
+async fn new_remote_upid(remote: String, upid: PveUpid) -> Result<RemoteUpid, Error> {
     let remote_upid: RemoteUpid = (remote, upid.to_string()).try_into()?;
-    remote_tasks::track_running_task(remote_upid.clone());
+    remote_tasks::track_running_task(remote_upid.clone()).await?;
     Ok(remote_upid)
 }
 
diff --git a/server/src/api/pve/qemu.rs b/server/src/api/pve/qemu.rs
index 5a41a69e..54ede112 100644
--- a/server/src/api/pve/qemu.rs
+++ b/server/src/api/pve/qemu.rs
@@ -216,7 +216,7 @@ pub async fn qemu_start(
         .start_qemu_async(&node, vmid, Default::default())
         .await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -377,7 +377,7 @@ pub async fn qemu_migrate(
     };
     let upid = pve.migrate_qemu(&node, vmid, params).await?;
 
-    new_remote_upid(remote, upid)
+    new_remote_upid(remote, upid).await
 }
 
 #[api(
@@ -564,5 +564,5 @@ pub async fn qemu_remote_migrate(
     log::info!("migrating vm {vmid} of node {node:?}");
     let upid = source_conn.remote_migrate_qemu(&node, vmid, params).await?;
 
-    new_remote_upid(source, upid)
+    new_remote_upid(source, upid).await
 }
diff --git a/server/src/api/remote_tasks.rs b/server/src/api/remote_tasks.rs
index e629000c..05ce3666 100644
--- a/server/src/api/remote_tasks.rs
+++ b/server/src/api/remote_tasks.rs
@@ -21,13 +21,6 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
     },
     input: {
         properties: {
-            "max-age": {
-                type: Integer,
-                optional: true,
-                // TODO: sensible default max-age
-                default: 300,
-                description: "Maximum age of cached task data",
-            },
             filters: {
                 type: TaskFilters,
                 flatten: true,
@@ -36,8 +29,8 @@ const SUBDIRS: SubdirMap = &sorted!([("list", &Router::new().get(&API_METHOD_LIS
     },
 )]
 /// Get the list of tasks for all remotes.
-async fn list_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
-    let tasks = remote_tasks::get_tasks(max_age, filters).await?;
+async fn list_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+    let tasks = remote_tasks::get_tasks(filters).await?;
 
     Ok(tasks)
 }
diff --git a/server/src/bin/proxmox-datacenter-api/main.rs b/server/src/bin/proxmox-datacenter-api/main.rs
index db6b2585..42bc0e1e 100644
--- a/server/src/bin/proxmox-datacenter-api/main.rs
+++ b/server/src/bin/proxmox-datacenter-api/main.rs
@@ -376,6 +376,7 @@ async fn run(debug: bool) -> Result<(), Error> {
     metric_collection::start_task();
     tasks::remote_node_mapping::start_task();
     resource_cache::start_task();
+    tasks::remote_tasks::start_task()?;
 
     server.await?;
     log::info!("server shutting down, waiting for active workers to complete");
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
index e6ead882..a6b1f439 100644
--- a/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
+++ b/server/src/bin/proxmox-datacenter-api/tasks/mod.rs
@@ -1 +1,2 @@
 pub mod remote_node_mapping;
+pub mod remote_tasks;
diff --git a/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
new file mode 100644
index 00000000..4701a935
--- /dev/null
+++ b/server/src/bin/proxmox-datacenter-api/tasks/remote_tasks.rs
@@ -0,0 +1,559 @@
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+    time::{Duration, Instant},
+};
+
+use anyhow::{format_err, Error};
+use nix::sys::stat::Mode;
+use tokio::{sync::Semaphore, task::JoinSet};
+
+use pdm_api_types::{
+    remotes::{Remote, RemoteType},
+    RemoteUpid,
+};
+use proxmox_section_config::typed::SectionConfigData;
+use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource};
+
+use server::{
+    api::pve,
+    remote_tasks::{
+        self,
+        task_cache::{NodeFetchSuccessMap, State, TaskCache, TaskCacheItem},
+        KEEP_OLD_FILES, REMOTE_TASKS_DIR, ROTATE_AFTER,
+    },
+    task_utils,
+};
+
+/// Tick interval for the remote task fetching task.
+/// This is also the rate at which we check on tracked tasks.
+const POLL_INTERVAL: Duration = Duration::from_secs(10);
+
+/// Interval at which to fetch the newest tasks from remotes (if there is no tracked
+/// task for this remote).
+const TASK_FETCH_INTERVAL: Duration = Duration::from_secs(600);
+
+/// Interval at which to check for task cache rotation.
+const CHECK_ROTATE_INTERVAL: Duration = Duration::from_secs(3600);
+
+/// Interval at which the task cache journal should be applied.
+///
+/// Choosing a value here is a trade-off between performance and avoiding unnecessary writes.
+/// Letting the journal grow large avoids writes, but since the journal is not sorted, accessing
+/// it will be slower than the task archive itself, as the entire journal must be loaded into
+/// memory and then sorted by task starttime. Applying the journal more often might
+/// lead to more writes, but should yield better performance.
+const APPLY_JOURNAL_INTERVAL: Duration = Duration::from_secs(3600);
+
+/// Maximum number of concurrent connections per remote.
+const CONNECTIONS_PER_PVE_REMOTE: usize = 5;
+
+/// Maximum number of total concurrent connections.
+const MAX_CONNECTIONS: usize = 20;
+
+/// Maximum number of tasks to fetch from a single remote in one API call.
+const MAX_TASKS_TO_FETCH: u64 = 5000;
+
+/// (Ephemeral) Remote task fetching task state.
+struct TaskState {
+    /// Time at which we last checked for archive rotation.
+    last_rotate_check: Instant,
+    /// Time at which we last fetched tasks.
+    last_fetch: Instant,
+    /// Time at which we last applied the journal.
+    last_journal_apply: Instant,
+}
+
+impl TaskState {
+    fn new() -> Self {
+        let now = Instant::now();
+
+        Self {
+            last_rotate_check: now - CHECK_ROTATE_INTERVAL,
+            last_fetch: now - TASK_FETCH_INTERVAL,
+            last_journal_apply: now - APPLY_JOURNAL_INTERVAL,
+        }
+    }
+
+    /// Reset the task archive rotation timestamp.
+    fn reset_rotate_check(&mut self) {
+        self.last_rotate_check = Instant::now();
+    }
+
+    /// Reset the task fetch timestamp.
+    fn reset_fetch(&mut self) {
+        self.last_fetch = Instant::now();
+    }
+
+    /// Reset the journal apply timestamp.
+    fn reset_journal_apply(&mut self) {
+        self.last_journal_apply = Instant::now();
+    }
+
+    /// Should we check for archive rotation?
+    fn is_due_for_rotate_check(&self) -> bool {
+        Instant::now().duration_since(self.last_rotate_check) > CHECK_ROTATE_INTERVAL
+    }
+
+    /// Should we fetch tasks?
+    fn is_due_for_fetch(&self) -> bool {
+        Instant::now().duration_since(self.last_fetch) > TASK_FETCH_INTERVAL
+    }
+
+    /// Should we apply the task archive's journal?
+    fn is_due_for_journal_apply(&self) -> bool {
+        Instant::now().duration_since(self.last_journal_apply) > APPLY_JOURNAL_INTERVAL
+    }
+}
+
+/// Start the remote task fetching task
+pub fn start_task() -> Result<(), Error> {
+    let dir_options =
+        proxmox_product_config::default_create_options().perm(Mode::from_bits_truncate(0o0750));
+
+    proxmox_sys::fs::create_path(REMOTE_TASKS_DIR, None, Some(dir_options))?;
+
+    tokio::spawn(async move {
+        let task_scheduler = std::pin::pin!(remote_task_fetching_task());
+        let abort_future = std::pin::pin!(proxmox_daemon::shutdown_future());
+        futures::future::select(task_scheduler, abort_future).await;
+    });
+
+    Ok(())
+}
+
+/// Task which handles fetching remote tasks and task archive rotation.
+/// This function never returns.
+async fn remote_task_fetching_task() -> ! {
+    let mut task_state = TaskState::new();
+
+    let mut interval = tokio::time::interval(POLL_INTERVAL);
+    interval.reset_at(task_utils::next_aligned_instant(POLL_INTERVAL.as_secs()).into());
+
+    // We don't really care about catching up on missed ticks, we just want
+    // a steady tick rate.
+    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+    if let Err(err) = init_cache().await {
+        log::error!("error when initialized task cache: {err:#}");
+    }
+
+    loop {
+        interval.tick().await;
+        if let Err(err) = do_tick(&mut task_state).await {
+            log::error!("error when fetching remote tasks: {err:#}");
+        }
+    }
+}
+
+/// Handle a single timer tick.
+/// Will handle archive file rotation, polling of tracked tasks and fetching of remote tasks.
+async fn do_tick(task_state: &mut TaskState) -> Result<(), Error> {
+    let cache = remote_tasks::get_cache()?;
+
+    if task_state.is_due_for_rotate_check() {
+        log::debug!("checking if remote task archive should be rotated");
+        if rotate_cache(cache.clone()).await? {
+            log::info!("rotated remote task archive");
+        }
+
+        task_state.reset_rotate_check();
+    }
+
+    if task_state.is_due_for_journal_apply() {
+        apply_journal(cache.clone()).await?;
+        task_state.reset_journal_apply();
+    }
+
+    let (remote_config, _) = tokio::task::spawn_blocking(pdm_config::remotes::config).await??;
+
+    let total_connections_semaphore = Arc::new(Semaphore::new(MAX_CONNECTIONS));
+
+    let cache_state = cache.read_state();
+    let poll_results = poll_tracked_tasks(
+        &remote_config,
+        cache_state.tracked_tasks(),
+        Arc::clone(&total_connections_semaphore),
+    )
+    .await?;
+
+    // Get a list of remotes that we should poll in this cycle.
+    let remotes = if task_state.is_due_for_fetch() {
+        task_state.reset_fetch();
+        get_all_remotes(&remote_config)
+    } else {
+        get_remotes_with_finished_tasks(&remote_config, &poll_results)
+    };
+
+    let (all_tasks, update_state_for_remote) = fetch_remotes(
+        remotes,
+        Arc::new(cache_state),
+        Arc::clone(&total_connections_semaphore),
+    )
+    .await;
+
+    if !all_tasks.is_empty() {
+        update_task_cache(cache, all_tasks, update_state_for_remote, poll_results).await?;
+    }
+
+    Ok(())
+}
+
+/// Initialize the remote task cache with initial archive files, in case there are no
+/// archive files yet.
+///
+/// This allows us to immediately backfill remote task history when setting up a new PDM instance
+/// without any prior task archive rotation.
+async fn init_cache() -> Result<(), Error> {
+    tokio::task::spawn_blocking(|| {
+        let cache = remote_tasks::get_cache()?;
+        cache.write()?.init(proxmox_time::epoch_i64())?;
+        Ok(())
+    })
+    .await?
+}
+
+/// Fetch tasks from a list of remotes.
+///
+/// Returns a list of tasks and a map that shows whether we want to update the
+/// cutoff timestamp in the statefile. We don't want to update the cutoff if
+/// the connection to a remote failed or if we could not reach all nodes of a cluster.
+async fn fetch_remotes(
+    remotes: Vec<Remote>,
+    cache_state: Arc<State>,
+    total_connections_semaphore: Arc<Semaphore>,
+) -> (Vec<TaskCacheItem>, NodeFetchSuccessMap) {
+    let mut join_set = JoinSet::new();
+
+    for remote in remotes {
+        let semaphore = Arc::clone(&total_connections_semaphore);
+        let state_clone = Arc::clone(&cache_state);
+
+        join_set.spawn(async move {
+            log::debug!("fetching remote tasks for '{}'", remote.id);
+            fetch_tasks(&remote, state_clone, semaphore)
+                .await
+                .map_err(|err| {
+                    format_err!("could not fetch tasks from remote '{}': {err}", remote.id)
+                })
+        });
+    }
+
+    let mut all_tasks = Vec::new();
+    let mut update_state_for_remote = NodeFetchSuccessMap::default();
+
+    while let Some(res) = join_set.join_next().await {
+        match res {
+            Ok(Ok(FetchedTasks {
+                tasks,
+                node_results,
+            })) => {
+                all_tasks.extend(tasks);
+                update_state_for_remote.merge(node_results);
+            }
+            Ok(Err(err)) => log::error!("{err:#}"),
+            Err(err) => log::error!("could not join task fetching future: {err:#}"),
+        }
+    }
+
+    (all_tasks, update_state_for_remote)
+}
+
+/// Return all remotes from the given config.
+fn get_all_remotes(remote_config: &SectionConfigData<Remote>) -> Vec<Remote> {
+    remote_config
+        .into_iter()
+        .map(|(_, section)| section)
+        .cloned()
+        .collect()
+}
+
+/// Return all remotes that correspond to a list of finished tasks.
+fn get_remotes_with_finished_tasks(
+    remote_config: &SectionConfigData<Remote>,
+    poll_results: &HashMap<RemoteUpid, PollResult>,
+) -> Vec<Remote> {
+    let remotes_with_finished_tasks: HashSet<&str> = poll_results
+        .iter()
+        .filter_map(|(upid, status)| (*status == PollResult::Finished).then_some(upid.remote()))
+        .collect();
+
+    remote_config
+        .into_iter()
+        .filter_map(|(name, remote)| {
+            remotes_with_finished_tasks
+                .contains(&name)
+                .then_some(remote)
+        })
+        .cloned()
+        .collect()
+}
+
+/// Rotate the task cache if necessary.
+///
+/// Returns Ok(true) if the cache's files were rotated.
+async fn rotate_cache(cache: TaskCache) -> Result<bool, Error> {
+    tokio::task::spawn_blocking(move || cache.write()?.rotate(proxmox_time::epoch_i64())).await?
+}
+
+/// Apply the task cache journal.
+async fn apply_journal(cache: TaskCache) -> Result<(), Error> {
+    tokio::task::spawn_blocking(move || cache.write()?.apply_journal()).await?
+}
+
+/// Fetched tasks from a single remote.
+struct FetchedTasks {
+    /// List of tasks.
+    tasks: Vec<TaskCacheItem>,
+    /// Records for each cluster node whether it was fetched successfully.
+    node_results: NodeFetchSuccessMap,
+}
+
+/// Fetch finished tasks from a remote.
+async fn fetch_tasks(
+    remote: &Remote,
+    state: Arc<State>,
+    total_connections_semaphore: Arc<Semaphore>,
+) -> Result<FetchedTasks, Error> {
+    let mut tasks = Vec::new();
+
+    let mut node_results = NodeFetchSuccessMap::default();
+
+    match remote.ty {
+        RemoteType::Pve => {
+            let client = pve::connect(remote)?;
+
+            let nodes = {
+                // This permit *must* be dropped before we acquire the permits for the
+                // per-node connections - otherwise we risk a deadlock.
+                let _permit = total_connections_semaphore.acquire().await.unwrap();
+                client.list_nodes().await?
+            };
+
+            // This second semaphore is used to limit the number of concurrent connections
+            // *per remote*, not in total.
+            let per_remote_semaphore = Arc::new(Semaphore::new(CONNECTIONS_PER_PVE_REMOTE));
+            let mut join_set = JoinSet::new();
+
+            for node in nodes {
+                let node_name = node.node.to_string();
+
+                let since = state
+                    .cutoff_timestamp(&remote.id, &node_name)
+                    .unwrap_or_else(|| {
+                        proxmox_time::epoch_i64() - (KEEP_OLD_FILES as u64 * ROTATE_AFTER) as i64
+                    });
+
+                let params = ListTasks {
+                    source: Some(ListTasksSource::Archive),
+                    since: Some(since),
+                    // If `limit` is not provided, we only receive 50 tasks
+                    limit: Some(MAX_TASKS_TO_FETCH),
+                    ..Default::default()
+                };
+
+                let per_remote_permit = Arc::clone(&per_remote_semaphore)
+                    .acquire_owned()
+                    .await
+                    .unwrap();
+
+                let total_connections_permit = Arc::clone(&total_connections_semaphore)
+                    .acquire_owned()
+                    .await
+                    .unwrap();
+
+                let remote_clone = remote.clone();
+
+                join_set.spawn(async move {
+                    let res = async {
+                        let client = pve::connect(&remote_clone)?;
+                        let task_list =
+                            client
+                                .get_task_list(&node.node, params)
+                                .await
+                                .map_err(|err| {
+                                    format_err!(
+                                        "remote '{}', node '{}': {err}",
+                                        remote_clone.id,
+                                        node.node
+                                    )
+                                })?;
+                        Ok::<Vec<_>, Error>(task_list)
+                    }
+                    .await;
+
+                    drop(total_connections_permit);
+                    drop(per_remote_permit);
+
+                    (node_name, res)
+                });
+            }
+
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok((node_name, result)) => match result {
+                        Ok(task_list) => {
+                            let mapped =
+                                task_list.into_iter().filter_map(|task| {
+                                    match map_pve_task(task, &remote.id) {
+                                        Ok(task) => Some(task),
+                                        Err(err) => {
+                                            log::error!(
+                                                "could not map task data, skipping: {err:#}"
+                                            );
+                                            None
+                                        }
+                                    }
+                                });
+
+                            tasks.extend(mapped);
+                            node_results.set_node_success(remote.id.clone(), node_name);
+                        }
+                        Err(error) => {
+                            log::error!("could not fetch tasks: {error:#}");
+                            node_results.set_node_failure(remote.id.clone(), node_name);
+                        }
+                    },
+                    Err(err) => return Err(err.into()),
+                }
+            }
+        }
+        RemoteType::Pbs => {
+            // TODO: Add code for PBS
+        }
+    }
+
+    Ok(FetchedTasks {
+        tasks,
+        node_results,
+    })
+}
+
+#[derive(PartialEq, Debug)]
+/// Outcome from polling a tracked task.
+enum PollResult {
+    /// Task is still running.
+    Running,
+    /// Task is finished, poll remote tasks to get final status/endtime.
+    Finished,
+    /// Should be dropped from the active file.
+    RequestError,
+    /// Remote does not exist any more -> remove immediately from tracked task list.
+    RemoteGone,
+}
+
+/// Poll all tracked tasks.
+async fn poll_tracked_tasks(
+    remote_config: &SectionConfigData<Remote>,
+    tracked_tasks: impl Iterator<Item = &RemoteUpid>,
+    total_connections_semaphore: Arc<Semaphore>,
+) -> Result<HashMap<RemoteUpid, PollResult>, Error> {
+    let mut join_set = JoinSet::new();
+
+    for task in tracked_tasks.cloned() {
+        let permit = Arc::clone(&total_connections_semaphore)
+            .acquire_owned()
+            .await
+            .unwrap();
+
+        let remote = remote_config.get(task.remote()).cloned();
+
+        join_set.spawn(async move {
+            // Move permit into this async block.
+            let _permit = permit;
+
+            match remote {
+                Some(remote) => poll_single_tracked_task(remote, task).await,
+                None => {
+                    log::info!(
+                        "remote {} does not exist any more, dropping tracked task",
+                        task.remote()
+                    );
+                    (task, PollResult::RemoteGone)
+                }
+            }
+        });
+    }
+
+    let mut results = HashMap::new();
+    while let Some(task_result) = join_set.join_next().await {
+        let (upid, result) = task_result?;
+        results.insert(upid, result);
+    }
+
+    Ok(results)
+}
+
+/// Poll a single tracked task.
+async fn poll_single_tracked_task(remote: Remote, task: RemoteUpid) -> (RemoteUpid, PollResult) {
+    match remote.ty {
+        RemoteType::Pve => {
+            log::debug!("polling tracked task {}", task);
+
+            let status = match server::api::pve::tasks::get_task_status(
+                remote.id.clone(),
+                task.clone(),
+                false,
+            )
+            .await
+            {
+                Ok(status) => status,
+                Err(err) => {
+                    log::error!("could not get status from remote: {err:#}");
+                    return (task, PollResult::RequestError);
+                }
+            };
+
+            let result = if status.exitstatus.is_some() {
+                PollResult::Finished
+            } else {
+                PollResult::Running
+            };
+
+            (task, result)
+        }
+        RemoteType::Pbs => {
+            // TODO: Implement for PBS
+            (task, PollResult::RequestError)
+        }
+    }
+}
+
+/// Map a `ListTasksResponse` to `TaskCacheItem`
+fn map_pve_task(task: ListTasksResponse, remote: &str) -> Result<TaskCacheItem, Error> {
+    let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
+
+    Ok(TaskCacheItem {
+        upid: remote_upid,
+        starttime: task.starttime,
+        endtime: task.endtime,
+        status: task.status,
+    })
+}
+
+/// Update task cache with results from tracked task polling & regular task fetching.
+async fn update_task_cache(
+    cache: TaskCache,
+    new_tasks: Vec<TaskCacheItem>,
+    update_state_for_remote: NodeFetchSuccessMap,
+    poll_results: HashMap<RemoteUpid, PollResult>,
+) -> Result<(), Error> {
+    tokio::task::spawn_blocking(move || {
+        let drop_tracked = poll_results
+            .into_iter()
+            .filter_map(|(upid, result)| match result {
+                PollResult::Running => None,
+                PollResult::Finished | PollResult::RequestError | PollResult::RemoteGone => {
+                    Some(upid)
+                }
+            })
+            .collect();
+
+        cache
+            .write()?
+            .update(new_tasks, &update_state_for_remote, drop_tracked)?;
+
+        Ok(())
+    })
+    .await?
+}
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 7c8e31ef..cec2cc1e 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -1,515 +1,156 @@
-use std::{
-    collections::{HashMap, HashSet},
-    fs::File,
-    path::{Path, PathBuf},
-    sync::{LazyLock, RwLock},
-    time::Duration,
-};
+use std::path::Path;
 
 use anyhow::Error;
-use pdm_api_types::{
-    remotes::{Remote, RemoteType},
-    RemoteUpid, TaskFilters, TaskListItem, TaskStateType,
-};
-use proxmox_sys::fs::CreateOptions;
-use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
-use serde::{Deserialize, Serialize};
-use tokio::task::JoinHandle;
 
-use crate::{api::pve, task_utils};
+use pdm_api_types::{RemoteUpid, TaskFilters, TaskListItem, TaskStateType};
+use pve_api_types::PveUpid;
 
-mod task_cache;
+pub mod task_cache;
+
+use task_cache::{GetTasks, TaskCache, TaskCacheItem};
+
+/// Base directory for the remote task cache.
+pub const REMOTE_TASKS_DIR: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-tasks");
+
+/// Maximum size at which the journal will be applied early when adding new tasks.
+const JOURNAL_MAX_SIZE: u64 = 5 * 1024 * 1024;
+
+/// Rotate once the most recent archive file is at least 24 hours old.
+pub const ROTATE_AFTER: u64 = 24 * 3600;
+
+/// Keep 7 days worth of tasks.
+pub const KEEP_OLD_FILES: u32 = 7;
+
+/// Number of uncompressed archive files. These will be the most recent ones.
+const NUMBER_OF_UNCOMPRESSED_FILES: u32 = 2;
 
 /// Get tasks for all remotes
 // FIXME: filter for privileges
-pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
-    let (remotes, _) = pdm_config::remotes::config()?;
+pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
+    tokio::task::spawn_blocking(move || {
+        let cache = get_cache()?.read()?;
 
-    let mut all_tasks = Vec::new();
-
-    let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
-    let mut cache = TaskCache::new(cache_path)?;
-
-    // Force a refresh for all tasks of a remote if a task is finished.
-    // Not super nice, but saves us from persisting finished tasks. Also,
-    // the /nodes/<node>/tasks/<upid>/status endpoint does not return
-    // a task's endtime, which is only returned by
-    // /nodes/<node>/tasks...
-    // Room for improvements in the future.
-    invalidate_cache_for_finished_tasks(&mut cache);
-
-    for (remote_name, remote) in remotes.iter() {
-        let now = proxmox_time::epoch_i64();
-
-        if let Some(tasks) = cache.get_tasks(remote_name, now, max_age) {
-            // Data in cache is recent enough and has not been invalidated.
-            all_tasks.extend(tasks);
+        let which = if filters.running {
+            GetTasks::Active
         } else {
-            let tasks = match fetch_tasks(remote).await {
-                Ok(tasks) => tasks,
-                Err(err) => {
-                    // ignore errors for not reachable remotes
-                    continue;
+            GetTasks::All
+        };
+
+        let returned_tasks = cache
+            .get_tasks(which)?
+            .skip(filters.start as usize)
+            .take(filters.limit as usize)
+            .filter_map(|task| {
+                // TODO: Handle PBS tasks
+                let pve_upid: Result<PveUpid, Error> = task.upid.upid.parse();
+                match pve_upid {
+                    Ok(pve_upid) => Some(TaskListItem {
+                        upid: task.upid.to_string(),
+                        node: pve_upid.node,
+                        pid: pve_upid.pid as i64,
+                        pstart: pve_upid.pstart,
+                        starttime: pve_upid.starttime,
+                        worker_type: pve_upid.worker_type,
+                        worker_id: None,
+                        user: pve_upid.auth_id,
+                        endtime: task.endtime,
+                        status: task.status,
+                    }),
+                    Err(err) => {
+                        log::error!("could not parse UPID: {err:#}");
+                        None
+                    }
                 }
-            };
-            cache.set_tasks(remote_name, tasks.clone(), now);
-
-            all_tasks.extend(tasks);
-        }
-    }
-
-    let mut returned_tasks = add_running_tasks(all_tasks)?;
-    returned_tasks.sort_by(|a, b| b.starttime.cmp(&a.starttime));
-    let returned_tasks = returned_tasks
-        .into_iter()
-        .filter(|item| {
-            if filters.running && item.endtime.is_some() {
-                return false;
-            }
-
-            if let Some(until) = filters.until {
-                if item.starttime > until {
+            })
+            .filter(|item| {
+                if filters.running && item.endtime.is_some() {
                     return false;
                 }
-            }
 
-            if let Some(since) = filters.since {
-                if item.starttime < since {
-                    return false;
-                }
-            }
-
-            if let Some(needle) = &filters.userfilter {
-                if !item.user.contains(needle) {
-                    return false;
-                }
-            }
-
-            if let Some(typefilter) = &filters.typefilter {
-                if !item.worker_type.contains(typefilter) {
-                    return false;
-                }
-            }
-
-            let state = item.status.as_ref().map(|status| tasktype(status));
-
-            match (state, &filters.statusfilter) {
-                (Some(TaskStateType::OK), _) if filters.errors => return false,
-                (Some(state), Some(filters)) => {
-                    if !filters.contains(&state) {
+                if let Some(until) = filters.until {
+                    if item.starttime > until {
                         return false;
                     }
                 }
-                (None, Some(_)) => return false,
-                _ => {}
-            }
 
-            true
-        })
-        .skip(filters.start as usize)
-        .take(filters.limit as usize)
-        .collect();
-
-    // We don't need to wait for this task to finish
-    tokio::task::spawn_blocking(move || {
-        if let Err(e) = cache.save() {
-            log::error!("could not save task cache: {e}");
-        }
-    });
-
-    Ok(returned_tasks)
-}
-
-/// Fetch tasks (active and finished) from a remote
-async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
-    let mut tasks = Vec::new();
-
-    match remote.ty {
-        RemoteType::Pve => {
-            let client = pve::connect(remote)?;
-
-            // N+1 requests - we could use /cluster/tasks, but that one
-            // only gives a limited task history
-            for node in client.list_nodes().await? {
-                let params = ListTasks {
-                    // Include running tasks
-                    source: Some(ListTasksSource::All),
-                    // TODO: How much task history do we want? Right now we just hard-code it
-                    // to 7 days.
-                    since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
-                    ..Default::default()
-                };
-
-                let list = client.get_task_list(&node.node, params).await?;
-                let mapped = map_tasks(list, &remote.id)?;
-
-                tasks.extend(mapped);
-            }
-        }
-        RemoteType::Pbs => {
-            // TODO: Add code for PBS
-        }
-    }
-
-    Ok(tasks)
-}
-
-/// Convert a `Vec<ListTaskResponce>` to `Vec<TaskListItem>`
-fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskListItem>, Error> {
-    let mut mapped = Vec::new();
-
-    for task in tasks {
-        let remote_upid: RemoteUpid = (remote.to_string(), task.upid.to_string()).try_into()?;
-
-        mapped.push(TaskListItem {
-            upid: remote_upid.to_string(),
-            node: task.node,
-            pid: task.pid,
-            pstart: task.pstart as u64,
-            starttime: task.starttime,
-            worker_type: task.ty,
-            worker_id: Some(task.id),
-            user: task.user,
-            endtime: task.endtime,
-            status: task.status,
-        })
-    }
-
-    Ok(mapped)
-}
-
-/// Drops the cached task list of a remote for all finished tasks.
-///
-/// We use this to force a refresh so that we get the full task
-/// info (including `endtime`) in the next API call.
-fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
-    let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
-
-    // If a task is finished, we force a refresh for the remote - otherwise
-    // we don't get the 'endtime' for the task.
-    for task in finished.drain() {
-        cache.invalidate_cache_for_remote(task.remote());
-    }
-}
-
-/// Supplement the list of tasks that we received from the remote with
-/// the tasks that were started by PDM and are currently running.
-fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem>, Error> {
-    let mut returned_tasks = Vec::new();
-
-    let mut running_tasks = RUNNING_FOREIGN_TASKS.write().expect("mutex poisoned");
-    for task in cached_tasks {
-        let remote_upid = task.upid.parse()?;
-
-        if running_tasks.contains(&remote_upid) {
-            if task.endtime.is_some() {
-                // Task is finished but we still think it is running ->
-                // Drop it from RUNNING_FOREIGN_TASKS
-                running_tasks.remove(&remote_upid);
-
-                // No need to put it in FINISHED_TASKS, since we already
-                // got its state recently enough (we know the status and endtime)
-            }
-        } else {
-            returned_tasks.push(task);
-        }
-    }
-
-    for task in running_tasks.iter() {
-        let upid: PveUpid = task.upid.parse()?;
-        returned_tasks.push(TaskListItem {
-            upid: task.to_string(),
-            node: upid.node,
-            pid: upid.pid as i64,
-            pstart: upid.pstart,
-            starttime: upid.starttime,
-            worker_type: upid.worker_type,
-            worker_id: upid.worker_id,
-            user: upid.auth_id,
-            endtime: None,
-            status: None,
-        });
-    }
-
-    Ok(returned_tasks)
-}
-
-/// A cache for fetched remote tasks.
-struct TaskCache {
-    /// Cache entries
-    content: TaskCacheContent,
-
-    /// Entries that were added or updated - these will be persistet
-    /// when `save` is called.
-    new_or_updated: TaskCacheContent,
-
-    /// Cache entries were changed/removed.
-    dirty: bool,
-
-    /// File-location at which the cached tasks are stored.
-    cachefile_path: PathBuf,
-}
-
-impl TaskCache {
-    /// Create a new tasks cache instance by loading
-    /// the cache from disk.
-    fn new(cachefile_path: PathBuf) -> Result<Self, Error> {
-        Ok(Self {
-            content: Self::load_content()?,
-            new_or_updated: Default::default(),
-            dirty: false,
-            cachefile_path,
-        })
-    }
-
-    /// Load the task cache contents from disk.
-    fn load_content() -> Result<TaskCacheContent, Error> {
-        let taskcache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
-        let content = proxmox_sys::fs::file_read_optional_string(taskcache_path)?;
-
-        let content = if let Some(content) = content {
-            serde_json::from_str(&content)?
-        } else {
-            Default::default()
-        };
-
-        Ok(content)
-    }
-
-    /// Get path for the cache's lockfile.
-    fn lockfile_path(&self) -> PathBuf {
-        let mut path = self.cachefile_path.clone();
-        path.set_extension("lock");
-        path
-    }
-
-    /// Persist the task cache
-    ///
-    /// This method requests an exclusive lock for the task cache lockfile.
-    fn save(&mut self) -> Result<(), Error> {
-        // if we have not updated anything, we don't have to update the cache file
-        if !self.dirty {
-            return Ok(());
-        }
-
-        let _guard = self.lock(Duration::from_secs(5))?;
-
-        // Read content again, in case somebody has changed it in the meanwhile
-        let mut content = Self::load_content()?;
-
-        for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
-            if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
-                // Only update entry if nobody else has updated it in the meanwhile
-                if existing_entry.timestamp < entry.timestamp {
-                    *existing_entry = entry;
-                }
-            } else {
-                content.remote_tasks.insert(remote_name, entry);
-            }
-        }
-
-        let bytes = serde_json::to_vec_pretty(&content)?;
-
-        let api_uid = pdm_config::api_user()?.uid;
-        let api_gid = pdm_config::api_group()?.gid;
-
-        let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
-        proxmox_sys::fs::replace_file(&self.cachefile_path, &bytes, file_options, true)?;
-
-        self.dirty = false;
-
-        Ok(())
-    }
-
-    // Update task data for a given remote.
-    fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, timestamp: i64) {
-        self.dirty = true;
-        self.new_or_updated
-            .remote_tasks
-            .insert(remote.to_string(), TaskCacheEntry { timestamp, tasks });
-    }
-
-    // Get task data for a given remote.
-    fn get_tasks(&self, remote: &str, now: i64, max_age: i64) -> Option<Vec<TaskListItem>> {
-        if let Some(entry) = self.content.remote_tasks.get(remote) {
-            if (entry.timestamp + max_age) < now {
-                return None;
-            }
-
-            Some(entry.tasks.clone())
-        } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
-            if (entry.timestamp + max_age) < now {
-                return None;
-            }
-            Some(entry.tasks.clone())
-        } else {
-            None
-        }
-    }
-
-    // Invalidate cache for a given remote.
-    fn invalidate_cache_for_remote(&mut self, remote: &str) {
-        self.dirty = true;
-        self.content.remote_tasks.remove(remote);
-    }
-
-    // Lock the cache for modification.
-    //
-    // While the cache is locked, other users can still read the cache
-    // without a lock, since the cache file is replaced atomically
-    // when updating.
-    fn lock(&self, duration: Duration) -> Result<File, Error> {
-        let api_uid = pdm_config::api_user()?.uid;
-        let api_gid = pdm_config::api_group()?.gid;
-
-        let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-        proxmox_sys::fs::open_file_locked(self.lockfile_path(), duration, true, file_options)
-    }
-}
-
-#[derive(Serialize, Deserialize)]
-/// Per-remote entry in the task cache.
-struct TaskCacheEntry {
-    timestamp: i64,
-    tasks: Vec<TaskListItem>,
-}
-
-#[derive(Default, Serialize, Deserialize)]
-/// Content of the task cache file.
-struct TaskCacheContent {
-    remote_tasks: HashMap<String, TaskCacheEntry>,
-}
-
-/// Interval at which tracked tasks are polled
-const RUNNING_CHECK_INTERVAL_S: u64 = 10;
-
-/// Tasks which were started by PDM and are still running
-static RUNNING_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-/// Tasks which were started by PDM and w
-static FINISHED_FOREIGN_TASKS: LazyLock<RwLock<HashSet<RemoteUpid>>> = LazyLock::new(init);
-
-fn init() -> RwLock<HashSet<RemoteUpid>> {
-    RwLock::new(HashSet::new())
-}
-
-/// Insert a remote UPID into the running list
-///
-/// If it is the first entry in the list, a background task is started to track its state
-///
-/// Returns the [`JoinHandle`] if a task was started.
-///
-/// panics on a poisoned mutex
-pub fn track_running_task(task: RemoteUpid) -> Option<JoinHandle<()>> {
-    let mut tasks = RUNNING_FOREIGN_TASKS.write().unwrap();
-
-    // the call inserting the first task in the list needs to start the checking task
-    let need_start_task = tasks.is_empty();
-    tasks.insert(task);
-
-    if !need_start_task {
-        return None;
-    }
-    drop(tasks);
-
-    Some(tokio::spawn(async move {
-        loop {
-            let delay_target = task_utils::next_aligned_instant(RUNNING_CHECK_INTERVAL_S);
-            tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-
-            let finished_tasks = get_finished_tasks().await;
-
-            // skip iteration if we still have tasks, just not finished ones
-            if finished_tasks.is_empty() && !RUNNING_FOREIGN_TASKS.read().unwrap().is_empty() {
-                continue;
-            }
-
-            let mut finished = FINISHED_FOREIGN_TASKS.write().unwrap();
-            // we either have finished tasks, or the running task list was empty
-            let mut set = RUNNING_FOREIGN_TASKS.write().unwrap();
-
-            for (upid, _status) in finished_tasks {
-                if set.remove(&upid) {
-                    finished.insert(upid);
-                } else {
-                    // someone else removed & persisted the task in the meantime
-                }
-            }
-
-            // if no task remains, end the current task
-            // it will be restarted by the next caller that inserts one
-            if set.is_empty() {
-                return;
-            }
-        }
-    }))
-}
-
-/// Get a list of running foreign tasks
-///
-/// panics on a poisoned mutex
-pub fn get_running_tasks() -> Vec<RemoteUpid> {
-    RUNNING_FOREIGN_TASKS
-        .read()
-        .unwrap()
-        .iter()
-        .cloned()
-        .collect()
-}
-
-/// Checks all current saved UPIDs if they're still running, and if not,
-/// returns their upids + status
-///
-/// panics on a poisoned mutex
-pub async fn get_finished_tasks() -> Vec<(RemoteUpid, String)> {
-    let mut finished = Vec::new();
-    let config = match pdm_config::remotes::config() {
-        Ok((config, _)) => config,
-        Err(err) => {
-            log::error!("could not open remotes config: {err}");
-            return Vec::new();
-        }
-    };
-    for task in get_running_tasks() {
-        match config.get(task.remote()) {
-            Some(remote) => match remote.ty {
-                RemoteType::Pve => {
-                    let status = match crate::api::pve::tasks::get_task_status(
-                        remote.id.clone(),
-                        task.clone(),
-                        false,
-                    )
-                    .await
-                    {
-                        Ok(status) => status,
-                        Err(err) => {
-                            log::error!("could not get status from remote: {err}");
-                            finished.push((task, "could not get status".to_string()));
-                            continue;
-                        }
-                    };
-                    if let Some(status) = status.exitstatus {
-                        finished.push((task, status.to_string()));
+                if let Some(since) = filters.since {
+                    if item.starttime < since {
+                        return false;
                     }
                 }
-                RemoteType::Pbs => {
-                    let _client = match crate::pbs_client::connect(remote) {
-                        Ok(client) => client,
-                        Err(err) => {
-                            log::error!("could not get status from remote: {err}");
-                            finished.push((task, "could not get status".to_string()));
-                            continue;
-                        }
-                    };
-                    // FIXME implement get task status
-                    finished.push((task, "unknown state".to_string()));
-                }
-            },
-            None => finished.push((task, "unknown remote".to_string())),
-        }
-    }
 
-    finished
+                if let Some(needle) = &filters.userfilter {
+                    if !item.user.contains(needle) {
+                        return false;
+                    }
+                }
+
+                if let Some(typefilter) = &filters.typefilter {
+                    if !item.worker_type.contains(typefilter) {
+                        return false;
+                    }
+                }
+
+                let state = item.status.as_ref().map(|status| tasktype(status));
+
+                match (state, &filters.statusfilter) {
+                    (Some(TaskStateType::OK), _) if filters.errors => return false,
+                    (Some(state), Some(filters)) => {
+                        if !filters.contains(&state) {
+                            return false;
+                        }
+                    }
+                    (None, Some(_)) => return false,
+                    _ => {}
+                }
+
+                true
+            })
+            .collect();
+
+        Ok(returned_tasks)
+    })
+    .await?
+}
+
+/// Insert a newly created task into the list of tracked tasks.
+///
+/// Any tracked task will be polled with a short interval until the task
+/// has finished.
+pub async fn track_running_task(task: RemoteUpid) -> Result<(), Error> {
+    tokio::task::spawn_blocking(move || {
+        let cache = get_cache()?.write()?;
+        // TODO:: Handle PBS tasks correctly.
+        let pve_upid: pve_api_types::PveUpid = task.upid.parse()?;
+        let task = TaskCacheItem {
+            upid: task.clone(),
+            starttime: pve_upid.starttime,
+            status: None,
+            endtime: None,
+        };
+        cache.add_tracked_task(task)
+    })
+    .await?
+}
+
+/// Get a new [`TaskCache`] instance.
+///
+/// No heavy-weight operations are done here; it's fine to call this regularly as part of the
+/// update loop.
+pub fn get_cache() -> Result<TaskCache, Error> {
+    let file_options = proxmox_product_config::default_create_options();
+
+    let cache_path = Path::new(REMOTE_TASKS_DIR);
+    let cache = TaskCache::new(
+        cache_path,
+        file_options,
+        KEEP_OLD_FILES,
+        NUMBER_OF_UNCOMPRESSED_FILES,
+        ROTATE_AFTER,
+        JOURNAL_MAX_SIZE,
+    )?;
+
+    Ok(cache)
 }
 
 /// Parses a task status string into a TaskStateType
-- 
2.47.2




