[pdm-devel] [PATCH proxmox-datacenter-manager 07/15] task cache: move to its own submodule
Lukas Wagner
l.wagner at proxmox.com
Tue Jan 28 13:25:12 CET 2025
No functional changes, only adapting method visibility where needed.
Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
server/src/remote_tasks/mod.rs | 309 +------------------------
server/src/remote_tasks/task_cache.rs | 312 ++++++++++++++++++++++++++
2 files changed, 317 insertions(+), 304 deletions(-)
create mode 100644 server/src/remote_tasks/task_cache.rs
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 032f2a4..4a0552c 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -1,11 +1,7 @@
use std::{
- cmp::Ordering,
- collections::{HashMap, HashSet},
- fs::File,
- iter::Peekable,
- path::{Path, PathBuf},
+ collections::HashSet,
+ path::Path,
sync::{LazyLock, RwLock},
- time::Duration,
use anyhow::Error;
@@ -15,11 +11,13 @@ use pdm_api_types::{
use proxmox_sys::fs::CreateOptions;
use pve_api_types::{ListTasks, ListTasksResponse, ListTasksSource, PveUpid};
-use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
use crate::{api::pve, task_utils};
+mod task_cache;
+use task_cache::TaskCache;
// TODO: Does this number make sense?
const CACHED_TASKS_PER_REMOTE: usize = 2000;
@@ -240,213 +238,6 @@ fn add_running_tasks(cached_tasks: Vec<TaskListItem>) -> Result<Vec<TaskListItem
-/// A cache for fetched remote tasks.
-struct TaskCache {
- /// Cache entries
- content: TaskCacheContent,
- /// Cache entries were changed/removed.
- dirty: bool,
- /// File-location at which the cached tasks are stored.
- cachefile_path: PathBuf,
- /// File mode/owner/group for the cache file.
- cachefile_options: CreateOptions,
- /// Max. tasks per remote
- max_tasks_per_remote: usize,
-impl TaskCache {
- /// Create a new tasks cache instance by loading
- /// the cache from disk.
- fn new(
- cachefile_path: PathBuf,
- cachefile_options: CreateOptions,
- max_tasks_per_remote: usize,
- ) -> Result<Self, Error> {
- Ok(Self {
- content: Self::load_content(&cachefile_path)?,
- dirty: false,
- cachefile_path,
- cachefile_options,
- max_tasks_per_remote,
- })
- }
- /// Load the task cache contents from disk.
- fn load_content(path: &Path) -> Result<TaskCacheContent, Error> {
- let content = proxmox_sys::fs::file_read_optional_string(path)?;
- let content = if let Some(content) = content {
- serde_json::from_str(&content).unwrap_or_default()
- } else {
- Default::default()
- };
- Ok(content)
- }
- /// Get path for the cache's lockfile.
- fn lockfile_path(&self) -> PathBuf {
- let mut path = self.cachefile_path.clone();
- path.set_extension("lock");
- path
- }
- /// Persist the task cache
- ///
- /// This method requests an exclusive lock for the task cache lockfile.
- fn save(&mut self) -> Result<(), Error> {
- // if we have not updated anything, we don't have to update the cache file
- if !self.dirty {
- return Ok(());
- }
- let _guard = self.lock(Duration::from_secs(5))?;
- // Read content again, in case somebody has changed it in the meanwhile
- let mut content = Self::load_content(&self.cachefile_path)?;
- for (remote, entry) in self.content.remote_tasks.iter_mut() {
- if let Some(other) = content.remote_tasks.remove(remote) {
- entry.tasks =
- Self::merge_tasks(entry.tasks.clone(), other.tasks, self.max_tasks_per_remote);
- }
- }
- let bytes = serde_json::to_vec_pretty(&self.content)?;
- proxmox_sys::fs::replace_file(
- &self.cachefile_path,
- &bytes,
- self.cachefile_options.clone(),
- true,
- )?;
- self.dirty = false;
- Ok(())
- }
- /// Add tasks to the cache.
- ///
- /// If the total number of stored tasks exceeds `max_tasks_per_remote`, the
- /// oldest ones are truncated.
- fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
- let entry = self.content.remote_tasks.entry(remote.into()).or_default();
- entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote);
- self.dirty = true;
- }
- // Get task data for a given remote.
- fn get_tasks(&self, remote: &str) -> Option<Vec<TaskListItem>> {
- if let Some(entry) = self.content.remote_tasks.get(remote) {
- Some(entry.tasks.clone())
- } else {
- None
- }
- }
- // Invalidate cache for a given remote.
- fn invalidate_cache_for_remote(&mut self, remote: &str) {
- self.dirty = true;
- self.content.remote_tasks.remove(remote);
- }
- // Lock the cache for modification.
- //
- // While the cache is locked, other users can still read the cache
- // without a lock, since the cache file is replaced atomically
- // when updating.
- fn lock(&self, duration: Duration) -> Result<File, Error> {
- proxmox_sys::fs::open_file_locked(
- self.lockfile_path(),
- duration,
- true,
- self.cachefile_options.clone(),
- )
- }
- fn merge_tasks(
- mut a: Vec<TaskListItem>,
- mut b: Vec<TaskListItem>,
- limit: usize,
- ) -> Vec<TaskListItem> {
- a.sort_by_key(|task| -task.starttime);
- b.sort_by_key(|task| -task.starttime);
- MergeTaskIterator {
- left: a.into_iter().peekable(),
- right: b.into_iter().peekable(),
- }
- .take(limit)
- .collect()
- }
-struct MergeTaskIterator<T: Iterator<Item = TaskListItem>> {
- left: Peekable<T>,
- right: Peekable<T>,
-impl<T> Iterator for MergeTaskIterator<T>
- T: Iterator<Item = TaskListItem>,
- type Item = T::Item;
- fn next(&mut self) -> Option<T::Item> {
- let order = match (self.left.peek(), self.right.peek()) {
- (Some(l), Some(r)) => Some(l.starttime.cmp(&r.starttime)),
- (Some(_), None) => Some(Ordering::Greater),
- (None, Some(_)) => Some(Ordering::Less),
- (None, None) => None,
- };
- match order {
- Some(Ordering::Greater) => self.left.next(),
- Some(Ordering::Less) => self.right.next(),
- Some(Ordering::Equal) => {
- // Both unwraps are safe, the former peek/match
- // guarantess that there is an element.
- let l = self.left.peek().unwrap();
- let r = self.right.peek().unwrap();
- // Dedup if both lists contain the same task
- if l.upid == r.upid {
- // Take the one where the task is already finished
- if l.endtime.is_some() {
- let _ = self.right.next();
- self.left.next()
- } else {
- let _ = self.left.next();
- self.right.next()
- }
- } else {
- self.left.next()
- }
- }
- None => None,
- }
- }
-#[derive(Default, Debug, Serialize, Deserialize)]
-/// Per-remote entry in the task cache.
-struct TaskCacheEntry {
- tasks: Vec<TaskListItem>,
-#[derive(Debug, Default, Serialize, Deserialize)]
-/// Content of the task cache file.
-struct TaskCacheContent {
- remote_tasks: HashMap<String, TaskCacheEntry>,
/// Interval at which tracked tasks are polled
@@ -590,93 +381,3 @@ pub fn tasktype(status: &str) -> TaskStateType {
-mod tests {
- use super::*;
- use crate::test_support::temp::NamedTempFile;
- fn make_upid(
- starttime: i64,
- endtime: Option<i64>,
- status: Option<String>,
- ) -> Result<TaskListItem, Error> {
- let upid: PveUpid =
- format!("UPID:pve-node:0000C530:001C9BEC:{starttime:08X}:stopall::root at pam:",)
- .parse()?;
- Ok(TaskListItem {
- upid: upid.to_string(),
- node: upid.node,
- pid: upid.pid as i64,
- pstart: upid.pstart,
- starttime: upid.starttime,
- worker_type: upid.worker_type,
- worker_id: upid.worker_id,
- user: upid.auth_id,
- endtime,
- status,
- })
- }
- #[test]
- fn basic_task_cache() -> Result<(), Error> {
- let options = CreateOptions::new()
- .owner(nix::unistd::Uid::effective())
- .group(nix::unistd::Gid::effective())
- .perm(nix::sys::stat::Mode::from_bits_truncate(0o600));
- let temp_file = NamedTempFile::new(options.clone())?;
- let mut cache = TaskCache::new(temp_file.path().into(), options.clone(), 50)?;
- let mut tasks = Vec::new();
- let now = proxmox_time::epoch_i64();
- for i in (0..20).rev() {
- tasks.push(make_upid(now - 10 * i, None, None)?);
- }
- cache.add_tasks("some-remote", tasks.clone());
- cache.save()?;
- let cache = TaskCache::new(temp_file.path().into(), options, 50)?;
- let res = cache.get_tasks("some-remote").unwrap();
- tasks.reverse();
- assert_eq!(tasks, res);
- Ok(())
- }
- #[test]
- fn merge_tasks() -> Result<(), Error> {
- // Arrange
- let mut a = Vec::new();
- for i in [30, 10, 20] {
- a.push(make_upid(i, None, None)?);
- }
- let mut b = Vec::new();
- for i in [25, 15, 35, 5] {
- b.push(make_upid(i, None, None)?);
- }
- a.push(make_upid(40, None, None)?);
- b.push(make_upid(40, Some(50), Some("some status".into()))?);
- // Act
- let tasks = TaskCache::merge_tasks(a, b, 5);
- // Assert
- assert_eq!(tasks.len(), 5);
- assert_eq!(tasks[0].starttime, 40);
- assert_eq!(tasks[0].endtime, Some(50));
- assert_eq!(tasks[1].starttime, 35);
- assert_eq!(tasks[2].starttime, 30);
- assert_eq!(tasks[3].starttime, 25);
- assert_eq!(tasks[4].starttime, 20);
- Ok(())
- }
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
new file mode 100644
index 0000000..8a98876
--- /dev/null
+++ b/server/src/remote_tasks/task_cache.rs
@@ -0,0 +1,312 @@
+use std::{
+ cmp::Ordering,
+ collections::HashMap,
+ fs::File,
+ iter::Peekable,
+ path::{Path, PathBuf},
+ time::Duration,
+use anyhow::Error;
+use serde::{Deserialize, Serialize};
+use pdm_api_types::TaskListItem;
+use proxmox_sys::fs::CreateOptions;
+/// A cache for fetched remote tasks.
+pub(super) struct TaskCache {
+ /// Cache entries
+ content: TaskCacheContent,
+ /// Cache entries were changed/removed.
+ dirty: bool,
+ /// File-location at which the cached tasks are stored.
+ cachefile_path: PathBuf,
+ /// File mode/owner/group for the cache file.
+ cachefile_options: CreateOptions,
+ /// Max. tasks per remote
+ max_tasks_per_remote: usize,
+impl TaskCache {
+ /// Create a new tasks cache instance by loading
+ /// the cache from disk.
+ pub(super) fn new(
+ cachefile_path: PathBuf,
+ cachefile_options: CreateOptions,
+ max_tasks_per_remote: usize,
+ ) -> Result<Self, Error> {
+ Ok(Self {
+ content: Self::load_content(&cachefile_path)?,
+ dirty: false,
+ cachefile_path,
+ cachefile_options,
+ max_tasks_per_remote,
+ })
+ }
+ /// Load the task cache contents from disk.
+ fn load_content(path: &Path) -> Result<TaskCacheContent, Error> {
+ let content = proxmox_sys::fs::file_read_optional_string(path)?;
+ let content = if let Some(content) = content {
+ serde_json::from_str(&content).unwrap_or_default()
+ } else {
+ Default::default()
+ };
+ Ok(content)
+ }
+ /// Get path for the cache's lockfile.
+ fn lockfile_path(&self) -> PathBuf {
+ let mut path = self.cachefile_path.clone();
+ path.set_extension("lock");
+ path
+ }
+ /// Persist the task cache
+ ///
+ /// This method requests an exclusive lock for the task cache lockfile.
+ pub(super) fn save(&mut self) -> Result<(), Error> {
+ // if we have not updated anything, we don't have to update the cache file
+ if !self.dirty {
+ return Ok(());
+ }
+ let _guard = self.lock(Duration::from_secs(5))?;
+ // Read content again, in case somebody has changed it in the meanwhile
+ let mut content = Self::load_content(&self.cachefile_path)?;
+ for (remote, entry) in self.content.remote_tasks.iter_mut() {
+ if let Some(other) = content.remote_tasks.remove(remote) {
+ entry.tasks =
+ Self::merge_tasks(entry.tasks.clone(), other.tasks, self.max_tasks_per_remote);
+ }
+ }
+ let bytes = serde_json::to_vec_pretty(&self.content)?;
+ proxmox_sys::fs::replace_file(
+ &self.cachefile_path,
+ &bytes,
+ self.cachefile_options.clone(),
+ true,
+ )?;
+ self.dirty = false;
+ Ok(())
+ }
+ /// Add tasks to the cache.
+ ///
+ /// If the total number of stored tasks exceeds `max_tasks_per_remote`, the
+ /// oldest ones are truncated.
+ pub(super) fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
+ let entry = self.content.remote_tasks.entry(remote.into()).or_default();
+ entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote);
+ self.dirty = true;
+ }
+ // Get task data for a given remote.
+ pub(super) fn get_tasks(&self, remote: &str) -> Option<Vec<TaskListItem>> {
+ self.content
+ .remote_tasks
+ .get(remote)
+ .map(|entry| entry.tasks.clone())
+ }
+ // Invalidate cache for a given remote.
+ pub(super) fn invalidate_cache_for_remote(&mut self, remote: &str) {
+ self.dirty = true;
+ self.content.remote_tasks.remove(remote);
+ }
+ // Lock the cache for modification.
+ //
+ // While the cache is locked, other users can still read the cache
+ // without a lock, since the cache file is replaced atomically
+ // when updating.
+ fn lock(&self, duration: Duration) -> Result<File, Error> {
+ proxmox_sys::fs::open_file_locked(
+ self.lockfile_path(),
+ duration,
+ true,
+ self.cachefile_options.clone(),
+ )
+ }
+ fn merge_tasks(
+ mut a: Vec<TaskListItem>,
+ mut b: Vec<TaskListItem>,
+ limit: usize,
+ ) -> Vec<TaskListItem> {
+ a.sort_by_key(|task| -task.starttime);
+ b.sort_by_key(|task| -task.starttime);
+ MergeTaskIterator {
+ left: a.into_iter().peekable(),
+ right: b.into_iter().peekable(),
+ }
+ .take(limit)
+ .collect()
+ }
+struct MergeTaskIterator<T: Iterator<Item = TaskListItem>> {
+ left: Peekable<T>,
+ right: Peekable<T>,
+impl<T> Iterator for MergeTaskIterator<T>
+ T: Iterator<Item = TaskListItem>,
+ type Item = T::Item;
+ fn next(&mut self) -> Option<T::Item> {
+ let order = match (self.left.peek(), self.right.peek()) {
+ (Some(l), Some(r)) => Some(l.starttime.cmp(&r.starttime)),
+ (Some(_), None) => Some(Ordering::Greater),
+ (None, Some(_)) => Some(Ordering::Less),
+ (None, None) => None,
+ };
+ match order {
+ Some(Ordering::Greater) => self.left.next(),
+ Some(Ordering::Less) => self.right.next(),
+ Some(Ordering::Equal) => {
+ // Both unwraps are safe, the former peek/match
+ // guarantess that there is an element.
+ let l = self.left.peek().unwrap();
+ let r = self.right.peek().unwrap();
+ // Dedup if both lists contain the same task
+ if l.upid == r.upid {
+ // Take the one where the task is already finished
+ if l.endtime.is_some() {
+ let _ = self.right.next();
+ self.left.next()
+ } else {
+ let _ = self.left.next();
+ self.right.next()
+ }
+ } else {
+ self.left.next()
+ }
+ }
+ None => None,
+ }
+ }
+#[derive(Default, Debug, Serialize, Deserialize)]
+/// Per-remote entry in the task cache.
+struct TaskCacheEntry {
+ tasks: Vec<TaskListItem>,
+#[derive(Debug, Default, Serialize, Deserialize)]
+/// Content of the task cache file.
+struct TaskCacheContent {
+ remote_tasks: HashMap<String, TaskCacheEntry>,
+mod tests {
+ use pve_api_types::PveUpid;
+ use super::*;
+ use crate::test_support::temp::NamedTempFile;
+ fn make_upid(
+ starttime: i64,
+ endtime: Option<i64>,
+ status: Option<String>,
+ ) -> Result<TaskListItem, Error> {
+ let upid: PveUpid =
+ format!("UPID:pve-node:0000C530:001C9BEC:{starttime:08X}:stopall::root at pam:",)
+ .parse()?;
+ Ok(TaskListItem {
+ upid: upid.to_string(),
+ node: upid.node,
+ pid: upid.pid as i64,
+ pstart: upid.pstart,
+ starttime: upid.starttime,
+ worker_type: upid.worker_type,
+ worker_id: upid.worker_id,
+ user: upid.auth_id,
+ endtime,
+ status,
+ })
+ }
+ #[test]
+ fn basic_task_cache() -> Result<(), Error> {
+ let options = CreateOptions::new()
+ .owner(nix::unistd::Uid::effective())
+ .group(nix::unistd::Gid::effective())
+ .perm(nix::sys::stat::Mode::from_bits_truncate(0o600));
+ let temp_file = NamedTempFile::new(options.clone())?;
+ let mut cache = TaskCache::new(temp_file.path().into(), options.clone(), 50)?;
+ let mut tasks = Vec::new();
+ let now = proxmox_time::epoch_i64();
+ for i in (0..20).rev() {
+ tasks.push(make_upid(now - 10 * i, None, None)?);
+ }
+ cache.add_tasks("some-remote", tasks.clone());
+ cache.save()?;
+ let cache = TaskCache::new(temp_file.path().into(), options, 50)?;
+ let res = cache.get_tasks("some-remote").unwrap();
+ tasks.reverse();
+ assert_eq!(tasks, res);
+ Ok(())
+ }
+ #[test]
+ fn merge_tasks() -> Result<(), Error> {
+ // Arrange
+ let mut a = Vec::new();
+ for i in [30, 10, 20] {
+ a.push(make_upid(i, None, None)?);
+ }
+ let mut b = Vec::new();
+ for i in [25, 15, 35, 5] {
+ b.push(make_upid(i, None, None)?);
+ }
+ a.push(make_upid(40, None, None)?);
+ b.push(make_upid(40, Some(50), Some("some status".into()))?);
+ // Act
+ let tasks = TaskCache::merge_tasks(a, b, 5);
+ // Assert
+ assert_eq!(tasks.len(), 5);
+ assert_eq!(tasks[0].starttime, 40);
+ assert_eq!(tasks[0].endtime, Some(50));
+ assert_eq!(tasks[1].starttime, 35);
+ assert_eq!(tasks[2].starttime, 30);
+ assert_eq!(tasks[3].starttime, 25);
+ assert_eq!(tasks[4].starttime, 20);
+ Ok(())
+ }
More information about the pdm-devel
mailing list