[pdm-devel] [PATCH proxmox-datacenter-manager 05/15] task cache: add FIFO cache replacement policy

Lukas Wagner l.wagner at proxmox.com
Tue Jan 28 13:25:10 CET 2025


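The task cache will now hold up to a certain number of tasks
*per remote*. If the capacity is exceeded, the oldest tasks,
based on the task's starttime, will be dropped.

Newly fetched tasks are now merged into the already cached tasks
instead of replacing them, and when saving, the cache is merged
with the current on-disk content.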

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 server/src/task_cache.rs | 193 +++++++++++++++++++++++++++++++--------
 1 file changed, 153 insertions(+), 40 deletions(-)

diff --git a/server/src/task_cache.rs b/server/src/task_cache.rs
index f24af3f..032f2a4 100644
--- a/server/src/task_cache.rs
+++ b/server/src/task_cache.rs
@@ -1,6 +1,8 @@
 use std::{
+    cmp::Ordering,
     collections::{HashMap, HashSet},
     fs::File,
+    iter::Peekable,
     path::{Path, PathBuf},
     sync::{LazyLock, RwLock},
     time::Duration,
@@ -18,6 +20,9 @@ use tokio::task::JoinHandle;
 
 use crate::{api::pve, task_utils};
 
+/// Max. number of tasks kept in the task cache per remote (TODO: does this number make sense?)
+const CACHED_TASKS_PER_REMOTE: usize = 2000;
+
 /// Get tasks for all remotes
 // FIXME: filter for privileges
 pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
@@ -31,7 +36,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
     let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
 
     let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
-    let mut cache = TaskCache::new(cache_path, file_options)?;
+    let mut cache = TaskCache::new(cache_path, file_options, CACHED_TASKS_PER_REMOTE)?;
 
     // Force a refresh for all tasks of a remote if a task is finished.
     // Not super nice, but saves us from persisting finished tasks. Also,
@@ -53,7 +58,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
                     continue;
                 }
             };
-            cache.set_tasks(remote_name.as_str(), tasks.clone());
+            cache.add_tasks(remote_name.as_str(), tasks.clone());
 
             all_tasks.extend(tasks);
         }
@@ -240,10 +245,6 @@ struct TaskCache {
     /// Cache entries
     content: TaskCacheContent,
 
-    /// Entries that were added or updated - these will be persistet
-    /// when `save` is called.
-    new_or_updated: TaskCacheContent,
-
     /// Cache entries were changed/removed.
     dirty: bool,
 
@@ -252,18 +253,25 @@ struct TaskCache {
 
     /// File mode/owner/group for the cache file.
     cachefile_options: CreateOptions,
+
+    /// Max. tasks per remote
+    max_tasks_per_remote: usize,
 }
 
 impl TaskCache {
     /// Create a new tasks cache instance by loading
     /// the cache from disk.
-    fn new(cachefile_path: PathBuf, cachefile_options: CreateOptions) -> Result<Self, Error> {
+    fn new(
+        cachefile_path: PathBuf,
+        cachefile_options: CreateOptions,
+        max_tasks_per_remote: usize, // cap on the number of cached tasks per remote
+    ) -> Result<Self, Error> {
         Ok(Self {
             content: Self::load_content(&cachefile_path)?,
-            new_or_updated: Default::default(),
             dirty: false,
             cachefile_path,
             cachefile_options,
+            max_tasks_per_remote,
         })
     }
 
@@ -301,15 +309,14 @@ impl TaskCache {
         // Read content again, in case somebody has changed it in the meanwhile
         let mut content = Self::load_content(&self.cachefile_path)?;
 
-        for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
-            if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
-                *existing_entry = entry;
-            } else {
-                content.remote_tasks.insert(remote_name, entry);
+        for (remote, entry) in self.content.remote_tasks.iter_mut() {
+            if let Some(other) = content.remote_tasks.remove(remote) {
+                entry.tasks =
+                    Self::merge_tasks(entry.tasks.clone(), other.tasks, self.max_tasks_per_remote);
             }
         }
 
-        let bytes = serde_json::to_vec_pretty(&content)?;
+        let bytes = serde_json::to_vec_pretty(&self.content)?;
 
         proxmox_sys::fs::replace_file(
             &self.cachefile_path,
@@ -323,20 +330,22 @@ impl TaskCache {
         Ok(())
     }
 
-    // Update task data for a given remote.
-    fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
+    /// Add tasks for `remote`, merging them with the tasks already cached for it.
+    ///
+    /// If the merged list exceeds `max_tasks_per_remote` tasks, the oldest ones
+    /// (by starttime) are dropped.
+    fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
+        let entry = self.content.remote_tasks.entry(remote.into()).or_default();
+        let cached = std::mem::take(&mut entry.tasks);
+        entry.tasks = Self::merge_tasks(cached, tasks, self.max_tasks_per_remote);
+
         self.dirty = true;
-        self.new_or_updated
-            .remote_tasks
-            .insert(remote.to_string(), TaskCacheEntry { tasks });
     }
 
     // Get task data for a given remote.
     fn get_tasks(&self, remote: &str) -> Option<Vec<TaskListItem>> {
         if let Some(entry) = self.content.remote_tasks.get(remote) {
             Some(entry.tasks.clone())
-        } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
-            Some(entry.tasks.clone())
         } else {
             None
         }
@@ -361,9 +370,72 @@ impl TaskCache {
             self.cachefile_options.clone(),
         )
     }
+
+    /// Merge two task lists, newest first, keeping at most `limit` tasks.
+    ///
+    /// Tasks contained in both lists (same UPID) are kept once, preferring the finished copy.
+    fn merge_tasks(
+        mut a: Vec<TaskListItem>,
+        mut b: Vec<TaskListItem>,
+        limit: usize,
+    ) -> Vec<TaskListItem> {
+        // Break starttime ties by UPID, so that duplicates line up in both lists.
+        a.sort_by(cmp_tasks_newest_first);
+        b.sort_by(cmp_tasks_newest_first);
+
+        MergeTaskIterator {
+            left: a.into_iter().peekable(),
+            right: b.into_iter().peekable(),
+        }
+        .take(limit)
+        .collect()
+    }
+}
+
+/// Order tasks by descending `starttime`; tasks started in the same second are
+/// ordered by UPID, which makes the order total and deterministic.
+fn cmp_tasks_newest_first(l: &TaskListItem, r: &TaskListItem) -> Ordering {
+    r.starttime.cmp(&l.starttime).then_with(|| l.upid.cmp(&r.upid))
+}
+
+/// Merges two task iterators that are both sorted by `cmp_tasks_newest_first`.
+struct MergeTaskIterator<T: Iterator<Item = TaskListItem>> {
+    left: Peekable<T>,
+    right: Peekable<T>,
+}
+
+impl<T> Iterator for MergeTaskIterator<T>
+where
+    T: Iterator<Item = TaskListItem>,
+{
+    type Item = T::Item;
+
+    fn next(&mut self) -> Option<T::Item> {
+        let order = match (self.left.peek(), self.right.peek()) {
+            (Some(l), Some(r)) => cmp_tasks_newest_first(l, r),
+            (Some(_), None) => Ordering::Less,
+            (None, Some(_)) => Ordering::Greater,
+            (None, None) => return None,
+        };
+
+        match order {
+            Ordering::Less => self.left.next(),
+            Ordering::Greater => self.right.next(),
+            Ordering::Equal => {
+                // Same task in both lists: return only one copy, prefer the finished one.
+                let l = self.left.next()?;
+                let r = self.right.next()?;
+                if l.endtime.is_some() {
+                    Some(l)
+                } else {
+                    Some(r)
+                }
+            }
+        }
+    }
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Default, Debug, Serialize, Deserialize)]
 /// Per-remote entry in the task cache.
 struct TaskCacheEntry {
     tasks: Vec<TaskListItem>,
@@ -524,6 +596,29 @@ mod tests {
     use super::*;
     use crate::test_support::temp::NamedTempFile;
 
+    /// Build a `TaskListItem` for tests from a fixed PVE UPID template with the given `starttime`.
+    fn make_upid(
+        starttime: i64,
+        endtime: Option<i64>,
+        status: Option<String>,
+    ) -> Result<TaskListItem, Error> {
+        let upid: PveUpid =
+            format!("UPID:pve-node:0000C530:001C9BEC:{starttime:08X}:stopall::root@pam:").parse()?;
+
+        Ok(TaskListItem {
+            upid: upid.to_string(),
+            node: upid.node,
+            pid: upid.pid as i64,
+            pstart: upid.pstart,
+            starttime: upid.starttime,
+            worker_type: upid.worker_type,
+            worker_id: upid.worker_id,
+            user: upid.auth_id,
+            endtime,
+            status,
+        })
+    }
+
     #[test]
     fn basic_task_cache() -> Result<(), Error> {
         let options = CreateOptions::new()
@@ -533,37 +628,55 @@ mod tests {
 
         let temp_file = NamedTempFile::new(options.clone())?;
 
-        let mut cache = TaskCache::new(temp_file.path().into(), options.clone())?;
+        let mut cache = TaskCache::new(temp_file.path().into(), options.clone(), 50)?;
 
         let mut tasks = Vec::new();
 
         let now = proxmox_time::epoch_i64();
         for i in (0..20).rev() {
-            let upid: PveUpid =
-                "UPID:pve-node:0000C530:001C9BEC:677E934A:stopall::root at pam:".parse()?;
-
-            tasks.push(TaskListItem {
-                upid: upid.to_string(),
-                node: upid.node,
-                pid: upid.pid as i64,
-                pstart: upid.pstart,
-                starttime: now - 10 * i,
-                worker_type: upid.worker_type,
-                worker_id: upid.worker_id,
-                user: upid.auth_id,
-                endtime: None,
-                status: None,
-            });
+            tasks.push(make_upid(now - 10 * i, None, None)?);
         }
 
-        cache.set_tasks("some-remote", tasks.clone());
+        cache.add_tasks("some-remote", tasks.clone());
         cache.save()?;
 
-        let cache = TaskCache::new(temp_file.path().into(), options)?;
+        let cache = TaskCache::new(temp_file.path().into(), options, 50)?;
 
         let res = cache.get_tasks("some-remote").unwrap();
+        tasks.reverse();
         assert_eq!(tasks, res);
 
         Ok(())
     }
+
+    #[test]
+    fn merge_tasks() -> Result<(), Error> {
+        // Arrange: two unsorted task lists
+        let mut a = Vec::new();
+        for i in [30, 10, 20] {
+            a.push(make_upid(i, None, None)?);
+        }
+
+        let mut b = Vec::new();
+        for i in [25, 15, 35, 5] {
+            b.push(make_upid(i, None, None)?);
+        }
+        // The task started at 40 is contained in both lists, but only finished in `b`.
+        a.push(make_upid(40, None, None)?);
+        b.push(make_upid(40, Some(50), Some("some status".into()))?);
+
+        // Act: merge, keeping only the five newest tasks
+        let tasks = TaskCache::merge_tasks(a, b, 5);
+
+        // Assert: newest first; the shared task is kept once, as its finished copy
+        assert_eq!(tasks.len(), 5);
+        assert_eq!(tasks[0].starttime, 40);
+        assert_eq!(tasks[0].endtime, Some(50));
+        assert_eq!(tasks[1].starttime, 35);
+        assert_eq!(tasks[2].starttime, 30);
+        assert_eq!(tasks[3].starttime, 25);
+        assert_eq!(tasks[4].starttime, 20);
+
+        Ok(())
+    }
 }
-- 
2.39.5





More information about the pdm-devel mailing list