[pdm-devel] [PATCH proxmox-datacenter-manager 08/15] task cache: fetch every 5mins, requesting only missing tasks
Wolfgang Bumiller
w.bumiller at proxmox.com
Fri Jan 31 14:42:11 CET 2025
On Tue, Jan 28, 2025 at 01:25:13PM +0100, Lukas Wagner wrote:
> Instead of purging the cache completely and requesting the entire task
> history over the last seven days, we keep the per-remote task history in
> the cache and only fetch missing tasks since the last time we fetched
> them.
>
> If a tracked task finishes, we also force to fetch the most recent task
> data, since that is the only way right now to get the task status *and*
> task endtime.
>
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> ---
> server/src/remote_tasks/mod.rs | 52 +++++++++++++++------------
> server/src/remote_tasks/task_cache.rs | 25 +++++++++----
> 2 files changed, 49 insertions(+), 28 deletions(-)
>
> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
> index 4a0552c..16c46a5 100644
> --- a/server/src/remote_tasks/mod.rs
> +++ b/server/src/remote_tasks/mod.rs
> @@ -20,6 +20,8 @@ use task_cache::TaskCache;
>
> // TODO: Does this number make sense?
> const CACHED_TASKS_PER_REMOTE: usize = 2000;
> +const REFRESH_EVERY_S: i64 = 300;
> +const OVERLAP_S: i64 = 60;
>
> /// Get tasks for all remotes
> // FIXME: filter for privileges
> @@ -42,24 +44,32 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
> // a task's endtime, which is only returned by
> // /nodes/<node>/tasks...
> // Room for improvements in the future.
> - invalidate_cache_for_finished_tasks(&mut cache);
> + let force_refresh_remotes = get_remotes_with_finished_tasks();
>
> + let now = proxmox_time::epoch_i64();
> for (remote_name, remote) in &remotes.sections {
> - if let Some(tasks) = cache.get_tasks(remote_name.as_str()) {
> - // Data in cache is recent enough and has not been invalidated.
> - all_tasks.extend(tasks);
> - } else {
> - let tasks = match fetch_tasks(remote).await {
> - Ok(tasks) => tasks,
> + let last_fetched = cache.get_last_fetched(remote_name).unwrap_or(0);
> + let diff = now - last_fetched;
> +
> + if diff > REFRESH_EVERY_S || diff < 0 || force_refresh_remotes.contains(remote_name) {
> + // Add some overlap so that we for sure fetch every task - duplicates
> + // are remove when adding the tasks to the cache.
> + let fetch_since =
> + (cache.get_most_recent_starttime(remote_name).unwrap_or(0) - OVERLAP_S).max(0);
> +
> + match fetch_tasks(remote, fetch_since).await {
> + Ok(tasks) => {
> + cache.add_tasks(remote_name.as_str(), tasks, now);
> + }
> Err(err) => {
> // ignore errors for not reachable remotes
> continue;
> }
> - };
> - cache.add_tasks(remote_name.as_str(), tasks.clone());
> -
> - all_tasks.extend(tasks);
> + }
> }
> +
> + let tasks = cache.get_tasks(remote_name).unwrap_or_default();
> + all_tasks.extend(tasks);
> }
>
> let mut returned_tasks = add_running_tasks(all_tasks)?;
> @@ -125,7 +135,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
> }
>
> /// Fetch tasks (active and finished) from a remote
> -async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
> +async fn fetch_tasks(remote: &Remote, since: i64) -> Result<Vec<TaskListItem>, Error> {
> let mut tasks = Vec::new();
>
> match remote.ty {
> @@ -138,9 +148,8 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
> let params = ListTasks {
> // Include running tasks
> source: Some(ListTasksSource::All),
> - // TODO: How much task history do we want? Right now we just hard-code it
> - // to 7 days.
> - since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
> + since: Some(since),
> + limit: Some(CACHED_TASKS_PER_REMOTE as u64),
> ..Default::default()
> };
>
> @@ -182,18 +191,17 @@ fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskList
> Ok(mapped)
> }
>
> -/// Drops the cached task list of a remote for all finished tasks.
> +/// Get list of remotes which have finished tasks.
> ///
> /// We use this to force a refresh so that we get the full task
> /// info (including `endtime`) in the next API call.
> -fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
> +fn get_remotes_with_finished_tasks() -> HashSet<String> {
> let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");
Already mentioned this off-list (I had also looked at the old code for
the first time just this week):
This could just use
    let finished = std::mem::take(&mut *FINISHED_FOREIGN_TASKS.write().unwrap());
to release the lock immediately, and then use `into_iter()` instead of
`drain()`, which should be more efficient.
>
> - // If a task is finished, we force a refresh for the remote - otherwise
> - // we don't get the 'endtime' for the task.
> - for task in finished.drain() {
> - cache.invalidate_cache_for_remote(task.remote());
> - }
> + finished
> + .drain()
> + .map(|task| task.remote().to_string())
> + .collect()
> }
>
> /// Supplement the list of tasks that we received from the remote with
> diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
> index 8a98876..e08e3d4 100644
> --- a/server/src/remote_tasks/task_cache.rs
> +++ b/server/src/remote_tasks/task_cache.rs
> @@ -107,10 +107,11 @@ impl TaskCache {
> ///
> /// If the total number of stored tasks exceeds `max_tasks_per_remote`, the
> /// oldest ones are truncated.
> - pub(super) fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
> + pub(super) fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, last_fetched: i64) {
> let entry = self.content.remote_tasks.entry(remote.into()).or_default();
>
> entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote);
`entry.tasks = something(entry.tasks.clone())` seems wasteful; this can
also be solved with `take()`:
entry.tasks = Self::merge_tasks(take(&mut entry.tasks), ...);
> + entry.last_fetched = last_fetched;
>
> self.dirty = true;
> }
> @@ -123,10 +124,21 @@ impl TaskCache {
> .map(|entry| entry.tasks.clone())
> }
>
> - // Invalidate cache for a given remote.
> - pub(super) fn invalidate_cache_for_remote(&mut self, remote: &str) {
> - self.dirty = true;
> - self.content.remote_tasks.remove(remote);
> + // Get the `last_fetched` time stamp for a given remote.
> + pub(super) fn get_last_fetched(&self, remote: &str) -> Option<i64> {
> + self.content
> + .remote_tasks
> + .get(remote)
> + .map(|entry| entry.last_fetched)
> + }
> +
> + // Get the `most_recent_starttime` time stamp for a given remote.
> + pub(super) fn get_most_recent_starttime(&self, remote: &str) -> Option<i64> {
> + if let Some(entry) = self.content.remote_tasks.get(remote) {
> + Some(entry.tasks.first()?.starttime)
> + } else {
> + None
> + }
> }
>
> // Lock the cache for modification.
> @@ -211,6 +223,7 @@ where
> /// Per-remote entry in the task cache.
> struct TaskCacheEntry {
> tasks: Vec<TaskListItem>,
> + last_fetched: i64,
> }
>
> #[derive(Debug, Default, Serialize, Deserialize)]
> @@ -267,7 +280,7 @@ mod tests {
> tasks.push(make_upid(now - 10 * i, None, None)?);
> }
>
> - cache.add_tasks("some-remote", tasks.clone());
> + cache.add_tasks("some-remote", tasks.clone(), 0);
> cache.save()?;
>
> let cache = TaskCache::new(temp_file.path().into(), options, 50)?;
> --
> 2.39.5
More information about the pdm-devel
mailing list