[pdm-devel] [PATCH proxmox-datacenter-manager 4/8] remote tasks: implement improved cache for remote tasks

Lukas Wagner l.wagner at proxmox.com
Fri Mar 14 15:12:21 CET 2025


This commit adds a new implementation for a cache for remote tasks, one
that should improve performance characteristics for pretty much all
use cases.

In general, storage works much like the task archive we already
have for (local) PDM tasks.

root@pdm-dev:/var/cache/proxmox-datacenter-manager/remote-tasks# ls -l
total 40
-rw-r--r-- 1 www-data www-data     0 Mar 13 13:18 active
-rw-r--r-- 1 www-data www-data  1676 Mar 11 14:51 archive.1741355462
-rw-r--r-- 1 www-data www-data     0 Mar 11 14:51 archive.1741441862
-rw-r--r-- 1 www-data www-data  2538 Mar 11 14:51 archive.1741528262
-rw-r--r-- 1 www-data www-data  8428 Mar 11 15:07 archive.1741614662
-rw-r--r-- 1 www-data www-data 11740 Mar 13 10:18 archive.1741701062
-rw-r--r-- 1 www-data www-data  3364 Mar 13 13:18 archive.1741788270
-rw-r--r-- 1 www-data www-data   287 Mar 13 13:18 state

Tasks are stored in the 'active' and multiple 'archive' files.
Running tasks are placed into the 'active' file, tasks that are finished
are persisted into one of the archive files.
The archive files are suffixed with a UNIX timestamp which serves
as a lower-bound for start times for tasks stored in this file.
Encoding this lower-bound in the file name instead of using a more
traditional increasing file index (.1, .2, .3) gives us the benefit
that we can decide in which file a newly arrived task belongs from
a single readdir call, without even reading the file itself.
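
As a rough illustration of the file selection described above, here is a
minimal sketch that works purely on file names. The helper names
`parse_lower_bound` and `select_archive` are hypothetical and not part of
the patch; the real logic lives in `archive_files` and
`merge_tasks_into_archive`.

```rust
/// Hypothetical helper: extract the lower-bound timestamp from a file
/// name of the form `archive.<timestamp>`. Other names yield `None`.
fn parse_lower_bound(file_name: &str) -> Option<i64> {
    file_name.strip_prefix("archive.")?.parse().ok()
}

/// Hypothetical helper: pick the archive file a task belongs to, i.e. the
/// file with the largest lower bound that is still <= the task's start time.
/// Returns `None` if the task is older than every existing archive file.
fn select_archive(file_names: &[&str], starttime: i64) -> Option<String> {
    file_names
        .iter()
        .filter_map(|name| parse_lower_bound(name).map(|bound| (bound, *name)))
        .filter(|(bound, _)| *bound <= starttime)
        .max_by_key(|(bound, _)| *bound)
        .map(|(_, name)| name.to_string())
}
```

Only the directory listing is needed as input; no archive file has to be
opened to make the decision.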

The size of the entire archive can be controlled by doing file
'rotation', where the archive file with the oldest timestamp is deleted
and a new file with the current timestamp is added.
If 'rotation' happens at fixed intervals (e.g. once daily), this gives
us a configurable number of days of task history.
There is no direct cap for the total number
of tasks, but this could be easily added by checking the size of the
most recent archive file and by rotating early if a threshold is
exceeded.
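
The rotation scheme can be modelled on a plain list of lower-bound
timestamps. The following is a simplified sketch under that assumption,
not the `rotate` method from this patch (which works on files and holds
the cache lock); the function name `rotate_bounds` is made up for
illustration.

```rust
/// Simplified model of rotation: `bounds` are the lower-bound timestamps
/// of the existing archive files. Returns the bounds after rotation,
/// newest first.
fn rotate_bounds(mut bounds: Vec<i64>, now: i64, rotate_after: i64, max_files: usize) -> Vec<i64> {
    // Newest (largest) lower bound first.
    bounds.sort_unstable_by(|a, b| b.cmp(a));

    // Start a new file if there is none yet, or if the newest one is
    // older than `rotate_after` seconds.
    let needs_new_file = match bounds.first() {
        Some(newest) => now - *newest > rotate_after,
        None => true,
    };
    if needs_new_file {
        bounds.insert(0, now);
    }

    // Drop the oldest files so that at most `max_files` remain.
    bounds.truncate(max_files);
    bounds
}
```

With daily rotation and `max_files = 7`, this model keeps roughly one
week of task history.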

The format inside the files is also similar to the local task archive,
with one task corresponding to one line in the file.
One key difference is that here each line is a JSON object; this
makes it easier to add additional data later, if needed.
The JSON object contains the task's UPID, status, endtime and starttime
(the starttime is technically also encoded in the UPID, but having it as
a separate value simplified a couple of things).
Each file is sorted by the task's start time, the youngest task coming first.

One key difference between this task cache and the local task archive is
that we need to handle tasks coming in out-of-order, e.g. if remotes
were not reachable for a certain time. To maintain the ordering
in the file, we have to merge the newly arrived tasks into the existing
task file. This was implemented in a way that avoids reading the entire
file into memory at once, exploiting the fact that the contents of the
existing file are already sorted. This allows for a
zipper/merge-sort-like approach (see MergeTaskIterator for
details). The existing file is only read line-by-line and finally
replaced atomically.
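
To illustrate the merge step, here is a self-contained sketch that uses
`(starttime, upid)` tuples as a stand-in for `TaskCacheItem`. The names
`Entry`, `Merge` and `merge_sorted` are assumptions made for this
example; the patch's own version is `MergeTaskIterator`. Both inputs must
already be sorted descending by start time and ascending by UPID.

```rust
use std::cmp::Ordering;
use std::iter::Peekable;

/// Stand-in for a task: (starttime, upid).
type Entry = (i64, String);

/// Order by start time, with the UPID comparison flipped as a tie-breaker.
fn compare(a: &Entry, b: &Entry) -> Ordering {
    a.0.cmp(&b.0).then_with(|| b.1.cmp(&a.1))
}

/// Lazily merges two sorted iterators, dropping duplicates.
struct Merge<L: Iterator<Item = Entry>, R: Iterator<Item = Entry>> {
    left: Peekable<L>,
    right: Peekable<R>,
}

impl<L: Iterator<Item = Entry>, R: Iterator<Item = Entry>> Iterator for Merge<L, R> {
    type Item = Entry;

    fn next(&mut self) -> Option<Entry> {
        let order = match (self.left.peek(), self.right.peek()) {
            (Some(l), Some(r)) => compare(l, r),
            (Some(_), None) => Ordering::Greater,
            (None, Some(_)) => Ordering::Less,
            (None, None) => return None,
        };

        match order {
            Ordering::Greater => self.left.next(),
            Ordering::Less => self.right.next(),
            Ordering::Equal => {
                // Same entry on both sides: consume both, emit one.
                self.right.next();
                self.left.next()
            }
        }
    }
}

/// Convenience wrapper that collects the merged result into a `Vec`.
fn merge_sorted(existing: Vec<Entry>, new: Vec<Entry>) -> Vec<Entry> {
    Merge {
        left: existing.into_iter().peekable(),
        right: new.into_iter().peekable(),
    }
    .collect()
}
```

Because only the head of each input is inspected at a time, the existing
file can be streamed line by line instead of being loaded as a whole.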

The cache also has a separate state file, containing additional
information, e.g. cut-off timestamps for the polling task.
Some of the data in the statefile is technically also contained in the
archive files, but reading the state file is much faster.

This commit also adds an elaborate suite of unit tests for this new
cache. While adding some additional work up front, they paid for themselves
during development quite quickly, since the overall approach for the
cache changed a couple of times. The test suite gave me confidence that
the design changes didn't screw anything up, catching a couple of bugs
in the process.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
Reviewed-by: Max Carrara <m.carrara at proxmox.com>
---
 server/src/remote_tasks/mod.rs        |   2 +
 server/src/remote_tasks/task_cache.rs | 964 ++++++++++++++++++++++++++
 2 files changed, 966 insertions(+)
 create mode 100644 server/src/remote_tasks/task_cache.rs

diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
index 7ded5408..2062f2b7 100644
--- a/server/src/remote_tasks/mod.rs
+++ b/server/src/remote_tasks/mod.rs
@@ -18,6 +18,8 @@ use tokio::task::JoinHandle;
 
 use crate::{api::pve, task_utils};
 
+mod task_cache;
+
 /// Get tasks for all remotes
 // FIXME: filter for privileges
 pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
new file mode 100644
index 00000000..628a9a40
--- /dev/null
+++ b/server/src/remote_tasks/task_cache.rs
@@ -0,0 +1,964 @@
+//! Task cache implementation, based on rotating files.
+
+use std::{
+    cmp::Ordering,
+    collections::{HashMap, HashSet},
+    fs::File,
+    io::{BufRead, BufReader, BufWriter, Lines, Write},
+    iter::Peekable,
+    path::{Path, PathBuf},
+    time::Duration,
+};
+
+use anyhow::Error;
+use proxmox_sys::fs::CreateOptions;
+use serde::{Deserialize, Serialize};
+
+use pdm_api_types::RemoteUpid;
+
+/// Item which can be put into the task cache.
+#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
+pub struct TaskCacheItem {
+    /// The task's UPID
+    pub upid: RemoteUpid,
+    /// The time at which the task was started (seconds since the UNIX epoch).
+    /// Technically this is also contained within the UPID, duplicating it here
+    /// allows us to directly access it when sorting in new tasks, without having
+    /// to parse the UPID.
+    pub starttime: i64,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    /// The task's status.
+    pub status: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    /// The task's endtime (seconds since the UNIX epoch).
+    pub endtime: Option<i64>,
+}
+
+/// State needed for task polling.
+#[derive(Serialize, Deserialize, Default)]
+#[serde(rename_all = "kebab-case")]
+pub struct State {
+    /// Map of remote -> most recent task starttime (UNIX epoch) in the archive.
+    /// This can be used as a cut-off when requesting new task data.
+    pub most_recent_archive_starttime: HashMap<String, i64>,
+    /// Oldest running task. Useful as another cut-off for fetching tasks.
+    /// The timestamp is based on seconds since the UNIX epoch.
+    pub oldest_active_task: HashMap<String, i64>,
+    /// Tracked tasks per remote.
+    pub tracked_tasks: HashMap<String, HashSet<RemoteUpid>>,
+}
+
+/// Cache for remote tasks.
+#[derive(Clone)]
+pub struct TaskCache {
+    /// Path where the cache's files should be placed.
+    base_path: PathBuf,
+    /// File permissions for the cache's files.
+    create_options: CreateOptions,
+}
+
+/// Tasks that should be added to the cache via [`TaskCache::add_tasks`].
+pub struct AddTasks {
+    /// Update most recent archived task in state file.
+    pub update_most_recent_archive_timestamp: bool,
+    /// The tasks to add.
+    pub tasks: Vec<TaskCacheItem>,
+}
+
+/// Lock for the cache.
+#[allow(dead_code)]
+pub struct TaskCacheLock(File);
+
+/// Which tasks to fetch from the archive.
+pub enum GetTasks {
+    /// Get all tasks, finished and running.
+    All,
+    /// Only get running (active) tasks.
+    Active,
+    /// Only get finished (archived) tasks.
+    #[allow(dead_code)]
+    Archived,
+}
+
+impl TaskCache {
+    /// Create a new task cache instance.
+    ///
+    /// Remember to call `init` or `new_file` to create initial archive files before
+    /// adding any tasks.
+    pub fn new<P: AsRef<Path>>(path: P, create_options: CreateOptions) -> Result<Self, Error> {
+        Ok(Self {
+            base_path: path.as_ref().into(),
+            create_options,
+        })
+    }
+
+    /// Create initial task archives that can be backfilled with the
+    /// recent task history from a remote.
+    ///
+    /// This function only has an effect if there are no archive files yet.
+    pub fn init(&self, now: i64, number_of_files: u64, period_per_file: u64) -> Result<(), Error> {
+        let _lock = self.lock(true)?;
+
+        if self.archive_files()?.is_empty() {
+            for i in 0..number_of_files {
+                self.new_file(now - (i * period_per_file) as i64)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Start a new archive file with a given timestamp.
+    /// `now` is supposed to be a UNIX timestamp (seconds).
+    fn new_file(&self, now: i64) -> Result<(), Error> {
+        let new_path = self.base_path.join(format!("archive.{now}"));
+        let mut file = File::create(&new_path)?;
+        self.create_options.apply_to(&mut file, &new_path)?;
+
+        Ok(())
+    }
+
+    /// Lock the cache.
+    fn lock(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
+        let lockfile = self.base_path.join(".lock");
+
+        let fd = proxmox_sys::fs::open_file_locked(
+            lockfile,
+            Duration::from_secs(15),
+            exclusive,
+            self.create_options.clone(),
+        )?;
+
+        Ok(TaskCacheLock(fd))
+    }
+
+    /// Rotate task archive if the newest archive file is older than `rotate_after`.
+    ///
+    /// The oldest archive files are removed if the total number of archive files exceeds
+    /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds).
+    ///
+    /// This function requests an exclusive lock, don't call if you already hold a lock.
+    pub fn rotate(&self, now: i64, rotate_after: u64, max_files: u64) -> Result<bool, Error> {
+        let _lock = self.lock(true)?;
+        let mut did_rotate = false;
+
+        let mut bounds = self.archive_files()?;
+
+        match bounds.first() {
+            Some(bound) => {
+                if now > bound.starttime && now - bound.starttime > rotate_after as i64 {
+                    self.new_file(now)?;
+                    did_rotate = true;
+                }
+            }
+            None => {
+                self.new_file(now)?;
+                did_rotate = true;
+            }
+        }
+
+        while bounds.len() >= max_files as usize {
+            // Unwrap is safe because of the length check above
+            let to_remove = bounds.pop().unwrap();
+            std::fs::remove_file(&to_remove.file)?;
+        }
+
+        Ok(did_rotate)
+    }
+
+    /// Add new tasks to the task archive.
+    ///
+    /// Running tasks (tasks without an endtime) are placed into the 'active' file in the
+    /// task cache base directory. Finished tasks are sorted into `archive.<starttime>` archive
+    /// files, where `<starttime>` denotes the lowest permitted start time timestamp for a given
+    /// archive file. If a task which was added as running previously is added again, this time in
+    /// a finished state, it will be removed from the `active` file and also sorted into
+    /// one of the archive files.
+    /// Same goes for the list of tracked tasks; the entry in the state file will be removed.
+    ///
+    /// Crash consistency:
+    ///
+    /// The state file, which contains the cut-off timestamps for future task fetching, is updated at the
+    /// end after all tasks have been added into the archive. Adding tasks is an idempotent
+    /// operation; adding the *same* task multiple times does not lead to duplicated entries in the
+    /// task archive. Individual archive files are updated atomically, but since
+    /// adding tasks can involve updating multiple archive files, the archive could end up
+    /// in a partially-updated, inconsistent state in case of a crash.
+    /// However, since the state file with the cut-off timestamps is updated last,
+    /// the consistency of the archive should be restored at the next update cycle of the archive.
+    pub fn add_tasks(&self, mut added: HashMap<String, AddTasks>) -> Result<(), Error> {
+        let lock = self.lock(true)?;
+
+        // Get a flat `Vec` of all new tasks
+        let tasks: Vec<_> = added
+            .iter_mut()
+            .flat_map(|(_, tasks)| std::mem::take(&mut tasks.tasks))
+            .collect();
+
+        let update_state_for_remote: HashMap<String, bool> = added
+            .into_iter()
+            .map(|(remote, add_tasks)| (remote, add_tasks.update_most_recent_archive_timestamp))
+            .collect();
+
+        let mut task_iter = self.get_tasks_with_lock(GetTasks::Active, lock)?;
+
+        let mut active_tasks =
+            HashSet::from_iter(task_iter.flat_map(|active_task| match active_task {
+                Ok(task) => Some((task.upid, task.starttime)),
+                Err(err) => {
+                    log::error!("failed to read task cache entry from active file: {err}");
+                    None
+                }
+            }));
+
+        // Consume the iterator to get back the lock. The lock is held
+        // until _lock is finally dropped at the end of the function.
+        let _lock = task_iter.into_lock();
+
+        active_tasks.extend(
+            tasks
+                .iter()
+                .filter(|task| task.endtime.is_none())
+                .map(|a| (a.upid.clone(), a.starttime)),
+        );
+
+        let mut state = self.read_state();
+        let mut new_finished_tasks = tasks
+            .into_iter()
+            .filter(|task| task.endtime.is_some())
+            .collect::<Vec<_>>();
+
+        new_finished_tasks.sort_by(compare_tasks_reverse);
+        self.merge_tasks_into_archive(
+            new_finished_tasks,
+            &mut active_tasks,
+            update_state_for_remote,
+            &mut state,
+        )?;
+        self.update_oldest_active(&active_tasks, &mut state);
+
+        let mut active: Vec<TaskCacheItem> = active_tasks
+            .into_iter()
+            .map(|(upid, starttime)| TaskCacheItem {
+                upid,
+                starttime,
+                status: None,
+                endtime: None,
+            })
+            .collect();
+
+        active.sort_by(compare_tasks_reverse);
+        self.write_active_tasks(active.into_iter())?;
+        self.write_state(state)?;
+
+        Ok(())
+    }
+
+    /// Update the timestamp of the oldest running task in `state`.
+    fn update_oldest_active(&self, active_tasks: &HashSet<(RemoteUpid, i64)>, state: &mut State) {
+        // Update the state with timestamp of the oldest running task,
+        // we also use this as cut-off when fetching tasks, so we
+        // for sure know when a task finishes.
+        let mut oldest_active_task_per_remote = HashMap::new();
+        for (upid, startime) in active_tasks.iter() {
+            oldest_active_task_per_remote
+                .entry(upid.remote().to_owned())
+                .and_modify(|time| *time = (*startime).min(*time))
+                .or_insert(*startime);
+        }
+        state.oldest_active_task = oldest_active_task_per_remote;
+    }
+
+    /// Merge a list of *finished* tasks into the remote task archive files.
+    /// The list of tasks in `tasks` *must* be sorted by their timestamp and UPID (descending by
+    /// timestamp, ascending by UPID).
+    ///
+    /// The task archive should be locked when calling this.
+    fn merge_tasks_into_archive(
+        &self,
+        tasks: Vec<TaskCacheItem>,
+        active_tasks: &mut HashSet<(RemoteUpid, i64)>,
+        update_state_for_remote: HashMap<String, bool>,
+        state: &mut State,
+    ) -> Result<(), Error> {
+        debug_assert!(tasks
+            .iter()
+            .is_sorted_by(|a, b| compare_tasks(a, b).is_ge()));
+
+        let files = self.archive_files()?;
+
+        let mut files = files.iter().peekable();
+
+        let mut current = files.next();
+        let mut next = files.peek();
+
+        let mut tasks_for_current_file = Vec::new();
+
+        // Tasks are sorted youngest to oldest (biggest starttime first)
+        for task in tasks {
+            active_tasks.remove(&(task.upid.clone(), task.starttime));
+
+            if let Some(tracked_tasks) = state.tracked_tasks.get_mut(task.upid.remote()) {
+                tracked_tasks.remove(&task.upid);
+            }
+
+            if let Some(true) = update_state_for_remote.get(task.upid.remote()) {
+                // Update the most recent starttime per remote, the task polling logic uses it as a
+                // cut-off.
+                state
+                    .most_recent_archive_starttime
+                    .entry(task.upid.remote().to_owned())
+                    .and_modify(|time| *time = (task.starttime).max(*time))
+                    .or_insert(task.starttime);
+            }
+
+            // Skip ahead until we have found the correct file.
+            while next.is_some() {
+                if let Some(current) = current {
+                    if task.starttime >= current.starttime {
+                        break;
+                    }
+                    // The current file's cut-off is larger than the task's start time; that means
+                    // we want to finalize the current file by merging all tasks that
+                    // should be stored in it...
+                    self.merge_single_archive_file(
+                        std::mem::take(&mut tasks_for_current_file),
+                        &current.file,
+                    )?;
+                }
+
+                // ... and advance the `current` file to the next entry.
+                current = files.next();
+                next = files.peek();
+            }
+
+            if let Some(current) = current {
+                if task.starttime < current.starttime {
+                    continue;
+                }
+            }
+            tasks_for_current_file.push(task);
+        }
+
+        // Merge tasks for the last file.
+        if let Some(current) = current {
+            self.merge_single_archive_file(tasks_for_current_file, &current.file)?;
+        }
+
+        Ok(())
+    }
+
+    /// Add a new tracked task.
+    ///
+    /// This will insert the task in the list of tracked tasks in the state file,
+    /// as well as create an entry in the `active` file.
+    ///
+    /// This function will request an exclusive lock for the task cache,
+    /// do not call if you are already holding a lock.
+    pub fn add_tracked_task(&self, task: TaskCacheItem) -> Result<(), Error> {
+        let lock = self.lock(true)?;
+
+        let mut tasks = Vec::new();
+        let mut task_iter = self.get_tasks_with_lock(GetTasks::Active, lock)?;
+
+        for task in &mut task_iter {
+            match task {
+                Ok(task) => tasks.push(task),
+                Err(err) => {
+                    log::error!(
+                        "could not read existing task cache entry from 'active' file: {err}"
+                    );
+                }
+            }
+        }
+
+        tasks.push(task.clone());
+        tasks.sort_by(compare_tasks_reverse);
+
+        let _lock = task_iter.into_lock();
+
+        let mut state = self.read_state();
+
+        state
+            .oldest_active_task
+            .entry(task.upid.remote().to_owned())
+            .and_modify(|a| *a = (*a).min(task.starttime))
+            .or_insert(task.starttime);
+
+        let tracked_per_remote = state
+            .tracked_tasks
+            .entry(task.upid.remote().to_owned())
+            .or_default();
+        tracked_per_remote.insert(task.upid);
+
+        self.write_active_tasks(tasks.into_iter())?;
+        self.write_state(state)?;
+
+        Ok(())
+    }
+
+    /// Iterate over cached tasks.
+    ///
+    /// This function will request a non-exclusive read-lock, don't call if
+    /// you already hold a lock for this cache. See [`Self::get_tasks_with_lock`].
+    pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator, Error> {
+        let lock = self.lock(false)?;
+        self.get_tasks_with_lock(mode, lock)
+    }
+
+    /// Iterate over cached tasks.
+    ///
+    /// This function requires you to pass a lock. If you want to continue to hold the lock
+    /// after iterating, you can consume the iterator by calling
+    /// [`TaskArchiveIterator::into_lock`], yielding the original lock.
+    pub fn get_tasks_with_lock(
+        &self,
+        mode: GetTasks,
+        lock: TaskCacheLock,
+    ) -> Result<TaskArchiveIterator, Error> {
+        match mode {
+            GetTasks::All => {
+                let mut files = vec![ArchiveFile {
+                    starttime: 0,
+                    file: self.base_path.join("active"),
+                }];
+
+                let archive_files = self.archive_files()?;
+                files.extend(archive_files);
+
+                Ok(TaskArchiveIterator::new(
+                    Box::new(files.into_iter().map(|pair| pair.file)),
+                    lock,
+                ))
+            }
+            GetTasks::Active => {
+                let files = vec![ArchiveFile {
+                    starttime: 0,
+                    file: self.base_path.join("active"),
+                }];
+                Ok(TaskArchiveIterator::new(
+                    Box::new(files.into_iter().map(|pair| pair.file)),
+                    lock,
+                ))
+            }
+            GetTasks::Archived => {
+                let files = self.archive_files()?;
+
+                Ok(TaskArchiveIterator::new(
+                    Box::new(files.into_iter().map(|pair| pair.file)),
+                    lock,
+                ))
+            }
+        }
+    }
+
+    /// Write the provided tasks to the 'active' file.
+    ///
+    /// The tasks are first written to a temporary file, which is then used
+    /// to atomically replace the original.
+    fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
+        let (fd, path) = proxmox_sys::fs::make_tmp_file(
+            self.base_path.join("active"),
+            self.create_options.clone(),
+        )?;
+        let mut fd = BufWriter::new(fd);
+
+        Self::write_tasks(&mut fd, tasks)?;
+
+        if let Err(err) = fd.flush() {
+            log::error!("could not flush 'active' file: {err}");
+        }
+        drop(fd);
+
+        std::fs::rename(path, self.base_path.join("active"))?;
+
+        Ok(())
+    }
+
+    /// Read the state file.
+    /// If the state file could not be read or does not exist, the default (empty) state
+    /// is returned.
+    /// A lock is only necessary if the returned state is modified and later passed
+    /// to [`Self::write_state`]. In case of a read-only access no lock is necessary, as
+    /// the state file is always replaced atomically.
+    pub fn read_state(&self) -> State {
+        fn do_read_state(path: &Path) -> Result<State, Error> {
+            let data = proxmox_sys::fs::file_read_optional_string(path)?;
+            match data {
+                Some(data) => Ok(serde_json::from_str(&data)?),
+                None => Ok(Default::default()),
+            }
+        }
+
+        let path = self.base_path.join("state");
+
+        do_read_state(&path).unwrap_or_else(|err| {
+            log::error!("could not read state file: {err}");
+            Default::default()
+        })
+    }
+
+    /// Write the state file.
+    /// The task archive should be locked for writing when calling this function.
+    pub fn write_state(&self, state: State) -> Result<(), Error> {
+        let path = self.base_path.join("state");
+
+        let data = serde_json::to_vec_pretty(&state)?;
+
+        proxmox_sys::fs::replace_file(path, &data, self.create_options.clone(), true)?;
+
+        Ok(())
+    }
+
+    /// Returns a list of existing archive files, together with their respective
+    /// cut-off timestamp. The result is sorted descending by cut-off timestamp (most recent one
+    /// first).
+    /// The task archive should be locked for reading when calling this function.
+    fn archive_files(&self) -> Result<Vec<ArchiveFile>, Error> {
+        let mut names = Vec::new();
+
+        for entry in std::fs::read_dir(&self.base_path)? {
+            let entry = entry?;
+
+            if let Some(endtime) = entry
+                .path()
+                .file_name()
+                .and_then(|s| s.to_str())
+                .and_then(|s| s.strip_prefix("archive."))
+            {
+                match endtime.parse() {
+                    Ok(starttime) => {
+                        names.push(ArchiveFile {
+                            starttime,
+                            file: entry.path(),
+                        });
+                    }
+                    Err(err) => log::error!("could not parse archive timestamp: {err}"),
+                }
+            }
+        }
+
+        names.sort_by_key(|e| -e.starttime);
+
+        Ok(names)
+    }
+
+    /// Merge `tasks` with an existing archive file.
+    /// This function assumes that `tasks` and the pre-existing contents of the archive
+    /// file are both sorted descending by starttime (most recent tasks come first).
+    /// The task archive must be locked when calling this function.
+    fn merge_single_archive_file(
+        &self,
+        tasks: Vec<TaskCacheItem>,
+        file: &Path,
+    ) -> Result<(), Error> {
+        if tasks.is_empty() {
+            return Ok(());
+        }
+
+        let (fd, new_path) = proxmox_sys::fs::make_tmp_file(file, self.create_options.clone())?;
+        let mut fd = BufWriter::new(fd);
+
+        if file.exists() {
+            let archive_reader = BufReader::new(File::open(file)?);
+            let archive_iter = ArchiveIterator::new(archive_reader)
+                .flat_map(|item| match item {
+                    Ok(item) => Some(item),
+                    Err(err) => {
+                        log::error!("could not read task cache item while merging: {err}");
+                        None
+                    }
+                })
+                .peekable();
+            let task_iter = tasks.into_iter().peekable();
+
+            Self::write_tasks(&mut fd, MergeTaskIterator::new(archive_iter, task_iter))?;
+        } else {
+            Self::write_tasks(&mut fd, tasks.into_iter())?;
+        }
+
+        if let Err(err) = fd.flush() {
+            log::error!("could not flush BufWriter for {file:?}: {err}");
+        }
+        drop(fd);
+
+        if let Err(err) = std::fs::rename(&new_path, file) {
+            log::error!("could not replace archive file {new_path:?}: {err}");
+        }
+
+        Ok(())
+    }
+
+    /// Write an iterator of [`TaskCacheItem`] to something that implements [`Write`].
+    /// The individual items are encoded as JSON followed by a newline.
+    /// The task archive should be locked when calling this function.
+    fn write_tasks(
+        writer: &mut impl Write,
+        tasks: impl Iterator<Item = TaskCacheItem>,
+    ) -> Result<(), Error> {
+        for task in tasks {
+            serde_json::to_writer(&mut *writer, &task)?;
+            writeln!(writer)?;
+        }
+
+        Ok(())
+    }
+}
+
+/// Comparison function for sorting tasks.
+/// The tasks are compared based on the task's start time, falling
+/// back to the task's UPID as a secondary criterion in case the
+/// start times are equal.
+pub fn compare_tasks(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
+    a.starttime
+        .cmp(&b.starttime)
+        .then_with(|| b.upid.to_string().cmp(&a.upid.to_string()))
+}
+
+/// Comparison function for sorting tasks, reversed.
+/// The tasks are compared based on the task's start time, falling
+/// back to the task's UPID as a secondary criterion in case the
+/// start times are equal.
+pub fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
+    compare_tasks(a, b).reverse()
+}
+
+/// Iterator over the task archive.
+pub struct TaskArchiveIterator {
+    /// Archive files to read.
+    files: Box<dyn Iterator<Item = PathBuf>>,
+    /// Archive iterator we are currently using, if any
+    current: Option<ArchiveIterator<BufReader<File>>>,
+    /// Lock for this archive.
+    lock: TaskCacheLock,
+}
+
+impl TaskArchiveIterator {
+    /// Create a new task archive iterator.
+    pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: TaskCacheLock) -> Self {
+        Self {
+            files,
+            current: None,
+            lock,
+        }
+    }
+
+    /// Return the task archive lock, consuming `self`.
+    pub fn into_lock(self) -> TaskCacheLock {
+        self.lock
+    }
+}
+
+impl Iterator for &mut TaskArchiveIterator {
+    type Item = Result<TaskCacheItem, Error>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        loop {
+            match &mut self.current {
+                Some(current) => {
+                    let next = current.next();
+                    if next.is_some() {
+                        return next;
+                    } else {
+                        self.current = None;
+                    }
+                }
+                None => 'inner: loop {
+                    // Returns `None` if no more files are available, stopping iteration.
+                    let next_file = self.files.next()?;
+
+                    match File::open(&next_file) {
+                        Ok(file) => {
+                            let archive_reader = BufReader::new(file);
+                            let archive_iter = ArchiveIterator::new(archive_reader);
+                            self.current = Some(archive_iter);
+                            break 'inner;
+                        }
+                        Err(err) => {
+                            log::error!("could not open {next_file:?} while iterating over task archive files, skipping: {err}")
+                        }
+                    }
+                },
+            }
+        }
+    }
+}
+
+/// Archive file.
+#[derive(Clone, Debug)]
+struct ArchiveFile {
+    /// The path to the archive file.
+    file: PathBuf,
+    /// The archive's lowest permitted starttime (seconds since UNIX epoch).
+    starttime: i64,
+}
+
+/// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
+/// from both iterators sorted.
+/// The two iterators are expected to be sorted descendingly based on the task's starttime and
+/// ascendingly based on the task's UPID's string representation. This can be
+/// achieved by using the [`compare_tasks_reverse`] function when sorting an array of tasks.
+pub struct MergeTaskIterator<T: Iterator<Item = TaskCacheItem>, U: Iterator<Item = TaskCacheItem>> {
+    left: Peekable<T>,
+    right: Peekable<U>,
+}
+
+impl<T, U> MergeTaskIterator<T, U>
+where
+    T: Iterator<Item = TaskCacheItem>,
+    U: Iterator<Item = TaskCacheItem>,
+{
+    /// Create a new merging iterator.
+    pub fn new(left: Peekable<T>, right: Peekable<U>) -> Self {
+        Self { left, right }
+    }
+}
+
+impl<T, U> Iterator for MergeTaskIterator<T, U>
+where
+    T: Iterator<Item = TaskCacheItem>,
+    U: Iterator<Item = TaskCacheItem>,
+{
+    type Item = T::Item;
+
+    fn next(&mut self) -> Option<T::Item> {
+        let order = match (self.left.peek(), self.right.peek()) {
+            (Some(l), Some(r)) => Some(compare_tasks(l, r)),
+            (Some(_), None) => Some(Ordering::Greater),
+            (None, Some(_)) => Some(Ordering::Less),
+            (None, None) => None,
+        };
+
+        match order {
+            Some(Ordering::Greater) => self.left.next(),
+            Some(Ordering::Less) => self.right.next(),
+            Some(Ordering::Equal) => {
+                // Dedup by consuming the other iterator as well
+                let _ = self.right.next();
+                self.left.next()
+            }
+            None => None,
+        }
+    }
+}
+
+/// Iterator for a single task archive file.
+///
+/// This iterator implements `Iterator<Item = Result<TaskCacheItem, Error>>`. When iterating,
+/// tasks are read line by line, without loading the entire archive file into memory.
+pub struct ArchiveIterator<T> {
+    iter: Lines<T>,
+}
+
+impl<T: BufRead> ArchiveIterator<T> {
+    /// Create a new iterator.
+    pub fn new(file: T) -> Self {
+        let reader = file.lines();
+
+        Self { iter: reader }
+    }
+}
+
+impl<T: BufRead> Iterator for ArchiveIterator<T> {
+    type Item = Result<TaskCacheItem, Error>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.iter.next().map(|result| {
+            result
+                .and_then(|line| Ok(serde_json::from_str(&line)?))
+                .map_err(Into::into)
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::Cursor;
+
+    use crate::test_support::temp::NamedTempDir;
+
+    use super::*;
+
+    #[test]
+    fn archive_iterator() -> Result<(), Error> {
+        let file = r#"
+            {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root@pam:","status":"OK","endtime":12345, "starttime": 1234}
+            {"upid":"pbs-remote!UPID:pbs:000002B2:00000158:00000000:674D828C:logrotate::root@pam:","status":"OK","endtime":12345, "starttime": 1234}
+            invalid"#
+            .trim_start();
+
+        let cursor = Cursor::new(file.as_bytes());
+        let mut iter = ArchiveIterator::new(cursor);
+
+        assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pve-remote");
+        assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pbs-remote");
+        assert!(iter.next().unwrap().is_err());
+        assert!(iter.next().is_none());
+
+        Ok(())
+    }
+
+    fn task(starttime: i64, ended: bool) -> TaskCacheItem {
+        let (status, endtime) = if ended {
+            (Some("OK".into()), Some(starttime + 10))
+        } else {
+            (None, None)
+        };
+
+        TaskCacheItem {
+            upid: format!(
+                "pve-remote!UPID:pve:00039E4D:002638B8:{starttime:08X}:stopall::root@pam:"
+            )
+            .parse()
+            .unwrap(),
+            starttime,
+            status,
+            endtime,
+        }
+    }
+
+    fn assert_starttimes(cache: &TaskCache, starttimes: &[i64]) {
+        let tasks: Vec<i64> = cache
+            .get_tasks(GetTasks::All)
+            .unwrap()
+            .map(|task| task.unwrap().starttime)
+            .collect();
+
+        assert_eq!(&tasks, starttimes);
+    }
+
+    fn add_tasks(cache: &TaskCache, tasks: Vec<TaskCacheItem>) -> Result<(), Error> {
+        let param = AddTasks {
+            update_most_recent_archive_timestamp: true,
+            tasks,
+        };
+        let mut a = HashMap::new();
+        a.insert("pve-remote".into(), param);
+
+        cache.add_tasks(a)
+    }
+
+    #[test]
+    fn test_add_tasks() -> Result<(), Error> {
+        let tmp_dir = NamedTempDir::new()?;
+        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+        cache.new_file(1000)?;
+        assert_eq!(cache.archive_files()?.len(), 1);
+
+        add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
+
+        cache.rotate(1500, 0, 3)?;
+        assert_eq!(cache.archive_files()?.len(), 2);
+
+        add_tasks(&cache, vec![task(1500, true), task(1501, true)])?;
+        add_tasks(&cache, vec![task(1200, true), task(1300, true)])?;
+
+        cache.rotate(2000, 0, 3)?;
+        assert_eq!(cache.archive_files()?.len(), 3);
+
+        add_tasks(&cache, vec![task(2000, true)])?;
+        add_tasks(&cache, vec![task(1502, true)])?;
+        add_tasks(&cache, vec![task(1002, true)])?;
+
+        // These are before the cut-off of 1000, so they will be discarded.
+        add_tasks(&cache, vec![task(800, true), task(900, true)])?;
+
+        // This one should be deduped
+        add_tasks(&cache, vec![task(1000, true)])?;
+
+        assert_starttimes(
+            &cache,
+            &[2000, 1502, 1501, 1500, 1300, 1200, 1002, 1001, 1000],
+        );
+
+        cache.rotate(2500, 0, 3)?;
+
+        assert_eq!(cache.archive_files()?.len(), 3);
+
+        assert_starttimes(&cache, &[2000, 1502, 1501, 1500]);
+
+        cache.rotate(3000, 0, 3)?;
+        assert_eq!(cache.archive_files()?.len(), 3);
+
+        assert_starttimes(&cache, &[2000]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_active_tasks_are_migrated_to_archive() -> Result<(), Error> {
+        let tmp_dir = NamedTempDir::new()?;
+        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+        cache.new_file(1000)?;
+        add_tasks(&cache, vec![task(1000, false), task(1001, false)])?;
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
+
+        let state = cache.read_state();
+        assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1000);
+
+        add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
+
+        assert_starttimes(&cache, &[1001, 1000]);
+
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_init() -> Result<(), Error> {
+        let tmp_dir = NamedTempDir::new()?;
+        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+        cache.init(1000, 3, 100)?;
+        assert_eq!(cache.archive_files()?.len(), 3);
+
+        add_tasks(
+            &cache,
+            vec![
+                task(1050, true),
+                task(950, true),
+                task(850, true),
+                task(750, true), // This one is discarded
+            ],
+        )?;
+
+        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 3);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_tracking_tasks() -> Result<(), Error> {
+        let tmp_dir = NamedTempDir::new()?;
+        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
+
+        cache.init(1000, 3, 100)?;
+
+        cache.add_tracked_task(task(1050, false))?;
+
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
+        cache.add_tracked_task(task(1060, false))?;
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
+
+        let state = cache.read_state();
+        assert_eq!(state.tracked_tasks.get("pve-remote").unwrap().len(), 2);
+        assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1050);
+
+        // Mark first task as finished
+        add_tasks(&cache, vec![task(1050, true)])?;
+
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
+        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 1);
+
+        // Mark second task as finished
+        add_tasks(&cache, vec![task(1060, true)])?;
+
+        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
+        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 2);
+
+        Ok(())
+    }
+}
-- 
2.39.5
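
For readers who want to experiment with the merge step of MergeTaskIterator
outside of the PDM tree, the following is a minimal, standalone sketch of the
same idea. It is not the code from this patch: `Task`, `compare`, `Merge` and
`merge_desc` are hypothetical stand-ins, and the ordering implemented by
`compare` (starttime first, then reversed UPID order) is only an assumption
about what `compare_tasks` does, based on the doc comment above.

```rust
use std::cmp::Ordering;
use std::iter::Peekable;

/// Hypothetical stand-in for `TaskCacheItem`: (starttime, upid).
type Task = (i64, &'static str);

/// Assumed ordering: compare by starttime; for equal starttimes the UPID
/// comparison is reversed, so that a descending sort yields ascending UPIDs.
fn compare(a: &Task, b: &Task) -> Ordering {
    a.0.cmp(&b.0).then_with(|| b.1.cmp(a.1))
}

/// Merges two iterators that are already sorted in descending order.
struct Merge<L: Iterator<Item = Task>, R: Iterator<Item = Task>> {
    left: Peekable<L>,
    right: Peekable<R>,
}

impl<L: Iterator<Item = Task>, R: Iterator<Item = Task>> Iterator for Merge<L, R> {
    type Item = Task;

    fn next(&mut self) -> Option<Task> {
        // Look at the head of both inputs without consuming anything yet.
        let order = match (self.left.peek(), self.right.peek()) {
            (Some(l), Some(r)) => compare(l, r),
            (Some(_), None) => Ordering::Greater,
            (None, Some(_)) => Ordering::Less,
            (None, None) => return None,
        };

        match order {
            // The greater head comes first, since the output is descending.
            Ordering::Greater => self.left.next(),
            Ordering::Less => self.right.next(),
            Ordering::Equal => {
                // Same task on both sides: drop one copy, yield the other.
                self.right.next();
                self.left.next()
            }
        }
    }
}

/// Convenience wrapper: merge two descending-sorted vectors into one.
fn merge_desc(left: Vec<Task>, right: Vec<Task>) -> Vec<Task> {
    Merge {
        left: left.into_iter().peekable(),
        right: right.into_iter().peekable(),
    }
    .collect()
}
```

Since only the head of each input is inspected, the merge needs constant
additional memory, which fits the line-by-line reading done by
ArchiveIterator. Note that deduplication only works if both inputs really are
sorted according to the same comparison function.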




