[pdm-devel] [PATCH proxmox-datacenter-manager v3 3/6] remote tasks: improve locking for task archive iterator

Wolfgang Bumiller w.bumiller at proxmox.com
Fri Apr 18 09:12:39 CEST 2025


On Thu, Apr 17, 2025 at 03:22:53PM +0200, Lukas Wagner wrote:
> Instead of awkwardly using into_lock for keeping the archive locked,
> pass the lock as a reference to TaskCache::get_tasks_with_lock. The
> iterator itself only holds the lock if we use the auto-locking
> TaskCache::get_tasks, otherwise we ensure that the lock lives long
> enough via lifetimes and PhantomData in the iterator.
> 
> Suggested-by: Wolfgang Bumiller <w.bumiller at proxmox.com>
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> ---
> 
> Notes:
>     New in v3
> 
>  server/src/remote_tasks/mod.rs        |  2 +-
>  server/src/remote_tasks/task_cache.rs | 62 +++++++++++++--------------
>  2 files changed, 31 insertions(+), 33 deletions(-)
> 
> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
> index 126c9ad3..b0fc052f 100644
> --- a/server/src/remote_tasks/mod.rs
> +++ b/server/src/remote_tasks/mod.rs
> @@ -24,7 +24,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
>              GetTasks::All
>          };
>  
> -        for task in &mut cache
> +        for task in cache
>              .get_tasks(which)?
>              .skip(filters.start as usize)
>              .take(filters.limit as usize)
> diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
> index f847d441..c59c5235 100644
> --- a/server/src/remote_tasks/task_cache.rs
> +++ b/server/src/remote_tasks/task_cache.rs
> @@ -6,6 +6,7 @@ use std::{
>      fs::File,
>      io::{BufRead, BufReader, BufWriter, ErrorKind, Lines, Write},
>      iter::Peekable,
> +    marker::PhantomData,
>      path::{Path, PathBuf},
>      time::Duration,
>  };
> @@ -213,8 +214,8 @@ impl TaskCache {
>              .map(|(remote, add_tasks)| (remote, add_tasks.update_most_recent_archive_timestamp))
>              .collect();
>  
> -        let mut task_iter = self
> -            .get_tasks_with_lock(GetTasks::Active, lock)
> +        let task_iter = self
> +            .get_tasks_with_lock(GetTasks::Active, &lock)
>              .context("failed to create archive iterator for active tasks")?;
>  
>          let mut active_tasks =
> @@ -226,10 +227,6 @@ impl TaskCache {
>                  }
>              }));
>  
> -        // Consume the iterator to get back the lock. The lock is held
> -        // until _lock is finally dropped at the end of the function.
> -        let _lock = task_iter.into_lock();
> -
>          let mut new_finished_tasks = Vec::new();
>  
>          for task in tasks {
> @@ -387,7 +384,7 @@ impl TaskCache {
>  
>          let mut tasks = Vec::new();
>          let mut task_iter = self
> -            .get_tasks_with_lock(GetTasks::Active, lock)
> +            .get_tasks_with_lock(GetTasks::Active, &lock)
>              .context("failed to create active task iterator")?;
>  
>          for task in &mut task_iter {
> @@ -404,8 +401,6 @@ impl TaskCache {
>          tasks.push(task.clone());
>          tasks.sort_by(compare_tasks_reverse);
>  
> -        let _lock = task_iter.into_lock();
> -
>          let mut state = self.read_state();
>  
>          state
> @@ -432,24 +427,28 @@ impl TaskCache {
>      ///
>      /// This function will request a non-exclusive read-lock, don't call if
>      /// you already hold a lock for this cache. See [`Self::get_tasks_with_lock`].
> -    pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator, Error> {
> -        let lock = self
> -            .lock(false)
> -            .context("get_tasks: failed to acquire lock")?;
> -        self.get_tasks_with_lock(mode, lock)
> +    pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'static>, Error> {
> +        let lock = self.lock(false).context("failed to lock archive")?;
> +        self.get_tasks_impl(mode, Some(lock))
>              .context("failed to create task archive iterator")
>      }
>  
>      /// Iterate over cached tasks.
>      ///
> -    /// This function requires you to pass a lock. If you want to continue to hold the lock
> -    /// after iterating, you can consume the iterator by calling
> -    /// [`TaskArchiveIterator::into_lock`], yielding the original lock.
> -    pub fn get_tasks_with_lock(
> +    /// This function requires you to pass a lock.
> +    pub fn get_tasks_with_lock<'a>(
>          &self,
>          mode: GetTasks,
> -        lock: TaskCacheLock,
> -    ) -> Result<TaskArchiveIterator, Error> {
> +        _lock: &'a TaskCacheLock,
> +    ) -> Result<TaskArchiveIterator<'a>, Error> {
> +        self.get_tasks_impl(mode, None)
> +    }
> +
> +    pub fn get_tasks_impl<'a>(

^ This must not be `pub`, given the lifetime is chosen "at will" by the
caller and the relation to the lock's lifetime is not compile-time
enforced. Only the non-`_impl` functions enforce this.

> +        &self,
> +        mode: GetTasks,
> +        lock: Option<TaskCacheLock>,
> +    ) -> Result<TaskArchiveIterator<'a>, Error> {
>          match mode {
>              GetTasks::All => {
>                  let archive_files = self.archive_files()?.into_iter().map(|pair| pair.file);
> @@ -682,32 +681,31 @@ pub fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
>  }
>  
>  /// Iterator over the task archive.
> -pub struct TaskArchiveIterator {
> +pub struct TaskArchiveIterator<'a> {
>      /// Archive files to read.
>      files: Box<dyn Iterator<Item = PathBuf>>,
>      /// Archive iterator we are currently using, if any
>      current: Option<ArchiveIterator<BufReader<File>>>,
> -    /// Lock for this archive.
> -    lock: TaskCacheLock,
> +    /// Lock for this archive. This contains the lock in case we
> +    /// need to keep the archive locked while iterating over it.
> +    _lock: Option<TaskCacheLock>,
> +    /// PhantomData to bind the lifetime of the iterator to an externally held lock.
> +    _lifetime: PhantomData<&'a ()>,
>  }
>  
> -impl TaskArchiveIterator {
> +impl TaskArchiveIterator<'_> {
>      /// Create a new task archive iterator.
> -    pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: TaskCacheLock) -> Self {
> +    pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: Option<TaskCacheLock>) -> Self {
>          Self {
>              files,
>              current: None,
> -            lock,
> +            _lock: lock,
> +            _lifetime: PhantomData,
>          }
>      }
> -
> -    /// Return the task archive lock, consuming `self`.
> -    pub fn into_lock(self) -> TaskCacheLock {
> -        self.lock
> -    }
>  }
>  
> -impl Iterator for &mut TaskArchiveIterator {
> +impl Iterator for TaskArchiveIterator<'_> {
>      type Item = Result<TaskCacheItem, Error>;
>  
>      fn next(&mut self) -> Option<Self::Item> {
> -- 
> 2.39.5




More information about the pdm-devel mailing list