[pdm-devel] [PATCH proxmox-datacenter-manager 08/15] task cache: fetch every 5mins, requesting only missing tasks

Lukas Wagner l.wagner at proxmox.com
Tue Jan 28 13:25:13 CET 2025


Instead of purging the cache completely and requesting the entire task
history over the last seven days, we keep the per-remote task history in
the cache and only fetch missing tasks since the last time we fetched
them.

If a tracked task finishes, we also force a fetch of the most recent task
data, since that is the only way right now to get the task status *and*
task endtime.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 server/src/remote_tasks/mod.rs        | 52 +++++++++++++++------------
 server/src/remote_tasks/task_cache.rs | 25 +++++++++----
 2 files changed, 49 insertions(+), 28 deletions(-)

diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 4a0552c..16c46a5 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -20,6 +20,8 @@ use task_cache::TaskCache;
 
 // TODO: Does this number make sense?
 const CACHED_TASKS_PER_REMOTE: usize = 2000;
+const REFRESH_EVERY_S: i64 = 300;
+const OVERLAP_S: i64 = 60;
 
 /// Get tasks for all remotes
 // FIXME: filter for privileges
@@ -42,24 +44,32 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
     // a task's endtime, which is only returned by
     // /nodes/<node>/tasks...
     // Room for improvements in the future.
-    invalidate_cache_for_finished_tasks(&mut cache);
+    let force_refresh_remotes = get_remotes_with_finished_tasks();
 
+    let now = proxmox_time::epoch_i64();
     for (remote_name, remote) in &remotes.sections {
-        if let Some(tasks) = cache.get_tasks(remote_name.as_str()) {
-            // Data in cache is recent enough and has not been invalidated.
-            all_tasks.extend(tasks);
-        } else {
-            let tasks = match fetch_tasks(remote).await {
-                Ok(tasks) => tasks,
+        let last_fetched = cache.get_last_fetched(remote_name).unwrap_or(0);
+        let diff = now - last_fetched;
+
+        if diff > REFRESH_EVERY_S || diff < 0 || force_refresh_remotes.contains(remote_name) {
+            // Add some overlap so that we are sure to fetch every task - duplicates
+            // are removed when adding the tasks to the cache.
+            let fetch_since =
+                (cache.get_most_recent_starttime(remote_name).unwrap_or(0) - OVERLAP_S).max(0);
+
+            match fetch_tasks(remote, fetch_since).await {
+                Ok(tasks) => {
+                    cache.add_tasks(remote_name.as_str(), tasks, now);
+                }
                 Err(err) => {
                     // ignore errors for not reachable remotes
                     continue;
                 }
-            };
-            cache.add_tasks(remote_name.as_str(), tasks.clone());
-
-            all_tasks.extend(tasks);
+            }
         }
+
+        let tasks = cache.get_tasks(remote_name).unwrap_or_default();
+        all_tasks.extend(tasks);
     }
 
     let mut returned_tasks = add_running_tasks(all_tasks)?;
@@ -125,7 +135,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
 }
 
 /// Fetch tasks (active and finished) from a remote
-async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
+async fn fetch_tasks(remote: &Remote, since: i64) -> Result<Vec<TaskListItem>, Error> {
     let mut tasks = Vec::new();
 
     match remote.ty {
@@ -138,9 +148,8 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
                 let params = ListTasks {
                     // Include running tasks
                     source: Some(ListTasksSource::All),
-                    // TODO: How much task history do we want? Right now we just hard-code it
-                    // to 7 days.
-                    since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
+                    since: Some(since),
+                    limit: Some(CACHED_TASKS_PER_REMOTE as u64),
                     ..Default::default()
                 };
 
@@ -182,18 +191,17 @@ fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskList
     Ok(mapped)
 }
 
-/// Drops the cached task list of a remote for all finished tasks.
+/// Get list of remotes which have finished tasks.
 ///
 /// We use this to force a refresh so that we get the full task
 /// info (including `endtime`) in the next API call.
-fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
+fn get_remotes_with_finished_tasks() -> HashSet<String> {
     let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
 
-    // If a task is finished, we force a refresh for the remote - otherwise
-    // we don't get the 'endtime' for the task.
-    for task in finished.drain() {
-        cache.invalidate_cache_for_remote(task.remote());
-    }
+    finished
+        .drain()
+        .map(|task| task.remote().to_string())
+        .collect()
 }
 
 /// Supplement the list of tasks that we received from the remote with
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index 8a98876..e08e3d4 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -107,10 +107,11 @@ impl TaskCache {
     ///
     /// If the total number of stored tasks exceeds `max_tasks_per_remote`, the
     /// oldest ones are truncated.
-    pub(super) fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
+    pub(super) fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, last_fetched: i64) {
         let entry = self.content.remote_tasks.entry(remote.into()).or_default();
 
         entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote);
+        entry.last_fetched = last_fetched;
 
         self.dirty = true;
     }
@@ -123,10 +124,21 @@ impl TaskCache {
             .map(|entry| entry.tasks.clone())
     }
 
-    // Invalidate cache for a given remote.
-    pub(super) fn invalidate_cache_for_remote(&mut self, remote: &str) {
-        self.dirty = true;
-        self.content.remote_tasks.remove(remote);
+    // Get the `last_fetched` time stamp for a given remote.
+    pub(super) fn get_last_fetched(&self, remote: &str) -> Option<i64> {
+        self.content
+            .remote_tasks
+            .get(remote)
+            .map(|entry| entry.last_fetched)
+    }
+
+    // Get the `most_recent_starttime` time stamp for a given remote.
+    pub(super) fn get_most_recent_starttime(&self, remote: &str) -> Option<i64> {
+        if let Some(entry) = self.content.remote_tasks.get(remote) {
+            Some(entry.tasks.first()?.starttime)
+        } else {
+            None
+        }
     }
 
     // Lock the cache for modification.
@@ -211,6 +223,7 @@ where
 /// Per-remote entry in the task cache.
 struct TaskCacheEntry {
     tasks: Vec<TaskListItem>,
+    last_fetched: i64,
 }
 
 #[derive(Debug, Default, Serialize, Deserialize)]
@@ -267,7 +280,7 @@ mod tests {
             tasks.push(make_upid(now - 10 * i, None, None)?);
         }
 
-        cache.add_tasks("some-remote", tasks.clone());
+        cache.add_tasks("some-remote", tasks.clone(), 0);
         cache.save()?;
 
         let cache = TaskCache::new(temp_file.path().into(), options, 50)?;
-- 
2.39.5





More information about the pdm-devel mailing list