[pdm-devel] [PATCH proxmox-datacenter-manager 08/15] task cache: fetch every 5mins, requesting only missing tasks

Wolfgang Bumiller w.bumiller at proxmox.com
Fri Jan 31 14:42:11 CET 2025


On Tue, Jan 28, 2025 at 01:25:13PM +0100, Lukas Wagner wrote:
> Instead of purging the cache completely and requesting the entire task
> history over the last seven days, we keep the per-remote task history in
> the cache and only fetch missing tasks since the last time we fetched
> them.
> 
> If a tracked task finishes, we also force to fetch the most recent task
> data, since that is the only way right now to get the task status *and*
> task endtime.
> 
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> ---
>  server/src/remote_tasks/mod.rs        | 52 +++++++++++++++------------
>  server/src/remote_tasks/task_cache.rs | 25 +++++++++----
>  2 files changed, 49 insertions(+), 28 deletions(-)
> 
> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
> index 4a0552c..16c46a5 100644
> --- a/server/src/remote_tasks/mod.rs
> +++ b/server/src/remote_tasks/mod.rs
> @@ -20,6 +20,8 @@ use task_cache::TaskCache;
>  
>  // TODO: Does this number make sense?
>  const CACHED_TASKS_PER_REMOTE: usize = 2000;
> +const REFRESH_EVERY_S: i64 = 300;
> +const OVERLAP_S: i64 = 60;
>  
>  /// Get tasks for all remotes
>  // FIXME: filter for privileges
> @@ -42,24 +44,32 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
>      // a task's endtime, which is only returned by
>      // /nodes/<node>/tasks...
>      // Room for improvements in the future.
> -    invalidate_cache_for_finished_tasks(&mut cache);
> +    let force_refresh_remotes = get_remotes_with_finished_tasks();
>  
> +    let now = proxmox_time::epoch_i64();
>      for (remote_name, remote) in &remotes.sections {
> -        if let Some(tasks) = cache.get_tasks(remote_name.as_str()) {
> -            // Data in cache is recent enough and has not been invalidated.
> -            all_tasks.extend(tasks);
> -        } else {
> -            let tasks = match fetch_tasks(remote).await {
> -                Ok(tasks) => tasks,
> +        let last_fetched = cache.get_last_fetched(remote_name).unwrap_or(0);
> +        let diff = now - last_fetched;
> +
> +        if diff > REFRESH_EVERY_S || diff < 0 || force_refresh_remotes.contains(remote_name) {
> +            // Add some overlap so that we for sure fetch every task - duplicates
> +            // are remove when adding the tasks to the cache.
> +            let fetch_since =
> +                (cache.get_most_recent_starttime(remote_name).unwrap_or(0) - OVERLAP_S).max(0);
> +
> +            match fetch_tasks(remote, fetch_since).await {
> +                Ok(tasks) => {
> +                    cache.add_tasks(remote_name.as_str(), tasks, now);
> +                }
>                  Err(err) => {
>                      // ignore errors for not reachable remotes
>                      continue;
>                  }
> -            };
> -            cache.add_tasks(remote_name.as_str(), tasks.clone());
> -
> -            all_tasks.extend(tasks);
> +            }
>          }
> +
> +        let tasks = cache.get_tasks(remote_name).unwrap_or_default();
> +        all_tasks.extend(tasks);
>      }
>  
>      let mut returned_tasks = add_running_tasks(all_tasks)?;
> @@ -125,7 +135,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
>  }
>  
>  /// Fetch tasks (active and finished) from a remote
> -async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
> +async fn fetch_tasks(remote: &Remote, since: i64) -> Result<Vec<TaskListItem>, Error> {
>      let mut tasks = Vec::new();
>  
>      match remote.ty {
> @@ -138,9 +148,8 @@ async fn fetch_tasks(remote: &Remote) -> Result<Vec<TaskListItem>, Error> {
>                  let params = ListTasks {
>                      // Include running tasks
>                      source: Some(ListTasksSource::All),
> -                    // TODO: How much task history do we want? Right now we just hard-code it
> -                    // to 7 days.
> -                    since: Some(proxmox_time::epoch_i64() - 7 * 24 * 60 * 60),
> +                    since: Some(since),
> +                    limit: Some(CACHED_TASKS_PER_REMOTE as u64),
>                      ..Default::default()
>                  };
>  
> @@ -182,18 +191,17 @@ fn map_tasks(tasks: Vec<ListTasksResponse>, remote: &str) -> Result<Vec<TaskList
>      Ok(mapped)
>  }
>  
> -/// Drops the cached task list of a remote for all finished tasks.
> +/// Get list of remotes which have finished tasks.
>  ///
>  /// We use this to force a refresh so that we get the full task
>  /// info (including `endtime`) in the next API call.
> -fn invalidate_cache_for_finished_tasks(cache: &mut TaskCache) {
> +fn get_remotes_with_finished_tasks() -> HashSet<String> {
>      let mut finished = FINISHED_FOREIGN_TASKS.write().expect("mutex poisoned");

I already mentioned this off-list (since I had also looked at the old code
for the first time just this week):
This could just use
    let finished = std::mem::take(&mut *FINISHED_...lock().unwrap());
to immediately unlock the mutex, and then use `into_iter()` instead of
`drain()`, which should be more efficient.

>  
> -    // If a task is finished, we force a refresh for the remote - otherwise
> -    // we don't get the 'endtime' for the task.
> -    for task in finished.drain() {
> -        cache.invalidate_cache_for_remote(task.remote());
> -    }
> +    finished
> +        .drain()
> +        .map(|task| task.remote().to_string())
> +        .collect()
>  }
>  
>  /// Supplement the list of tasks that we received from the remote with
> diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
> index 8a98876..e08e3d4 100644
> --- a/server/src/remote_tasks/task_cache.rs
> +++ b/server/src/remote_tasks/task_cache.rs
> @@ -107,10 +107,11 @@ impl TaskCache {
>      ///
>      /// If the total number of stored tasks exceeds `max_tasks_per_remote`, the
>      /// oldest ones are truncated.
> -    pub(super) fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>) {
> +    pub(super) fn add_tasks(&mut self, remote: &str, tasks: Vec<TaskListItem>, last_fetched: i64) {
>          let entry = self.content.remote_tasks.entry(remote.into()).or_default();
>  
>          entry.tasks = Self::merge_tasks(entry.tasks.clone(), tasks, self.max_tasks_per_remote);

`entry.tasks = something(entry.tasks.clone())` seems wasteful; this is
also solved by `std::mem::take()`:

    entry.tasks = Self::merge_tasks(take(&mut entry.tasks), ...);

> +        entry.last_fetched = last_fetched;
>  
>          self.dirty = true;
>      }
> @@ -123,10 +124,21 @@ impl TaskCache {
>              .map(|entry| entry.tasks.clone())
>      }
>  
> -    // Invalidate cache for a given remote.
> -    pub(super) fn invalidate_cache_for_remote(&mut self, remote: &str) {
> -        self.dirty = true;
> -        self.content.remote_tasks.remove(remote);
> +    // Get the `last_fetched` time stamp for a given remote.
> +    pub(super) fn get_last_fetched(&self, remote: &str) -> Option<i64> {
> +        self.content
> +            .remote_tasks
> +            .get(remote)
> +            .map(|entry| entry.last_fetched)
> +    }
> +
> +    // Get the `most_recent_starttime` time stamp for a given remote.
> +    pub(super) fn get_most_recent_starttime(&self, remote: &str) -> Option<i64> {
> +        if let Some(entry) = self.content.remote_tasks.get(remote) {
> +            Some(entry.tasks.first()?.starttime)
> +        } else {
> +            None
> +        }
>      }
>  
>      // Lock the cache for modification.
> @@ -211,6 +223,7 @@ where
>  /// Per-remote entry in the task cache.
>  struct TaskCacheEntry {
>      tasks: Vec<TaskListItem>,
> +    last_fetched: i64,
>  }
>  
>  #[derive(Debug, Default, Serialize, Deserialize)]
> @@ -267,7 +280,7 @@ mod tests {
>              tasks.push(make_upid(now - 10 * i, None, None)?);
>          }
>  
> -        cache.add_tasks("some-remote", tasks.clone());
> +        cache.add_tasks("some-remote", tasks.clone(), 0);
>          cache.save()?;
>  
>          let cache = TaskCache::new(temp_file.path().into(), options, 50)?;
> -- 
> 2.39.5




More information about the pdm-devel mailing list