[pdm-devel] [PATCH proxmox-datacenter-manager v4 3/6] remote tasks: improve locking for task archive iterator

Lukas Wagner l.wagner at proxmox.com
Fri Apr 18 10:32:07 CEST 2025


Instead of awkwardly using into_lock for keeping the archive locked,
pass the lock as a reference to TaskCache::get_tasks_with_lock. The
iterator itself only holds the lock if we use the auto-locking
TaskCache::get_tasks; otherwise, we ensure that the lock lives long
enough via lifetimes and PhantomData in the iterator.

Suggested-by: Wolfgang Bumiller <w.bumiller at proxmox.com>
Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---

Notes:
    New in v3
    
    Changes since v3:
      - remove unnecessary (and potentially unsafe) pub for get_tasks_impl
        (copy/paste mistake)

 server/src/remote_tasks/mod.rs        |  2 +-
 server/src/remote_tasks/task_cache.rs | 62 +++++++++++++--------------
 2 files changed, 31 insertions(+), 33 deletions(-)

diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 126c9ad3..b0fc052f 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -24,7 +24,7 @@ pub async fn get_tasks(filters: TaskFilters) -> Result<Vec<TaskListItem>, Error>
             GetTasks::All
         };
 
-        for task in &mut cache
+        for task in cache
             .get_tasks(which)?
             .skip(filters.start as usize)
             .take(filters.limit as usize)
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
index f8dd821e..6e85f10a 100644
--- a/server/src/remote_tasks/task_cache.rs
+++ b/server/src/remote_tasks/task_cache.rs
@@ -6,6 +6,7 @@ use std::{
     fs::File,
     io::{BufRead, BufReader, BufWriter, ErrorKind, Lines, Write},
     iter::Peekable,
+    marker::PhantomData,
     path::{Path, PathBuf},
     time::Duration,
 };
@@ -213,8 +214,8 @@ impl TaskCache {
             .map(|(remote, add_tasks)| (remote, add_tasks.update_most_recent_archive_timestamp))
             .collect();
 
-        let mut task_iter = self
-            .get_tasks_with_lock(GetTasks::Active, lock)
+        let task_iter = self
+            .get_tasks_with_lock(GetTasks::Active, &lock)
             .context("failed to create archive iterator for active tasks")?;
 
         let mut active_tasks =
@@ -226,10 +227,6 @@ impl TaskCache {
                 }
             }));
 
-        // Consume the iterator to get back the lock. The lock is held
-        // until _lock is finally dropped at the end of the function.
-        let _lock = task_iter.into_lock();
-
         let mut new_finished_tasks = Vec::new();
 
         for task in tasks {
@@ -387,7 +384,7 @@ impl TaskCache {
 
         let mut tasks = Vec::new();
         let mut task_iter = self
-            .get_tasks_with_lock(GetTasks::Active, lock)
+            .get_tasks_with_lock(GetTasks::Active, &lock)
             .context("failed to create active task iterator")?;
 
         for task in &mut task_iter {
@@ -404,8 +401,6 @@ impl TaskCache {
         tasks.push(task.clone());
         tasks.sort_by(compare_tasks_reverse);
 
-        let _lock = task_iter.into_lock();
-
         let mut state = self.read_state();
 
         state
@@ -432,24 +427,28 @@ impl TaskCache {
     ///
     /// This function will request a non-exclusive read-lock, don't call if
     /// you already hold a lock for this cache. See [`Self::get_tasks_with_lock`].
-    pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator, Error> {
-        let lock = self
-            .lock(false)
-            .context("get_tasks: failed to acquire lock")?;
-        self.get_tasks_with_lock(mode, lock)
+    pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'static>, Error> {
+        let lock = self.lock(false).context("failed to lock archive")?;
+        self.get_tasks_impl(mode, Some(lock))
             .context("failed to create task archive iterator")
     }
 
     /// Iterate over cached tasks.
     ///
-    /// This function requires you to pass a lock. If you want to continue to hold the lock
-    /// after iterating, you can consume the iterator by calling
-    /// [`TaskArchiveIterator::into_lock`], yielding the original lock.
-    pub fn get_tasks_with_lock(
+    /// This function requires you to pass a lock.
+    pub fn get_tasks_with_lock<'a>(
         &self,
         mode: GetTasks,
-        lock: TaskCacheLock,
-    ) -> Result<TaskArchiveIterator, Error> {
+        _lock: &'a TaskCacheLock,
+    ) -> Result<TaskArchiveIterator<'a>, Error> {
+        self.get_tasks_impl(mode, None)
+    }
+
+    fn get_tasks_impl<'a>(
+        &self,
+        mode: GetTasks,
+        lock: Option<TaskCacheLock>,
+    ) -> Result<TaskArchiveIterator<'a>, Error> {
         match mode {
             GetTasks::All => {
                 let archive_files = self.archive_files()?.into_iter().map(|pair| pair.file);
@@ -682,32 +681,31 @@ pub fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
 }
 
 /// Iterator over the task archive.
-pub struct TaskArchiveIterator {
+pub struct TaskArchiveIterator<'a> {
     /// Archive files to read.
     files: Box<dyn Iterator<Item = PathBuf>>,
     /// Archive iterator we are currently using, if any
     current: Option<ArchiveIterator<BufReader<File>>>,
-    /// Lock for this archive.
-    lock: TaskCacheLock,
+    /// Lock for this archive. This contains the lock in case we
+    /// need to keep the archive locked while iterating over it.
+    _lock: Option<TaskCacheLock>,
+    /// PhantomData to bind the lifetime of the iterator to an externally held lock.
+    _lifetime: PhantomData<&'a ()>,
 }
 
-impl TaskArchiveIterator {
+impl TaskArchiveIterator<'_> {
     /// Create a new task archive iterator.
-    pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: TaskCacheLock) -> Self {
+    pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: Option<TaskCacheLock>) -> Self {
         Self {
             files,
             current: None,
-            lock,
+            _lock: lock,
+            _lifetime: PhantomData,
         }
     }
-
-    /// Return the task archive lock, consuming `self`.
-    pub fn into_lock(self) -> TaskCacheLock {
-        self.lock
-    }
 }
 
-impl Iterator for &mut TaskArchiveIterator {
+impl Iterator for TaskArchiveIterator<'_> {
     type Item = Result<TaskCacheItem, Error>;
 
     fn next(&mut self) -> Option<Self::Item> {
-- 
2.39.5





More information about the pdm-devel mailing list