[pdm-devel] [PATCH proxmox-datacenter-manager 07/15] task cache: move to its own submodule

Lukas Wagner l.wagner at proxmox.com
Tue Jan 28 13:25:12 CET 2025


No functional changes, only adapting method visibility where needed.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 server/src/remote_tasks/mod.rs        | 309 +------------------------
 server/src/remote_tasks/task_cache.rs | 312 ++++++++++++++++++++++++++
 2 files changed, 317 insertions(+), 304 deletions(-)
 create mode 100644 server/src/remote_tasks/task_cache.rs

diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 032f2a4..4a0552c 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -1,11 +1,7 @@
 use std::{
-    cmp::Ordering,
-    collections::{HashMap, HashSet},
-    fs::File,
-    iter::Peekable,
-    path::{Path, PathBuf},
+    collections::HashSet,
+    path::Path,
     sync::{LazyLock, RwLock},
-    time::Duration,
 };
 
 use anyhow::Error;
@@ -15,11 +11,13 @@ use pdm_api_types::{
 };
 use proxmox_sys::fs::CreateOptions;
 use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
-use serde::{Deserialize, Serialize};
 use tokio::task::JoinHandle;
 
 use crate::{api::pve, task_utils};
 
+mod task_cache;
+use task_cache::TaskCache;
+
 // TODO: Does this number make sense?
 const CACHED_TASKS_PER_REMOTE: usize = 2000;
 
@@ -240,213 +238,6 @@ fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem
     Ok(returned_tasks)
 }
 
-/// A cache for fetched remote tasks.
-struct TaskCache {
-    /// Cache entries
-    content: TaskCacheContent,
-
-    /// Cache entries were changed/removed.
-    dirty: bool,
-
-    /// File-location at which the cached tasks are stored.
-    cachefile_path: PathBuf,
-
-    /// File mode/owner/group for the cache file.
-    cachefile_options: CreateOptions,
-
-    /// Max. tasks per remote
-    max_tasks_per_remote: usize,
-}
-
-impl TaskCache {
-    /// Create a new tasks cache instance by loading
-    /// the cache from disk.
-    fn new(
-        cachefile_path: PathBuf,
-        cachefile_options: CreateOptions,
-        max_tasks_per_remote: usize,
-    ) -> Result<Self, Error> {
-        Ok(Self {
-            content: Self::load_content(&cachefile_path)?,
-            dirty: false,
-            cachefile_path,
-            cachefile_options,
-            max_tasks_per_remote,
-        })
-    }
-
-    /// Load the task cache contents from disk.
-    fn load_content(path: &Path) -> Result<TaskCacheContent, Error> {
-        let content = proxmox_sys::fs::file_read_optional_string(path)?;
-
-        let content = if let Some(content) = content {
-            serde_json::from_str(&content).unwrap_or_default()
-        } else {
-            Default::default()
-        };
-
-        Ok(content)
-    }
-
-    /// Get path for the cache's lockfile.
-    fn lockfile_path(&self) -> PathBuf {
-        let mut path = self.cachefile_path.clone();
-        path.set_extension("lock");
-        path
-    }
-
-    /// Persist the task cache
-    ///
-    /// This method requests an exclusive lock for the task cache lockfile.
-    fn save(&mut self) -> Result<(), Error> {
-        // if we have not updated anything, we don't have to update the cache file
-        if !self.dirty {
-            return Ok(());
-        }
-
-        let _guard = self.lock(Duration::from_secs(5))?;
-
-        // Read content again, in case somebody has changed it in the meanwhile
-        let mut content = Self::load_content(&self.cachefile_path)?;
-
-        for (remote, entry) in self.content.remote_tasks.iter_mut() {
-            if let Some(other) = content.remote_tasks.remove(remote) {
-                entry.tasks =
-                    Self::merge_tasks(entry.tasks.clone(), other.tasks, self.max_tasks_per_remote);
-            }
-        }
-
-        let bytes = serde_json::to_vec_pretty(&self.content)?;
-
-        proxmox_sys::fs::replace_file(
-            &self.cachefile_path,
-            &bytes,
-            self.cachefile_options.clone(),
-            true,
-        )?;
-
-        self.dirty = false;
-
-        Ok(())
-    }
-
-    /// Add tasks to the cache.
-    ///
-    /// If the total number of stored tasks exceeds `max_tasks_per_remote`, the
-    /// oldest ones are truncated.
-    fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
-        let entry = self.content.remote_tasks.entry(remote.into()).or_default();
-
-        entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote);
-
-        self.dirty = true;
-    }
-
-    // Get task data for a given remote.
-    fn get_tasks(&self, remote: &str) -> Option<Vec<TaskListItem>> {
-        if let Some(entry) = self.content.remote_tasks.get(remote) {
-            Some(entry.tasks.clone())
-        } else {
-            None
-        }
-    }
-
-    // Invalidate cache for a given remote.
-    fn invalidate_cache_for_remote(&mut self, remote: &str) {
-        self.dirty = true;
-        self.content.remote_tasks.remove(remote);
-    }
-
-    // Lock the cache for modification.
-    //
-    // While the cache is locked, other users can still read the cache
-    // without a lock, since the cache file is replaced atomically
-    // when updating.
-    fn lock(&self, duration: Duration) -> Result<File, Error> {
-        proxmox_sys::fs::open_file_locked(
-            self.lockfile_path(),
-            duration,
-            true,
-            self.cachefile_options.clone(),
-        )
-    }
-
-    fn merge_tasks(
-        mut a: Vec<TaskListItem>,
-        mut b: Vec<TaskListItem>,
-        limit: usize,
-    ) -> Vec<TaskListItem> {
-        a.sort_by_key(|task| -task.starttime);
-        b.sort_by_key(|task| -task.starttime);
-
-        MergeTaskIterator {
-            left: a.into_iter().peekable(),
-            right: b.into_iter().peekable(),
-        }
-        .take(limit)
-        .collect()
-    }
-}
-
-struct MergeTaskIterator<T: Iterator<Item = TaskListItem>> {
-    left: Peekable<T>,
-    right: Peekable<T>,
-}
-
-impl<T> Iterator for MergeTaskIterator<T>
-where
-    T: Iterator<Item = TaskListItem>,
-{
-    type Item = T::Item;
-
-    fn next(&mut self) -> Option<T::Item> {
-        let order = match (self.left.peek(), self.right.peek()) {
-            (Some(l), Some(r)) => Some(l.starttime.cmp(&r.starttime)),
-            (Some(_), None) => Some(Ordering::Greater),
-            (None, Some(_)) => Some(Ordering::Less),
-            (None, None) => None,
-        };
-
-        match order {
-            Some(Ordering::Greater) => self.left.next(),
-            Some(Ordering::Less) => self.right.next(),
-            Some(Ordering::Equal) => {
-                // Both unwraps are safe, the former peek/match
-                // guarantess that there is an element.
-                let l = self.left.peek().unwrap();
-                let r = self.right.peek().unwrap();
-
-                // Dedup if both lists contain the same task
-                if l.upid == r.upid {
-                    // Take the one where the task is already finished
-                    if l.endtime.is_some() {
-                        let _ = self.right.next();
-                        self.left.next()
-                    } else {
-                        let _ = self.left.next();
-                        self.right.next()
-                    }
-                } else {
-                    self.left.next()
-                }
-            }
-            None => None,
-        }
-    }
-}
-
-#[derive(Default, Debug, Serialize, Deserialize)]
-/// Per-remote entry in the task cache.
-struct TaskCacheEntry {
-    tasks: Vec<TaskListItem>,
-}
-
-#[derive(Debug, Default, Serialize, Deserialize)]
-/// Content of the task cache file.
-struct TaskCacheContent {
-    remote_tasks: HashMap<String, TaskCacheEntry>,
-}
-
 /// Interval at which tracked tasks are polled
 const RUNNING_CHECK_INTERVAL_S: u64 = 10;
 
@@ -590,93 +381,3 @@ pub fn tasktype(status: &str) -> TaskStateType {
         TaskStateType::Error
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::test_support::temp::NamedTempFile;
-
-    fn make_upid(
-        starttime: i64,
-        endtime: Option<i64>,
-        status: Option<String>,
-    ) -> Result<TaskListItem, Error> {
-        let upid: PveUpid =
-            format!("UPID:pve-node:0000C530:001C9BEC:{starttime:08X}:stopall::root at pam:",)
-                .parse()?;
-
-        Ok(TaskListItem {
-            upid: upid.to_string(),
-            node: upid.node,
-            pid: upid.pid as i64,
-            pstart: upid.pstart,
-            starttime: upid.starttime,
-            worker_type: upid.worker_type,
-            worker_id: upid.worker_id,
-            user: upid.auth_id,
-            endtime,
-            status,
-        })
-    }
-
-    #[test]
-    fn basic_task_cache() -> Result<(), Error> {
-        let options = CreateOptions::new()
-            .owner(nix::unistd::Uid::effective())
-            .group(nix::unistd::Gid::effective())
-            .perm(nix::sys::stat::Mode::from_bits_truncate(0o600));
-
-        let temp_file = NamedTempFile::new(options.clone())?;
-
-        let mut cache = TaskCache::new(temp_file.path().into(), options.clone(), 50)?;
-
-        let mut tasks = Vec::new();
-
-        let now = proxmox_time::epoch_i64();
-        for i in (0..20).rev() {
-            tasks.push(make_upid(now - 10 * i, None, None)?);
-        }
-
-        cache.add_tasks("some-remote", tasks.clone());
-        cache.save()?;
-
-        let cache = TaskCache::new(temp_file.path().into(), options, 50)?;
-
-        let res = cache.get_tasks("some-remote").unwrap();
-        tasks.reverse();
-        assert_eq!(tasks, res);
-
-        Ok(())
-    }
-
-    #[test]
-    fn merge_tasks() -> Result<(), Error> {
-        // Arrange
-        let mut a = Vec::new();
-        for i in [30, 10, 20] {
-            a.push(make_upid(i, None, None)?);
-        }
-
-        let mut b = Vec::new();
-        for i in [25, 15, 35, 5] {
-            b.push(make_upid(i, None, None)?);
-        }
-
-        a.push(make_upid(40, None, None)?);
-        b.push(make_upid(40, Some(50), Some("some status".into()))?);
-
-        // Act
-        let tasks = TaskCache::merge_tasks(a, b, 5);
-
-        // Assert
-        assert_eq!(tasks.len(), 5);
-        assert_eq!(tasks[0].starttime, 40);
-        assert_eq!(tasks[0].endtime, Some(50));
-        assert_eq!(tasks[1].starttime, 35);
-        assert_eq!(tasks[2].starttime, 30);
-        assert_eq!(tasks[3].starttime, 25);
-        assert_eq!(tasks[4].starttime, 20);
-
-        Ok(())
-    }
-}
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
new file mode 100644
index 0000000..8a98876
--- /dev/null
+++ b/server/src/remote_tasks/task_cache.rs
@@ -0,0 +1,312 @@
+use std::{
+    cmp::Ordering,
+    collections::HashMap,
+    fs::File,
+    iter::Peekable,
+    path::{Path, PathBuf},
+    time::Duration,
+};
+
+use anyhow::Error;
+use serde::{Deserialize, Serialize};
+
+use pdm_api_types::TaskListItem;
+use proxmox_sys::fs::CreateOptions;
+
+/// A cache for fetched remote tasks.
+pub(super) struct TaskCache {
+    /// Cache entries
+    content: TaskCacheContent,
+
+    /// Cache entries were changed/removed.
+    dirty: bool,
+
+    /// File-location at which the cached tasks are stored.
+    cachefile_path: PathBuf,
+
+    /// File mode/owner/group for the cache file.
+    cachefile_options: CreateOptions,
+
+    /// Max. tasks per remote
+    max_tasks_per_remote: usize,
+}
+
+impl TaskCache {
+    /// Create a new tasks cache instance by loading
+    /// the cache from disk.
+    pub(super) fn new(
+        cachefile_path: PathBuf,
+        cachefile_options: CreateOptions,
+        max_tasks_per_remote: usize,
+    ) -> Result<Self, Error> {
+        Ok(Self {
+            content: Self::load_content(&cachefile_path)?,
+            dirty: false,
+            cachefile_path,
+            cachefile_options,
+            max_tasks_per_remote,
+        })
+    }
+
+    /// Load the task cache contents from disk.
+    fn load_content(path: &Path) -> Result<TaskCacheContent, Error> {
+        let content = proxmox_sys::fs::file_read_optional_string(path)?;
+
+        let content = if let Some(content) = content {
+            serde_json::from_str(&content).unwrap_or_default()
+        } else {
+            Default::default()
+        };
+
+        Ok(content)
+    }
+
+    /// Get path for the cache's lockfile.
+    fn lockfile_path(&self) -> PathBuf {
+        let mut path = self.cachefile_path.clone();
+        path.set_extension("lock");
+        path
+    }
+
+    /// Persist the task cache
+    ///
+    /// This method requests an exclusive lock for the task cache lockfile.
+    pub(super) fn save(&mut self) -> Result<(), Error> {
+        // if we have not updated anything, we don't have to update the cache file
+        if !self.dirty {
+            return Ok(());
+        }
+
+        let _guard = self.lock(Duration::from_secs(5))?;
+
+        // Read content again, in case somebody has changed it in the meanwhile
+        let mut content = Self::load_content(&self.cachefile_path)?;
+
+        for (remote, entry) in self.content.remote_tasks.iter_mut() {
+            if let Some(other) = content.remote_tasks.remove(remote) {
+                entry.tasks =
+                    Self::merge_tasks(entry.tasks.clone(), other.tasks, self.max_tasks_per_remote);
+            }
+        }
+
+        let bytes = serde_json::to_vec_pretty(&self.content)?;
+
+        proxmox_sys::fs::replace_file(
+            &self.cachefile_path,
+            &bytes,
+            self.cachefile_options.clone(),
+            true,
+        )?;
+
+        self.dirty = false;
+
+        Ok(())
+    }
+
+    /// Add tasks to the cache.
+    ///
+    /// If the total number of stored tasks exceeds `max_tasks_per_remote`, the
+    /// oldest ones are truncated.
+    pub(super) fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
+        let entry = self.content.remote_tasks.entry(remote.into()).or_default();
+
+        entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote);
+
+        self.dirty = true;
+    }
+
+    // Get task data for a given remote.
+    pub(super) fn get_tasks(&self, remote: &str) -> Option<Vec<TaskListItem>> {
+        self.content
+            .remote_tasks
+            .get(remote)
+            .map(|entry| entry.tasks.clone())
+    }
+
+    // Invalidate cache for a given remote.
+    pub(super) fn invalidate_cache_for_remote(&mut self, remote: &str) {
+        self.dirty = true;
+        self.content.remote_tasks.remove(remote);
+    }
+
+    // Lock the cache for modification.
+    //
+    // While the cache is locked, other users can still read the cache
+    // without a lock, since the cache file is replaced atomically
+    // when updating.
+    fn lock(&self, duration: Duration) -> Result<File, Error> {
+        proxmox_sys::fs::open_file_locked(
+            self.lockfile_path(),
+            duration,
+            true,
+            self.cachefile_options.clone(),
+        )
+    }
+
+    fn merge_tasks(
+        mut a: Vec<TaskListItem>,
+        mut b: Vec<TaskListItem>,
+        limit: usize,
+    ) -> Vec<TaskListItem> {
+        a.sort_by_key(|task| -task.starttime);
+        b.sort_by_key(|task| -task.starttime);
+
+        MergeTaskIterator {
+            left: a.into_iter().peekable(),
+            right: b.into_iter().peekable(),
+        }
+        .take(limit)
+        .collect()
+    }
+}
+
+struct MergeTaskIterator<T: Iterator<Item = TaskListItem>> {
+    left: Peekable<T>,
+    right: Peekable<T>,
+}
+
+impl<T> Iterator for MergeTaskIterator<T>
+where
+    T: Iterator<Item = TaskListItem>,
+{
+    type Item = T::Item;
+
+    fn next(&mut self) -> Option<T::Item> {
+        let order = match (self.left.peek(), self.right.peek()) {
+            (Some(l), Some(r)) => Some(l.starttime.cmp(&r.starttime)),
+            (Some(_), None) => Some(Ordering::Greater),
+            (None, Some(_)) => Some(Ordering::Less),
+            (None, None) => None,
+        };
+
+        match order {
+            Some(Ordering::Greater) => self.left.next(),
+            Some(Ordering::Less) => self.right.next(),
+            Some(Ordering::Equal) => {
+                // Both unwraps are safe, the former peek/match
+                // guarantess that there is an element.
+                let l = self.left.peek().unwrap();
+                let r = self.right.peek().unwrap();
+
+                // Dedup if both lists contain the same task
+                if l.upid == r.upid {
+                    // Take the one where the task is already finished
+                    if l.endtime.is_some() {
+                        let _ = self.right.next();
+                        self.left.next()
+                    } else {
+                        let _ = self.left.next();
+                        self.right.next()
+                    }
+                } else {
+                    self.left.next()
+                }
+            }
+            None => None,
+        }
+    }
+}
+
+#[derive(Default, Debug, Serialize, Deserialize)]
+/// Per-remote entry in the task cache.
+struct TaskCacheEntry {
+    tasks: Vec<TaskListItem>,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+/// Content of the task cache file.
+struct TaskCacheContent {
+    remote_tasks: HashMap<String, TaskCacheEntry>,
+}
+
+#[cfg(test)]
+mod tests {
+    use pve_api_types::PveUpid;
+
+    use super::*;
+    use crate::test_support::temp::NamedTempFile;
+
+    fn make_upid(
+        starttime: i64,
+        endtime: Option<i64>,
+        status: Option<String>,
+    ) -> Result<TaskListItem, Error> {
+        let upid: PveUpid =
+            format!("UPID:pve-node:0000C530:001C9BEC:{starttime:08X}:stopall::root at pam:",)
+                .parse()?;
+
+        Ok(TaskListItem {
+            upid: upid.to_string(),
+            node: upid.node,
+            pid: upid.pid as i64,
+            pstart: upid.pstart,
+            starttime: upid.starttime,
+            worker_type: upid.worker_type,
+            worker_id: upid.worker_id,
+            user: upid.auth_id,
+            endtime,
+            status,
+        })
+    }
+
+    #[test]
+    fn basic_task_cache() -> Result<(), Error> {
+        let options = CreateOptions::new()
+            .owner(nix::unistd::Uid::effective())
+            .group(nix::unistd::Gid::effective())
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o600));
+
+        let temp_file = NamedTempFile::new(options.clone())?;
+
+        let mut cache = TaskCache::new(temp_file.path().into(), options.clone(), 50)?;
+
+        let mut tasks = Vec::new();
+
+        let now = proxmox_time::epoch_i64();
+        for i in (0..20).rev() {
+            tasks.push(make_upid(now - 10 * i, None, None)?);
+        }
+
+        cache.add_tasks("some-remote", tasks.clone());
+        cache.save()?;
+
+        let cache = TaskCache::new(temp_file.path().into(), options, 50)?;
+
+        let res = cache.get_tasks("some-remote").unwrap();
+        tasks.reverse();
+        assert_eq!(tasks, res);
+
+        Ok(())
+    }
+
+    #[test]
+    fn merge_tasks() -> Result<(), Error> {
+        // Arrange
+        let mut a = Vec::new();
+        for i in [30, 10, 20] {
+            a.push(make_upid(i, None, None)?);
+        }
+
+        let mut b = Vec::new();
+        for i in [25, 15, 35, 5] {
+            b.push(make_upid(i, None, None)?);
+        }
+
+        a.push(make_upid(40, None, None)?);
+        b.push(make_upid(40, Some(50), Some("some status".into()))?);
+
+        // Act
+        let tasks = TaskCache::merge_tasks(a, b, 5);
+
+        // Assert
+        assert_eq!(tasks.len(), 5);
+        assert_eq!(tasks[0].starttime, 40);
+        assert_eq!(tasks[0].endtime, Some(50));
+        assert_eq!(tasks[1].starttime, 35);
+        assert_eq!(tasks[2].starttime, 30);
+        assert_eq!(tasks[3].starttime, 25);
+        assert_eq!(tasks[4].starttime, 20);
+
+        Ok(())
+    }
+}
-- 
2.39.5





More information about the pdm-devel mailing list