[pdm-devel] [PATCH proxmox-datacenter-manager 08/15] task cache: fetch every 5mins, requesting only missing tasks
Lukas Wagner
l.wagner at proxmox.com
Tue Jan 28 13:25:13 CET 2025
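Instead of purging the cache completely and requesting the entire task
history over the last seven days, we keep the per-remote task history in
the cache and only fetch the tasks that are missing since the last fetch.
Unless a refresh is forced (see below), a remote is only queried again
once its cached task data is older than five minutes.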
If a tracked task finishes, we also force a fetch of the most recent task
data, since that is currently the only way to get both the task status
*and* the task endtime.
Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
server/src/remote_tasks/mod.rs | 52 +++++++++++++++------------
server/src/remote_tasks/task_cache.rs | 25 +++++++++----
2 files changed, 49 insertions(+), 28 deletions(-)
diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 4a0552c..16c46a5 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -20,6 +20,8 @@ use task_cache::TaskCache;
// TODO: Does this number make sense?
const CACHED_TASKS_PER_REMOTE: usize = 2000;
+const REFRESH_EVERY_S: i64 = 300;
+const OVERLAP_S: i64 = 60;
/// Get tasks for all remotes
// FIXME: filter for privileges
@@ -42,24 +44,32 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
// a task's endtime, which is only returned by
// /nodes/<node>/tasks...
// Room for improvements in the future.
- invalidate_cache_for_finished_tasks(&mut cache);
+ let force_refresh_remotes = get_remotes_with_finished_tasks();
+ let now = proxmox_time::epoch_i64();
for (remote_name, remote) in &remotes.sections {
- if let Some(tasks) = cache.get_tasks(remote_name.as_str()) {
- // Data in cache is recent enough and has not been invalidated.
- all_tasks.extend(tasks);
- } else {
- let tasks = match fetch_tasks(remote).await {
- Ok(tasks) => tasks,
+ let last_fetched = cache.get_last_fetched(remote_name).unwrap_or(0);
+ let diff = now - last_fetched;
+
+ if diff > REFRESH_EVERY_S || diff < 0 || force_refresh_remotes.contains(remote_name) {
+ // Add some overlap so that we are sure to fetch every task - duplicates
+ // are removed when adding the tasks to the cache.
+ let fetch_since =
+ (cache.get_most_recent_starttime(remote_name).unwrap_or(0) - OVERLAP_S).max(0);
+
+ match fetch_tasks(remote, fetch_since).await {
+ Ok(tasks) => {
+ cache.add_tasks(remote_name.as_str(), tasks, now);
+ }
Err(err) => {
// ignore errors for not reachable remotes
continue;
}
- };
- cache.add_tasks(remote_name.as_str(), tasks.clone());
-
- all_tasks.extend(tasks);
+ }
}
+
+ let tasks = cache.get_tasks(remote_name).unwrap_or_default();
+ all_tasks.extend(tasks);
}
let mut returned_tasks = add_running_tasks(all_tasks)?;
@@ -125,7 +135,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
}
/// Fetch tasks (active and finished) from a remote
-async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
+async fn fetch_tasks(remote: &Remote, since: i64) -> Result<Vec<TaskListItem>, Error> {
let mut tasks = Vec::new();
match remote.ty {
@@ -138,9 +148,8 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
let params = ListTasks {
// Include running tasks
source: Some(ListTasksSource::All),
- // TODO: How much task history do we want? Right now we just hard-code it
- // to 7 days.
- since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
+ since: Some(since),
+ limit: Some(CACHED_TASKS_PER_REMOTE as u64),
..Default::default()
};
@@ -182,18 +191,17 @@ fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskList
Ok(mapped)
}
-/// Drops the cached task list of a remote for all finished tasks.
+/// Drain `FINISHED_FOREIGN_TASKS` and return the names of the remotes they belong to.
///
/// We use this to force a refresh so that we get the full task
/// info (including `endtime`) in the next API call.
-fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
+fn get_remotes_with_finished_tasks() -> HashSet<String> {
let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
- // If a task is finished, we force a refresh for the remote - otherwise
- // we don't get the 'endtime' for the task.
- for task in finished.drain() {
- cache.invalidate_cache_for_remote(task.remote());
- }
+ finished
+ .drain()
+ .map(|task| task.remote().to_string())
+ .collect()
}
/// Supplement the list of tasks that we received from the remote with
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index 8a98876..e08e3d4 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -107,10 +107,11 @@ impl TaskCache {
///
/// If the total number of stored tasks exceeds `max_tasks_per_remote`, the
/// oldest ones are truncated.
- pub(super) fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
+ pub(super) fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, last_fetched: i64) {
let entry = self.content.remote_tasks.entry(remote.into()).or_default();
entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote);
+ entry.last_fetched = last_fetched;
self.dirty = true;
}
@@ -123,10 +124,21 @@ impl TaskCache {
.map(|entry| entry.tasks.clone())
}
- // Invalidate cache for a given remote.
- pub(super) fn invalidate_cache_for_remote(&mut self, remote: &str) {
- self.dirty = true;
- self.content.remote_tasks.remove(remote);
+ /// Get the `last_fetched` timestamp of a remote, or `None` if it has no cache entry.
+ pub(super) fn get_last_fetched(&self, remote: &str) -> Option<i64> {
+ self.content
+ .remote_tasks
+ .get(remote)
+ .map(|entry| entry.last_fetched)
+ }
+
+ /// Get the `starttime` of the first (presumably newest) cached task of a remote.
+ pub(super) fn get_most_recent_starttime(&self, remote: &str) -> Option<i64> {
+ if let Some(entry) = self.content.remote_tasks.get(remote) {
+ Some(entry.tasks.first()?.starttime)
+ } else {
+ None
+ }
}
// Lock the cache for modification.
@@ -211,6 +223,7 @@ where
/// Per-remote entry in the task cache.
struct TaskCacheEntry {
tasks: Vec<TaskListItem>,
+ last_fetched: i64,
}
#[derive(Debug, Default, Serialize, Deserialize)]
@@ -267,7 +280,7 @@ mod tests {
tasks.push(make_upid(now - 10 * i, None, None)?);
}
- cache.add_tasks("some-remote", tasks.clone());
+ cache.add_tasks("some-remote", tasks.clone(), 0);
cache.save()?;
let cache = TaskCache::new(temp_file.path().into(), options, 50)?;
--
2.39.5
More information about the pdm-devel
mailing list