[pdm-devel] [PATCH proxmox-datacenter-manager 05/15] task cache: add FIFO cache replacement policy
Lukas Wagner
l.wagner@proxmox.com
Tue Jan 28 13:25:10 CET 2025
The task cache now holds at most a fixed number of tasks
*per remote*. If this capacity is exceeded, the oldest tasks,
as determined by their starttime, are dropped.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/task_cache.rs | 193 +++++++++++++++++++++++++++++++--------
1 file changed, 153 insertions(+), 40 deletions(-)
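
For reviewers, here is a minimal, self-contained sketch of the merge-and-truncate
idea that merge_tasks() below implements. It is not part of the patch: the
simplified Task struct, merge_and_truncate() and the main() driver are
illustrative stand-ins, and the sketch leaves out the UPID-based deduplication
that MergeTaskIterator handles.

// Simplified stand-in for TaskListItem: only the field relevant for ordering.
#[derive(Debug, Clone)]
struct Task {
    starttime: i64,
}

/// Merge two task lists, newest first, keeping at most `limit` entries.
fn merge_and_truncate(mut a: Vec<Task>, mut b: Vec<Task>, limit: usize) -> Vec<Task> {
    // Sort both inputs by descending starttime.
    a.sort_by_key(|t| std::cmp::Reverse(t.starttime));
    b.sort_by_key(|t| std::cmp::Reverse(t.starttime));

    // Two-way merge of the sorted lists, truncated to `limit`.
    let mut out = Vec::with_capacity(limit);
    let (mut i, mut j) = (0, 0);
    while out.len() < limit && (i < a.len() || j < b.len()) {
        let take_left = match (a.get(i), b.get(j)) {
            (Some(l), Some(r)) => l.starttime >= r.starttime,
            (Some(_), None) => true,
            (None, _) => false,
        };
        if take_left {
            out.push(a[i].clone());
            i += 1;
        } else {
            out.push(b[j].clone());
            j += 1;
        }
    }
    out
}

fn main() {
    let a = vec![Task { starttime: 30 }, Task { starttime: 10 }];
    let b = vec![Task { starttime: 25 }, Task { starttime: 35 }];
    let merged = merge_and_truncate(a, b, 3);
    // The three newest tasks remain: 35, 30, 25.
    let times: Vec<i64> = merged.iter().map(|t| t.starttime).collect();
    assert_eq!(times, vec![35, 30, 25]);
}

The patch achieves the same effect with a lazy two-way merge iterator plus
.take(limit), which avoids building an intermediate merged vector and lets the
equal-starttime case prefer the entry that already has an endtime set.
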
diff --git a/server/src/task_cache.rs b/server/src/task_cache.rs
index f24af3f..032f2a4 100644
--- a/server/src/task_cache.rs
+++ b/server/src/task_cache.rs
@@ -1,6 +1,8 @@
use std::{
+ cmp::Ordering,
collections::{HashMap, HashSet},
fs::File,
+ iter::Peekable,
path::{Path, PathBuf},
sync::{LazyLock, RwLock},
time::Duration,
@@ -18,6 +20,9 @@ use tokio::task::JoinHandle;
use crate::{api::pve, task_utils};
+// TODO: Does this number make sense?
+const CACHED_TASKS_PER_REMOTE: usize = 2000;
+
/// Get tasks for all remotes
// FIXME: filter for privileges
pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
@@ -31,7 +36,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
let cache_path = Path::new(pdm_buildcfg::PDM_CACHE_DIR).join("taskcache.json");
- let mut cache = TaskCache::new(cache_path, file_options)?;
+ let mut cache = TaskCache::new(cache_path, file_options, CACHED_TASKS_PER_REMOTE)?;
// Force a refresh for all tasks of a remote if a task is finished.
// Not super nice, but saves us from persisting finished tasks. Also,
@@ -53,7 +58,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
continue;
}
};
- cache.set_tasks(remote_name.as_str(), tasks.clone());
+ cache.add_tasks(remote_name.as_str(), tasks.clone());
all_tasks.extend(tasks);
}
@@ -240,10 +245,6 @@ struct TaskCache {
/// Cache entries
content: TaskCacheContent,
- /// Entries that were added or updated - these will be persistet
- /// when `save` is called.
- new_or_updated: TaskCacheContent,
-
/// Cache entries were changed/removed.
dirty: bool,
@@ -252,18 +253,25 @@ struct TaskCache {
/// File mode/owner/group for the cache file.
cachefile_options: CreateOptions,
+
+ /// Maximum number of cached tasks per remote
+ max_tasks_per_remote: usize,
}
impl TaskCache {
/// Create a new tasks cache instance by loading
/// the cache from disk.
- fn new(cachefile_path: PathBuf, cachefile_options: CreateOptions) -> Result<Self, Error> {
+ fn new(
+ cachefile_path: PathBuf,
+ cachefile_options: CreateOptions,
+ max_tasks_per_remote: usize,
+ ) -> Result<Self, Error> {
Ok(Self {
content: Self::load_content(&cachefile_path)?,
- new_or_updated: Default::default(),
dirty: false,
cachefile_path,
cachefile_options,
+ max_tasks_per_remote,
})
}
@@ -301,15 +309,14 @@ impl TaskCache {
// Read content again, in case somebody has changed it in the meanwhile
let mut content = Self::load_content(&self.cachefile_path)?;
- for (remote_name, entry) in self.new_or_updated.remote_tasks.drain() {
- if let Some(existing_entry) = content.remote_tasks.get_mut(&remote_name) {
- *existing_entry = entry;
- } else {
- content.remote_tasks.insert(remote_name, entry);
+ for (remote, entry) in self.content.remote_tasks.iter_mut() {
+ if let Some(other) = content.remote_tasks.remove(remote) {
+ entry.tasks =
+ Self::merge_tasks(entry.tasks.clone(), other.tasks, self.max_tasks_per_remote);
}
}
- let bytes = serde_json::to_vec_pretty(&content)?;
+ let bytes = serde_json::to_vec_pretty(&self.content)?;
proxmox_sys::fs::replace_file(
&self.cachefile_path,
@@ -323,20 +330,22 @@ impl TaskCache {
Ok(())
}
- // Update task data for a given remote.
- fn set_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
+ /// Add tasks to the cache.
+ ///
+ /// If the total number of stored tasks exceeds `max_tasks_per_remote`, the
+ /// oldest ones are dropped.
+ fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
+ let entry = self.content.remote_tasks.entry(remote.into()).or_default();
+
+ entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote);
+
self.dirty = true;
- self.new_or_updated
- .remote_tasks
- .insert(remote.to_string(), TaskCacheEntry { tasks });
}
// Get task data for a given remote.
fn get_tasks(&self, remote: &str) -> Option<Vec<TaskListItem>> {
if let Some(entry) = self.content.remote_tasks.get(remote) {
Some(entry.tasks.clone())
- } else if let Some(entry) = self.new_or_updated.remote_tasks.get(remote) {
- Some(entry.tasks.clone())
} else {
None
}
@@ -361,9 +370,72 @@ impl TaskCache {
self.cachefile_options.clone(),
)
}
+
+ fn merge_tasks(
+ mut a: Vec<TaskListItem>,
+ mut b: Vec<TaskListItem>,
+ limit: usize,
+ ) -> Vec<TaskListItem> {
+ a.sort_by_key(|task| -task.starttime);
+ b.sort_by_key(|task| -task.starttime);
+
+ MergeTaskIterator {
+ left: a.into_iter().peekable(),
+ right: b.into_iter().peekable(),
+ }
+ .take(limit)
+ .collect()
+ }
+}
+
+struct MergeTaskIterator<T: Iterator<Item = TaskListItem>> {
+ left: Peekable<T>,
+ right: Peekable<T>,
+}
+
+impl<T> Iterator for MergeTaskIterator<T>
+where
+ T: Iterator<Item = TaskListItem>,
+{
+ type Item = T::Item;
+
+ fn next(&mut self) -> Option<T::Item> {
+ let order = match (self.left.peek(), self.right.peek()) {
+ (Some(l), Some(r)) => Some(l.starttime.cmp(&r.starttime)),
+ (Some(_), None) => Some(Ordering::Greater),
+ (None, Some(_)) => Some(Ordering::Less),
+ (None, None) => None,
+ };
+
+ match order {
+ Some(Ordering::Greater) => self.left.next(),
+ Some(Ordering::Less) => self.right.next(),
+ Some(Ordering::Equal) => {
+ // Both unwraps are safe, the preceding peek/match
+ // guarantees that there is an element.
+ let l = self.left.peek().unwrap();
+ let r = self.right.peek().unwrap();
+
+ // Dedup if both lists contain the same task
+ if l.upid == r.upid {
+ // Take the one where the task is already finished
+ if l.endtime.is_some() {
+ let _ = self.right.next();
+ self.left.next()
+ } else {
+ let _ = self.left.next();
+ self.right.next()
+ }
+ } else {
+ self.left.next()
+ }
+ }
+ None => None,
+ }
+ }
}
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Default, Debug, Serialize, Deserialize)]
/// Per-remote entry in the task cache.
struct TaskCacheEntry {
tasks: Vec<TaskListItem>,
@@ -524,6 +596,29 @@ mod tests {
use super::*;
use crate::test_support::temp::NamedTempFile;
+ fn make_upid(
+ starttime: i64,
+ endtime: Option<i64>,
+ status: Option<String>,
+ ) -> Result<TaskListItem, Error> {
+ let upid: PveUpid =
+ format!("UPID:pve-node:0000C530:001C9BEC:{starttime:08X}:stopall::root@pam:",)
+ .parse()?;
+
+ Ok(TaskListItem {
+ upid: upid.to_string(),
+ node: upid.node,
+ pid: upid.pid as i64,
+ pstart: upid.pstart,
+ starttime: upid.starttime,
+ worker_type: upid.worker_type,
+ worker_id: upid.worker_id,
+ user: upid.auth_id,
+ endtime,
+ status,
+ })
+ }
+
#[test]
fn basic_task_cache() -> Result<(), Error> {
let options = CreateOptions::new()
@@ -533,37 +628,55 @@ mod tests {
let temp_file = NamedTempFile::new(options.clone())?;
- let mut cache = TaskCache::new(temp_file.path().into(), options.clone())?;
+ let mut cache = TaskCache::new(temp_file.path().into(), options.clone(), 50)?;
let mut tasks = Vec::new();
let now = proxmox_time::epoch_i64();
for i in (0..20).rev() {
- let upid: PveUpid =
- "UPID:pve-node:0000C530:001C9BEC:677E934A:stopall::root at pam:".parse()?;
-
- tasks.push(TaskListItem {
- upid: upid.to_string(),
- node: upid.node,
- pid: upid.pid as i64,
- pstart: upid.pstart,
- starttime: now - 10 * i,
- worker_type: upid.worker_type,
- worker_id: upid.worker_id,
- user: upid.auth_id,
- endtime: None,
- status: None,
- });
+ tasks.push(make_upid(now - 10 * i, None, None)?);
}
- cache.set_tasks("some-remote", tasks.clone());
+ cache.add_tasks("some-remote", tasks.clone());
cache.save()?;
- let cache = TaskCache::new(temp_file.path().into(), options)?;
+ let cache = TaskCache::new(temp_file.path().into(), options, 50)?;
let res = cache.get_tasks("some-remote").unwrap();
+ tasks.reverse();
assert_eq!(tasks, res);
Ok(())
}
+
+ #[test]
+ fn merge_tasks() -> Result<(), Error> {
+ // Arrange
+ let mut a = Vec::new();
+ for i in [30, 10, 20] {
+ a.push(make_upid(i, None, None)?);
+ }
+
+ let mut b = Vec::new();
+ for i in [25, 15, 35, 5] {
+ b.push(make_upid(i, None, None)?);
+ }
+
+ a.push(make_upid(40, None, None)?);
+ b.push(make_upid(40, Some(50), Some("some status".into()))?);
+
+ // Act
+ let tasks = TaskCache::merge_tasks(a, b, 5);
+
+ // Assert
+ assert_eq!(tasks.len(), 5);
+ assert_eq!(tasks[0].starttime, 40);
+ assert_eq!(tasks[0].endtime, Some(50));
+ assert_eq!(tasks[1].starttime, 35);
+ assert_eq!(tasks[2].starttime, 30);
+ assert_eq!(tasks[3].starttime, 25);
+ assert_eq!(tasks[4].starttime, 20);
+
+ Ok(())
+ }
}
--
2.39.5