[pbs-devel] [PATCH proxmox-backup 06/10] server/worker_task: add TaskListInfoIterator

Dominik Csapak d.csapak at proxmox.com
Fri Sep 25 16:13:22 CEST 2020


This is an iterator that reads, parses and, where necessary, updates
the task list, and returns the tasks in descending order (newest first).

It does this by combining our logrotate file iterator with a VecDeque.

We can use this to iterate over all tasks, even those that are in the
archive and even if the archive has been logrotated, while only reading
as much as we need.

Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
 src/server/worker_task.rs | 82 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 81 insertions(+), 1 deletion(-)

diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 98047814..622453a1 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::fs::File;
 use std::io::{Read, Write, BufRead, BufReader};
 use std::panic::UnwindSafe;
@@ -16,6 +16,7 @@ use tokio::sync::oneshot;
 use proxmox::sys::linux::procfs;
 use proxmox::try_block;
 use proxmox::tools::fs::{create_path, open_file_locked, open_file_locked_shared, replace_file, CreateOptions};
+use proxmox::tools::logrotate::{LogRotate, LogRotateFiles};
 
 use super::UPID;
 
@@ -496,6 +497,85 @@ where
     read_task_file(file)
 }
 
+enum TaskFile { // records which task file the iterator's buffer was last filled from
+    Active, // buffer came from PROXMOX_BACKUP_ACTIVE_TASK_FN (initial state set by new())
+    Index, // buffer came from PROXMOX_BACKUP_INDEX_TASK_FN
+    Archive, // buffer came from a file yielded by the archive file iterator
+}
+
+pub struct TaskListInfoIterator { // iterates TaskListInfo entries from the active, index and archive task files
+    list: VecDeque<TaskListInfo>, // entries already read but not yet yielded; next() pops from the back
+    file: TaskFile, // source that `list` was last filled from
+    archive: LogRotateFiles, // archive file iterator obtained from LogRotate::files()
+    _lock: File, // handle returned by lock_task_list_files(false); stored so it is dropped with the iterator
+}
+
+impl TaskListInfoIterator {
+    pub fn new() -> Result<Self, Error> { // locks the task list files, loads the active list, prepares the archive iterator
+        let (read_lock, active_list) = {
+            let lock = lock_task_list_files(false)?; // NOTE(review): `false` presumably requests a shared/read lock — confirm
+            let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?;
+
+            let needs_update = active_list // true if any entry has no state and its worker is not active locally
+                .iter()
+                .any(|info| info.state.is_none() && !worker_is_active_local(&info.upid));
+
+            if needs_update {
+                drop(lock); // give up our lock before updating — presumably update_active_workers() locks itself; confirm
+                update_active_workers(None)?;
+                let lock = lock_task_list_files(false)?; // take the lock again after the update
+                let active_list = read_task_file_from_path(PROXMOX_BACKUP_ACTIVE_TASK_FN)?; // re-read the active list after the update
+                (lock, active_list)
+            } else {
+                (lock, active_list) // nothing stale: keep the first lock and list
+            }
+        };
+        let logrotate = LogRotate::new(PROXMOX_BACKUP_ARCHIVE_TASK_FN, true).ok_or_else(|| format_err!("could not get archive file names"))?; // None from LogRotate::new() becomes an error
+
+        Ok(Self {
+            list: active_list.into(), // active entries are converted into the VecDeque buffer
+            file: TaskFile::Active,
+            archive: logrotate.files(),
+            _lock: read_lock,
+        })
+    }
+}
+
+impl Iterator for TaskListInfoIterator {
+    type Item = Result<TaskListInfo, Error>; // Err carries a read/parse failure of a task file
+
+    fn next(&mut self) -> Option<Self::Item> { // yields buffered entries; refills from the index, then archive files
+        loop { // repeats until an entry is available, an error occurs, or all files are consumed
+            if let Some(element) = self.list.pop_back() { // entries are taken from the end of the buffer
+                return Some(Ok(element));
+            } else {
+                match self.file {
+                    TaskFile::Active => { // active list exhausted: load the index file next
+                        let index = match read_task_file_from_path(PROXMOX_BACKUP_INDEX_TASK_FN) {
+                            Ok(index) => index,
+                            Err(err) => return Some(Err(err)), // NOTE(review): self.file stays Active here, so a further call re-reads the index file
+                        };
+                        self.list.append(&mut index.into());
+                        self.file = TaskFile::Index;
+                    },
+                    TaskFile::Index | TaskFile::Archive => { // index or an archive file exhausted: try the next archive file
+                        if let Some(file) = self.archive.next() {
+                            let archive = match read_task_file(file) {
+                                Ok(archive) => archive,
+                                Err(err) => return Some(Err(err)), // self.archive already advanced, so a further call tries the following file
+                            };
+                            self.list.append(&mut archive.into());
+                        } else {
+                            return None; // no archive files left: iteration ends
+                        }
+                        self.file = TaskFile::Archive;
+                    }
+                }
+            }
+        }
+    }
+}
+
 /// Launch long running worker tasks.
 ///
 /// A worker task can either be a whole thread, or a simply tokio
-- 
2.20.1






More information about the pbs-devel mailing list