[pdm-devel] [PATCH proxmox-datacenter-manager v6 1/6] remote tasks: implement improved cache for remote tasks

Dominik Csapak d.csapak at proxmox.com
Wed Aug 20 09:50:54 CEST 2025


after looking a bit more thorough there are a few (again minor) things
i found

other than these (and the things in my previous reply), this patch LGTM


comments inline

On 8/14/25 09:56, Lukas Wagner wrote:
> This commit adds a new implementation for a cache for remote tasks, one
> that should improve performance characteristics in for pretty much all
> use cases.
> 
> In general storage works pretty similar to the task archive we already
> have for (local) PDM tasks.
> 
> root at pdm-dev:/var/cache/proxmox-datacenter-manager/remote-tasks# ls -l
> total 40
> -rw-r--r-- 1 www-data www-data     0 Mar 13 13:18 active
> -rw-r--r-- 1 www-data www-data  1676 Mar 11 14:51 archive.1741355462.zst
> -rw-r--r-- 1 www-data www-data     0 Mar 11 14:51 archive.1741441862.zst
> -rw-r--r-- 1 www-data www-data  2538 Mar 11 14:51 archive.1741528262.zst
> -rw-r--r-- 1 www-data www-data  8428 Mar 11 15:07 archive.1741614662.zst
> -rw-r--r-- 1 www-data www-data 11740 Mar 13 10:18 archive.1741701062
> -rw-r--r-- 1 www-data www-data  3364 Mar 13 13:18 archive.1741788270
> -rw-r--r-- 1 www-data www-data     0 Mar 13 13:18 journal
> -rw-r--r-- 1 www-data www-data   287 Mar 13 13:18 state
> 
> Tasks are stored in the 'active' and multiple 'archive' files. Running
> tasks are placed into the 'active' file, tasks that are finished are
> persisted into one of the archive files. The archive files are suffixed
> with a UNIX timestamp which serves as a lower-bound for start times for
> tasks stored in this file. Encoding this lower-bound in the file name
> instead of using a more traditional increasing file index (.1, .2, .3)
> gives us the benefit that we can decide in which file a newly arrived
> tasks belongs from a single readdir call, without even reading the file
> itself.
> 
> Older archive files are additionally compressed using zstd.
> 
> The size of the entire archive can be controlled by doing file
> 'rotation', where the archive file with the oldest timestamp is deleted
> and a new file with the current timestamp is added. If 'rotation'
> happens at fixed intervals (e.g. once daily), this gives us a
> configurable number of days of task history. There is no direct cap for
> the total number of tasks, but this could be easily added by checking
> the size of the most recent archive file and by rotating early if a
> threshold is exceeded.
> 
> The format inside the files is also similar to the local task archive,
> with one task corresponding to one line in the file. One key difference
> is that here each line is a JSON object, that is to make it easier to
> add additional data later, if needed. The JSON object contains the tasks
> UPID, status, endtime and starttime (the starttime is technically also
> encoded in the UPID, but having it as a separate value simplified a
> couple of things) Each file is sorted by the task's start time, the
> youngest task coming first.
> 
> One key difference between this task cache and the local task archive is
> that we need to handle tasks coming in out-of-order, e.g. if remotes
> were not reachable for a certain time. To maintain the ordering in the
> file, we have to merge the newly arrived tasks into the existing task
> file. This was implemented in a way that avoids reading the entire file
> into memory at once, exploiting the fact that the contents of the
> existing file are already sorted. This allows to do a zipper/merge-sort
> like approach (see MergeTaskIterator for details). The existing file is
> only read line-by-line and finally replaced atomically.
> 
> Since we always need to rewrite the entire archive file, even if there
> is only a single new task added, there is also a journal/WAL file.
> When adding new tasks, they will be first appended to the journal file
> and then later applied to the actual archive files. The journal is
> applied after a certain time or if the journal file itself grows too
> large.
> 
> The cache also has a separate state file, containing additional
> information, e.g. cut-off timestamps for the polling task. Some of the
> data in the statefile is technically also contained in the archive
> files, but reading the state file is much faster.
> 
> This commit also adds an elaborate suite of unit tests for this new
> cache. While adding some additional work up front, they paid off
> themselves during development quite quickly, since the overall approach
> for the cache changed a couple of times. The test suite gave me
> confidence that the design changes didn't screw anything up, catching a
> couple of bugs in the process.
> 
> Finally, some concrete numbers about the performance improvments.
> Benchmarking was done using the 'fake-remote' feature. There were 100
> remotes, 10 PVE nodes per remote. The task cache contained
> about 1.5 million tasks.
>                                                before         after
> list of active tasks (*):                     ~1.3s         ~300µs
> list of 500 tasks, offset 0 (**):             ~1.3s         ~1.5ms
> list of 500 tasks, offset 1 million (***):    ~1.3s         ~200ms
> list of 500 tasks, offset 0,
>      2000 tasks in journal (****):                           ~4.5ms
> Size on disk:                                 ~500MB        ~40MB
> 
> (*):  Requested by the UI every 3s
> (**): Requested by the UI when visiting Remotes > Tasks
> (***): E.g. when scrolling towards the bottom of 'Remotes > Tasks'
> (****): e.g. when the journal has not been applied for a while. Reading tasks
>         from the journal is a bit less efficient than from the task archive, since
>         we have to fully load it into memory so that we can sort the tasks and
>         also  remove potential duplicates
> 
> In the old implementation, the archive file was *always* fully
> deserialized and loaded into RAM, this is the reason why the time needed
> is pretty idential for all scenarios. The new implementation reads the
> archive files only line by line, and only 500 tasks were loaded into RAM
> at the same time. The higher the offset, the more archive lines/files we
> have to scan, which increases the time needed to access the data. The
> tasks are sorted descending by starttime, as a result the requests get
> slower the further you go back in history.
> 
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> ---
> 
> Notes:
>      Changes since v5:
>        - Change format of state file, include a per-node cutoff time
>        - Change locking approach, using separate types to represent
>          a locked cached (ReadableTaskCache, WritableTaskCache)
>          (as suggested by Wolfgang and Dominik independently)
>        - Rename 'add_tasks' to 'update', since it also drops
>          tracked tasks from the state file, moves tasks from
>          the active file to the archive, etc.
>      
>        - Instead of adding new tasks directly to the archive, they
>          are added to an append-only journal at first. This allows us to
>          better recover from a crash/shutdown while we are adding tasks,
>          as well as reduce disk writes. The latter is due to the fact that
>          we have to rewrite the archive file in its entirety, even if
>          only a single task is added.
>      
>        - Use zstd compression for older archive files, greatly reducing
>          space consumption
>      
>        - Some general refactoring to make the code a bit nicer
>      
>      Changes since v4:
>        - Rebased
>      
>      Changes since v3:
>        - Included benchmark results in the commit message
>        - Drop `pub` for TaskCache::write_state
>      
>      Changes since v2:
>        - Incorporate feedback from Wolfang (thx!)
>           - eliminate one .clone()
>           - get_tasks_with_lock: directly create iterator over PathBuf
>             instead of first creating ArchiveFile vec and the mapping
>           - Fixed TOCTOU race condition (checked Path::exists instead of
>             trying the actual operation and reacting to ErrorKind::NotFound)
>           - handle error when replacing 'active' file
>           - unlink temp file when replacing the archive file did not work
>           - avoid UTF8 decoding where not really necessary
>        - Added a couple of .context()/.with_context() for better error
>          messages
>        - Extracted the file names into consts
>        - Fixed some clippy issues (.clone() for CreateOptions - when
>          this series was written, CreateOptions were not Copy yet)
> 
>   Cargo.toml                            |    2 +-
>   server/Cargo.toml                     |    1 +
>   server/src/remote_tasks/mod.rs        |    2 +
>   server/src/remote_tasks/task_cache.rs | 1486 +++++++++++++++++++++++++
>   4 files changed, 1490 insertions(+), 1 deletion(-)
>   create mode 100644 server/src/remote_tasks/task_cache.rs
> 
> diff --git a/Cargo.toml b/Cargo.toml
> index 658c53a7..08b93737 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -128,7 +128,7 @@ url = "2.1"
>   walkdir = "2"
>   webauthn-rs-core = "0.5"
>   xdg = "2.2"
> -zstd = { version = "0.12", features = [ "bindgen" ] }
> +zstd = { version = "0.13" }
>   
>   # Local path overrides
>   # NOTE: You must run `cargo update` after changing this for it to take effect!
> diff --git a/server/Cargo.toml b/server/Cargo.toml
> index 2d1fa5d2..24a2e40c 100644
> --- a/server/Cargo.toml
> +++ b/server/Cargo.toml
> @@ -32,6 +32,7 @@ tokio = { workspace = true, features = [ "fs", "io-util", "io-std", "macros", "n
>   tokio-stream.workspace = true
>   tracing.workspace = true
>   url.workspace = true
> +zstd.workspace = true
>   
>   proxmox-access-control = { workspace = true, features = [ "impl" ] }
>   proxmox-async.workspace = true
> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
> index 234ffa76..7c8e31ef 100644
> --- a/server/src/remote_tasks/mod.rs
> +++ b/server/src/remote_tasks/mod.rs
> @@ -18,6 +18,8 @@ use tokio::task::JoinHandle;
>   
>   use crate::{api::pve, task_utils};
>   
> +mod task_cache;
> +
>   /// Get tasks for all remotes
>   // FIXME: filter for privileges
>   pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
> diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
> new file mode 100644
> index 00000000..1badc422
> --- /dev/null
> +++ b/server/src/remote_tasks/task_cache.rs
> @@ -0,0 +1,1486 @@
> +//! Task cache implementation, based on rotating files.
> +use std::{
> +    cmp::Ordering,
> +    collections::{HashMap, HashSet},
> +    fs::{File, OpenOptions},
> +    io::{BufRead, BufReader, BufWriter, ErrorKind, Lines, Write},
> +    iter::Peekable,
> +    os::unix::fs::MetadataExt,
> +    path::{Path, PathBuf},
> +    time::{Duration, Instant},
> +};
> +
> +use anyhow::{Context, Error};
> +use serde::{Deserialize, Serialize};
> +
> +use proxmox_sys::fs::CreateOptions;
> +
> +use pdm_api_types::RemoteUpid;
> +use pve_api_types::PveUpid;
> +
> +/// Filename for the file containing running tasks.
> +const ACTIVE_FILENAME: &str = "active";
> +/// Filename prefix for archive files.
> +const ARCHIVE_FILENAME_PREFIX: &str = "archive.";
> +/// Filename for the state file.
> +const STATE_FILENAME: &str = "state";
> +/// Filename of the archive lockfile.
> +const LOCKFILE_FILENAME: &str = ".lock";
> +/// Write-ahead log.
> +const WAL_FILENAME: &str = "journal";
> +
> +/// File name extension for zstd compressed archive files
> +const ZSTD_EXTENSION: &str = "zst";
> +const ZSTD_EXTENSION_WITH_DOT: &str = ".zst";
> +
> +/// Item which can be put into the task cache.
> +#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
> +#[serde(rename_all = "kebab-case")]
> +pub struct TaskCacheItem {
> +    /// The task's UPID
> +    pub upid: RemoteUpid,
> +    /// The time at which the task was started (seconds since the UNIX epoch).
> +    /// Technically this is also contained within the UPID, duplicating it here
> +    /// allows us to directly access it when sorting in new tasks, without having
> +    /// to parse the UPID.
> +    pub starttime: i64,
> +    #[serde(skip_serializing_if = "Option::is_none")]
> +    /// The task's status.
> +    pub status: Option<String>,
> +    #[serde(skip_serializing_if = "Option::is_none")]
> +    /// The task's endtime (seconds since the UNIX epoch).
> +    pub endtime: Option<i64>,
> +}
> +
> +#[derive(Serialize, Deserialize, Default)]
> +#[serde(rename_all = "kebab-case")]
> +/// Per remote state.
> +struct RemoteState {
> +    /// Per-node state for this remote.
> +    node_state: HashMap<String, NodeState>,
> +}
> +
> +#[derive(Serialize, Deserialize, Default)]
> +#[serde(rename_all = "kebab-case")]
> +struct NodeState {
> +    /// Cutoff timestamp for this node when fetching archived tasks.
> +    cutoff: i64,
> +}
> +
> +/// State needed for task polling.
> +#[derive(Serialize, Deserialize, Default)]
> +#[serde(rename_all = "kebab-case")]
> +pub struct State {
> +    /// Map of remote -> most recent task starttime (UNIX epoch) in the archive.
> +    /// This can be used as a cut-off when requesting new task data.
> +    #[serde(default)]
> +    remote_state: HashMap<String, RemoteState>,
> +    /// Tracked tasks which are polled in short intervals.
> +    #[serde(default)]
> +    tracked_tasks: HashSet<RemoteUpid>,
> +}
> +
> +impl State {
> +    /// Get tracked tasks.
> +    pub fn tracked_tasks(&self) -> impl Iterator<Item = &RemoteUpid> {
> +        self.tracked_tasks.iter()
> +    }
> +
> +    /// Get the cutoff timestamp for a node of a remote.
> +    pub fn cutoff_timestamp(&self, remote_id: &str, node: &str) -> Option<i64> {
> +        self.remote_state
> +            .get(remote_id)
> +            .and_then(|remote_state| remote_state.node_state.get(node))
> +            .map(|state| state.cutoff)
> +    }
> +
> +    /// Add a new tracked task.
> +    fn add_tracked_task(&mut self, upid: RemoteUpid) {
> +        self.tracked_tasks.insert(upid);
> +    }
> +
> +    /// Remove a tracked task.
> +    fn remove_tracked_task(&mut self, upid: &RemoteUpid) {
> +        self.tracked_tasks.remove(upid);
> +    }
> +
> +    /// Update the per-node cutoff timestamp if it is higher than the current one.
> +    fn update_cutoff_timestamp(&mut self, remote_id: &str, node: &str, starttime: i64) {
> +        match self.remote_state.get_mut(remote_id) {
> +            Some(remote_state) => match remote_state.node_state.get_mut(node) {
> +                Some(node_state) => {
> +                    node_state.cutoff = node_state.cutoff.max(starttime);
> +                }
> +                None => {
> +                    remote_state
> +                        .node_state
> +                        .insert(node.to_string(), NodeState { cutoff: starttime });
> +                }
> +            },
> +            None => {
> +                let node_state =
> +                    HashMap::from_iter([(node.to_string(), NodeState { cutoff: starttime })]);
> +
> +                self.remote_state
> +                    .insert(remote_id.to_string(), RemoteState { node_state });
> +            }
> +        }
> +    }
> +}
> +
> +/// Cache for remote tasks.
> +#[derive(Clone)]
> +pub struct TaskCache {
> +    /// Path where the cache's files should be placed.
> +    base_path: PathBuf,
> +    /// File permissions for the cache's files.
> +    create_options: CreateOptions,
> +
> +    /// Maximum size of the journal. If it grows larger than size after
> +    /// tasks have been added, it will be applied immediately.
> +    journal_max_size: u64,
> +
> +    /// Maximum number of archive files. If the archive is rotated and `max_files` is exceeded, the
> +    /// oldest fill we dropped
> +    max_files: u32,
> +
> +    /// Number of uncompressed archive files to keep. These will be the most recent ones.
> +    uncompressed_files: u32,
> +
> +    /// Rotate archive file if it is older than this number of seconds.
> +    rotate_after: u64,
> +}
> +
> +/// A [`TaskCache`] locked for writing.
> +pub struct WritableTaskCache {
> +    cache: TaskCache,
> +    lock: TaskCacheLock,
> +}
> +
> +/// A [`TaskCache`] locked for reading.
> +pub struct ReadableTaskCache {
> +    cache: TaskCache,
> +    lock: TaskCacheLock,
> +}
> +
> +/// Lock for the cache.
> +#[allow(dead_code)]
> +struct TaskCacheLock(File);
> +
> +/// Which tasks to fetch from the archive.
> +pub enum GetTasks {
> +    /// Get all tasks, finished and running.
> +    All,
> +    /// Only get running (active) tasks.
> +    Active,
> +    #[cfg(test)] // Used by tests, might be used by production code in the future
> +    /// Only get finished (archived) tasks.
> +    Archived,
> +}
> +
> +/// Map that stores whether a remote node's tasks were successfully
> +/// fetched.
> +#[derive(Default)]
> +pub struct NodeFetchSuccessMap(HashMap<(String, String), bool>);
> +
> +impl NodeFetchSuccessMap {
> +    /// Mark a node of a given remote as successful.
> +    pub fn set_node_success(&mut self, remote: String, node: String) {
> +        self.0.insert((remote, node), true);
> +    }
> +
> +    /// Mark a node of a given remote as failed.
> +    pub fn set_node_failure(&mut self, remote: String, node: String) {
> +        self.0.insert((remote, node), false);
> +    }
> +
> +    /// Returns whether tasks from a given node of a remote were successfully fetched.
> +    pub fn node_successful(&self, remote: &str, node: &str) -> bool {
> +        matches!(self.0.get(&(remote.into(), node.into())), Some(true))
> +    }
> +
> +    /// Merge this map with another.
> +    pub fn merge(&mut self, other: Self) {
> +        self.0.extend(other.0);
> +    }
> +}
> +
> +impl ReadableTaskCache {
> +    /// Iterate over cached tasks.
> +    pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
> +        self.cache
> +            .get_tasks_impl(mode, &self.lock)
> +            .context("failed to create task archive iterator")
> +    }
> +}
> +
> +impl WritableTaskCache {
> +    /// Create initial task archives that can be backfilled with the
> +    /// recent task history from a remote.
> +    ///
> +    /// This function only has an effect if there are no archive files yet.
> +    pub fn init(&self, now: i64) -> Result<(), Error> {
> +        if self.cache.archive_files(&self.lock)?.is_empty() {
> +            for i in 0..self.cache.max_files {
> +                self.new_file(
> +                    now - (i as u64 * self.cache.rotate_after) as i64,
> +                    i >= self.cache.uncompressed_files,
> +                )?;
> +            }
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Start a new archive file with a given timestamp.
> +    /// `now` is supposed to be a UNIX timestamp (seconds).
> +    fn new_file(&self, now: i64, compress: bool) -> Result<ArchiveFile, Error> {
> +        let suffix = if compress {
> +            ZSTD_EXTENSION_WITH_DOT
> +        } else {
> +            ""
> +        };
> +
> +        let new_path = self
> +            .cache
> +            .base_path
> +            .join(format!("{ARCHIVE_FILENAME_PREFIX}{now}{suffix}"));
> +
> +        let mut file = File::create(&new_path)?;
> +        self.cache.create_options.apply_to(&mut file, &new_path)?;
> +
> +        if compress {
> +            let encoder = zstd::stream::write::Encoder::new(file, zstd::DEFAULT_COMPRESSION_LEVEL)?;
> +            encoder.finish()?;
> +        }
> +
> +        Ok(ArchiveFile {
> +            path: new_path,
> +            compressed: compress,
> +            starttime: now,
> +        })
> +    }
> +
> +    /// Rotate task archive if the the newest archive file is older than `rotate_after`.
> +    ///
> +    /// The oldest archive files are removed if the total number of archive files exceeds
> +    /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds).
> +    pub fn rotate(&self, now: i64) -> Result<bool, Error> {
> +        let mut did_rotate = false;
> +        let mut archive_files = self.cache.archive_files(&self.lock)?;
> +
> +        match archive_files.first() {
> +            Some(bound) => {
> +                if now > bound.starttime && now - bound.starttime > self.cache.rotate_after as i64 {
> +                    let new_file = self.new_file(now, self.cache.uncompressed_files == 0)?;
> +                    archive_files.insert(0, new_file);
> +                    did_rotate = true;
> +
> +                    self.apply_journal()?;
> +                }
> +            }
> +            None => {
> +                let new_file = self.new_file(now, self.cache.uncompressed_files == 0)?;
> +                archive_files.insert(0, new_file);
> +                did_rotate = true;
> +
> +                self.apply_journal()?;
> +            }
> +        }
> +
> +        let mut archive_files = self.cache.archive_files(&self.lock)?;
> +
> +        while archive_files.len() > self.cache.max_files as usize {
> +            // Unwrap is safe because of the length check above
> +            let to_remove = archive_files.pop().unwrap();
> +            std::fs::remove_file(&to_remove.path)
> +                .with_context(|| format!("failed to remove {}", to_remove.path.display()))?;
> +        }
> +
> +        for file in archive_files
> +            .iter_mut()
> +            .skip(self.cache.uncompressed_files as usize)
> +        {
> +            if !file.compressed {
> +                file.compress(self.cache.create_options)
> +                    .with_context(|| format!("failed to compress {}", file.path.display()))?;
> +            }
> +        }
> +
> +        Ok(did_rotate)
> +    }
> +
> +    /// Iterate over cached tasks.
> +    pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator<'_>, Error> {
> +        self.cache
> +            .get_tasks_impl(mode, &self.lock)
> +            .context("failed to create task archive iterator")
> +    }
> +
> +    /// Update task cache contents.
> +    ///
> +    /// This is mostly used for adding new tasks to tasks to the cache, but
> +    /// will also handle dropping finished/failed tracked tasks from the
> +    /// state file and active file. This is done so that we don't have to update
> +    /// these files multiple times.
> +    ///
> +    /// Running tasks (tasks without an endtime) are placed into the 'active' file in the
> +    /// task cache base directory. Finished tasks are sorted into `archive.<startime>` archive
> +    /// files, where `<starttimes>` denotes the lowest permitted start time timestamp for a given
> +    /// archive file. If a task which was added as running previously is added again, this time in
> +    /// a finished state, it will be removed from the `active` file and also sorted into
> +    /// one of the archive files.
> +    /// Same goes for the list of tracked tasks; the entry in the state file will be removed.
> +    ///
> +    /// Crash consistency:
> +    ///
> +    /// The state file, which contains the cut-off timestamps for future task fetching, is updated at the
> +    /// end after all tasks have been added into the archive. Adding tasks is an idempotent
> +    /// operation; adding the *same* task multiple times does not lead to duplicated entries in the
> +    /// task archive. Individual archive files are updated atomically, but since
> +    /// adding tasks can involve updating multiple archive files, the archive could end up
> +    /// in a partially-updated, inconsistent state in case of a crash.
> +    /// However, since the state file with the cut-off timestamps is updated last,
> +    /// the consistency of the archive should be restored at the next update cycle of the archive.
> +    pub fn update(
> +        &self,
> +        new_tasks: Vec<TaskCacheItem>,
> +        update_state_for_remote: &NodeFetchSuccessMap,
> +        drop_tracked: HashSet<RemoteUpid>,
> +    ) -> Result<(), Error> {
> +        let task_iter = self
> +            .get_tasks(GetTasks::Active)
> +            .context("failed to create archive iterator for active tasks")?;
> +
> +        let mut active_tasks = HashMap::from_iter(task_iter.filter_map(|task| {
> +            if !drop_tracked.contains(&task.upid) {
> +                Some((task.upid.clone(), task))
> +            } else {
> +                None
> +            }
> +        }));
> +
> +        let mut new_finished_tasks = Vec::new();
> +
> +        for task in new_tasks {
> +            if task.endtime.is_none() {
> +                active_tasks.insert(task.upid.clone(), task);
> +            } else {
> +                new_finished_tasks.push(task);
> +            }
> +        }
> +
> +        let mut state = self.read_state();
> +
> +        for upid in drop_tracked {
> +            state.remove_tracked_task(&upid);
> +        }
> +
> +        self.write_tasks_to_journal(
> +            new_finished_tasks,
> +            &mut active_tasks,
> +            update_state_for_remote,
> +            &mut state,
> +        )?;
> +
> +        let mut active: Vec<TaskCacheItem> = active_tasks.into_values().collect();
> +
> +        active.sort_by(compare_tasks_reverse);
> +        self.write_active_tasks(active.into_iter())
> +            .context("failed to write active task file when adding tasks")?;
> +        self.write_state(state)
> +            .context("failed to update task archive state file when adding tasks")?;
> +
> +        self.apply_journal_if_too_large()
> +            .context("could not apply journal early")?;
> +
> +        Ok(())
> +    }
> +
> +    fn write_tasks_to_journal(
> +        &self,
> +        tasks: Vec<TaskCacheItem>,
> +        active_tasks: &mut HashMap<RemoteUpid, TaskCacheItem>,
> +        node_success_map: &NodeFetchSuccessMap,
> +        state: &mut State,
> +    ) -> Result<(), Error> {
> +        let filename = self.cache.base_path.join(WAL_FILENAME);
> +        let mut file = OpenOptions::new()
> +            .append(true)
> +            .create(true)
> +            .open(filename)?;
> +
> +        for task in tasks {
> +            // Remove this finished task from our set of active tasks.
> +            active_tasks.remove(&task.upid);
> +
> +            // TODO:: Handle PBS tasks correctly.
> +            // TODO: This is awkward, maybe overhaul RemoteUpid type to make this easier

There are helper types/functions in proxmox-yew-comp (only ui) that can 
parse both, but we should probably move that into proxmox-schema or it's 
own crate so we can reuse it here when we need it

aside from that, theoretically we could know which type of remote it is,
no ? (we could either save that info somewhere, or simply check the 
config for it)

> +            match task.upid.upid.parse::<PveUpid>() {
> +                Ok(upid) => {
> +                    let node = &upid.node;
> +                    let remote = task.upid.remote();
> +
> +                    if node_success_map.node_successful(remote, node) {
> +                        state.update_cutoff_timestamp(task.upid.remote(), node, task.starttime);
> +                    }
> +                }
> +                Err(error) => {
> +                    log::error!("could not parse PVE UPID - not saving to task cache: {error:#}");
> +                    continue;
> +                }
> +            }
> +
> +            serde_json::to_writer(&mut file, &task)?;
> +            writeln!(&file)?;
> +        }
> +
> +        file.sync_all()?;
> +
> +        Ok(())
> +    }
> +
> +    /// Returns the current size of the journal file.
> +    fn journal_size(&self) -> Result<u64, Error> {
> +        let metadata = self
> +            .cache
> +            .base_path
> +            .join(WAL_FILENAME)
> +            .metadata()
> +            .context("failed to read metadata of journal file")?;
> +
> +        Ok(metadata.size())
> +    }
> +
> +    /// Apply the journal early if it has grown larger than the maximum allowed size.
> +    fn apply_journal_if_too_large(&self) -> Result<(), Error> {
> +        let size = self.journal_size()?;
> +
> +        if size > self.cache.journal_max_size {
> +            log::info!("task cache journal size {size} bytes, applying early");
> +            self.apply_journal()?;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Apply the task journal.
> +    ///
> +    /// This will merge all tasks in the journal file into the task archive.
> +    pub fn apply_journal(&self) -> Result<(), Error> {
> +        let start = Instant::now();
> +        let filename = self.cache.base_path.join(WAL_FILENAME);
> +
> +        let file = match File::open(&filename) {
> +            Ok(file) => Box::new(BufReader::new(file)),
> +            Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
> +            Err(err) => return Err(err.into()),
> +        };
> +
> +        log::info!("applying task cache journal");
> +        let iterator = ArchiveIterator::new(file);
> +
> +        let mut tasks: Vec<TaskCacheItem> = iterator
> +            .filter_map(|task| match task {
> +                Ok(task) => Some(task),
> +                Err(err) => {
> +                    log::error!("could not read task from journal file: {err:#}");
> +                    None
> +                }
> +            })
> +            .collect();
> +
> +        // The WAL contains tasks in arbitrary order since we always append.
> +        tasks.sort_by(compare_tasks_reverse);
> +        tasks.dedup();
> +
> +        let count = tasks.len();
> +
> +        self.merge_tasks_into_archive(tasks)?;
> +
> +        // truncate the journal file
> +        OpenOptions::new()
> +            .write(true)
> +            .truncate(true)
> +            .open(filename)
> +            .context("failed to truncate journal file")?;
> +
> +        log::info!(
> +            "commited {count} tasks in {:.3}.s to task cache archive",
> +            start.elapsed().as_secs_f32()
> +        );
> +
> +        Ok(())
> +    }
> +
> +    /// Merge a list of *finished* tasks into the remote task archive files.
> +    /// The list of task in `tasks` *must* be sorted by their timestamp and UPID (descending by
> +    /// timestamp, ascending by UPID).
> +    fn merge_tasks_into_archive(&self, tasks: Vec<TaskCacheItem>) -> Result<(), Error> {
> +        debug_assert!(tasks
> +            .iter()
> +            .is_sorted_by(|a, b| compare_tasks(a, b).is_ge()));
> +
> +        let files = self
> +            .cache
> +            .archive_files(&self.lock)
> +            .context("failed to read achive files")?;
> +
> +        let mut files = files.iter().peekable();
> +
> +        let mut current = files.next();
> +        let mut next = files.peek();
> +
> +        let mut tasks_for_current_file = Vec::new();
> +
> +        // Tasks are sorted youngest to oldest (biggest start time first)
> +        for task in tasks {
> +            // Skip ahead until we have found the correct file.
> +            while next.is_some() {
> +                if let Some(current) = current {
> +                    if task.starttime >= current.starttime {
> +                        break;
> +                    }
> +                    // The next entry's cut-off is larger then the task's start time, that means
> +                    // we want to finalized the current file by merging all tasks that
> +                    // should be stored in it...
> +                    self.merge_single_archive_file(
> +                        std::mem::take(&mut tasks_for_current_file),
> +                        current,
> +                    )
> +                    .with_context(|| {
> +                        format!("failed to merge archive file {}", current.path.display())
> +                    })?;
> +                }
> +
> +                // ... and the `current` file to the next entry.
> +                current = files.next();
> +                next = files.peek();
> +            }
> +
> +            if let Some(current) = current {
> +                if task.starttime < current.starttime {
> +                    continue;
> +                }
> +            }
> +            tasks_for_current_file.push(task);
> +        }
> +
> +        // Merge tasks for the last file.
> +        if let Some(current) = current {
> +            self.merge_single_archive_file(tasks_for_current_file, current)
> +                .with_context(|| {
> +                    format!("failed to merge archive file {}", current.path.display())
> +                })?;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Add a new tracked task.
> +    ///
> +    /// This will insert the task in the list of tracked tasks in the state file,
> +    /// as well as create an entry in the `active` file.
> +    pub fn add_tracked_task(&self, task: TaskCacheItem) -> Result<(), Error> {
> +        let mut state = self.read_state();
> +
> +        let mut tasks: Vec<TaskCacheItem> = self
> +            .get_tasks(GetTasks::Active)
> +            .context("failed to create active task iterator")?
> +            .collect();
> +
> +        tasks.push(task.clone());
> +        tasks.sort_by(compare_tasks_reverse);
> +
> +        state.add_tracked_task(task.upid);
> +
> +        self.write_active_tasks(tasks.into_iter())
> +            .context("failed to write active tasks file when adding tracked task")?;
> +
> +        self.write_state(state)
> +            .context("failed to write state when adding tracked task")?;
> +
> +        Ok(())
> +    }
> +
> +    /// Read the state file.
> +    /// If the state file could not be read or does not exist, the default (empty) state
> +    /// is returned.
> +    pub fn read_state(&self) -> State {
> +        self.cache.read_state()
> +    }
> +
> +    /// Write the state file.
> +    fn write_state(&self, state: State) -> Result<(), Error> {
> +        let path = self.cache.base_path.join(STATE_FILENAME);
> +
> +        let data = serde_json::to_vec_pretty(&state)?;
> +
> +        proxmox_sys::fs::replace_file(path, &data, self.cache.create_options, true)?;
> +
> +        Ok(())
> +    }
> +
> +    /// Write the provided tasks to the 'active' file.
> +    ///
> +    /// The tasks are first written to a temporary file, which is then used
> +    /// to atomically replace the original.
> +    fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
> +        let (fd, path) = proxmox_sys::fs::make_tmp_file(
> +            self.cache.base_path.join(ACTIVE_FILENAME),
> +            self.cache.create_options,
> +        )?;
> +        let mut fd = BufWriter::new(fd);
> +
> +        Self::write_tasks(&mut fd, tasks)?;
> +
> +        if let Err(err) = fd.flush() {
> +            log::error!("could not flush 'active' file: {err:#}");
> +        }
> +        drop(fd);
> +
> +        let target = self.cache.base_path.join(ACTIVE_FILENAME);
> +
> +        let res = std::fs::rename(&path, &target).with_context(|| {
> +            format!(
> +                "failed to replace {} with {}",
> +                target.display(),
> +                path.display(),
> +            )
> +        });
> +
> +        if let Err(err) = res {
> +            if let Err(err) = std::fs::remove_file(&path) {
> +                log::error!(
> +                    "failed to cleanup temporary file {}: {err:#}",
> +                    path.display()
> +                );
> +            }
> +
> +            return Err(err);
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Merge `tasks` with an existing archive file.
> +    /// This function assumes that `tasks` and the pre-existing contents of the archive
> +    /// file are both sorted descending by starttime (most recent tasks come first).
> +    /// The task archive must be locked when calling this function.
> +    fn merge_single_archive_file(
> +        &self,
> +        tasks: Vec<TaskCacheItem>,
> +        file: &ArchiveFile,
> +    ) -> Result<(), Error> {
> +        if tasks.is_empty() {
> +            return Ok(());
> +        }
> +
> +        // TODO: Might be nice to also move this to ArchiveFile
> +        let (temp_file, temp_file_path) =
> +            proxmox_sys::fs::make_tmp_file(&file.path, self.cache.create_options)?;
> +        let mut writer = if file.compressed {
> +            let encoder =
> +                zstd::stream::write::Encoder::new(temp_file, zstd::DEFAULT_COMPRESSION_LEVEL)?
> +                    .auto_finish();
> +            Box::new(BufWriter::new(encoder)) as Box<dyn Write>
> +        } else {
> +            Box::new(BufWriter::new(temp_file)) as Box<dyn Write>
> +        };
> +
> +        let archive_iter = file
> +            .iter()?
> +            .flat_map(|item| match item {
> +                Ok(item) => Some(item),
> +                Err(err) => {
> +                    log::error!("could not read task cache item while merging: {err:#}");
> +                    None
> +                }
> +            })
> +            .peekable();
> +        let task_iter = tasks.into_iter().peekable();
> +
> +        Self::write_tasks(&mut writer, MergeTaskIterator::new(archive_iter, task_iter))?;
> +
> +        if let Err(err) = writer.flush() {
> +            log::error!("could not flush BufWriter for {file:?}: {err:#}");
> +        }
> +        drop(writer);
> +
> +        if let Err(err) = std::fs::rename(&temp_file_path, &file.path).with_context(|| {
> +            format!(
> +                "failed to replace {} with {}",
> +                file.path.display(),
> +                temp_file_path.display()
> +            )
> +        }) {
> +            if let Err(err) = std::fs::remove_file(&temp_file_path) {
> +                log::error!(
> +                    "failed to clean up temporary file {}: {err:#}",
> +                    temp_file_path.display()
> +                );
> +            }
> +
> +            return Err(err);
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Write an iterator of [`TaskCacheItem`] to a something that implements [`Write`].
> +    /// The individual items are encoded as JSON followed by a newline.
> +    fn write_tasks(
> +        writer: &mut impl Write,
> +        tasks: impl Iterator<Item = TaskCacheItem>,
> +    ) -> Result<(), Error> {
> +        for task in tasks {
> +            serde_json::to_writer(&mut *writer, &task)?;
> +            writeln!(writer)?;
> +        }
> +
> +        Ok(())
> +    }
> +}
> +
> +impl TaskCache {
> +    /// Create a new task cache instance.
> +    ///
> +    /// Remember to call `init` or `new_file` on a locked, writable TaskCache
> +    /// to create the initial archive files.
> +    pub fn new<P: AsRef<Path>>(
> +        path: P,
> +        create_options: CreateOptions,
> +        max_files: u32,
> +        uncompressed: u32,
> +        rotate_after: u64,
> +        journal_max_size: u64,
> +    ) -> Result<Self, Error> {
> +        Ok(Self {
> +            base_path: path.as_ref().into(),
> +            create_options,
> +            journal_max_size,
> +            max_files,
> +            rotate_after,
> +            uncompressed_files: uncompressed,
> +        })
> +    }
> +
> +    /// Lock the cache for reading.
> +    pub fn read(self) -> Result<ReadableTaskCache, Error> {
> +        let lock = self.lock_impl(false)?;
> +
> +        Ok(ReadableTaskCache { cache: self, lock })
> +    }
> +
> +    /// Lock the cache for writing.
> +    pub fn write(self) -> Result<WritableTaskCache, Error> {
> +        let lock = self.lock_impl(true)?;
> +
> +        Ok(WritableTaskCache { cache: self, lock })
> +    }
> +
> +    fn lock_impl(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
> +        let lockfile = self.base_path.join(LOCKFILE_FILENAME);
> +
> +        Ok(TaskCacheLock(proxmox_sys::fs::open_file_locked(
> +            lockfile,
> +            Duration::from_secs(10),
> +            exclusive,
> +            self.create_options,
> +        )?))
> +    }
> +
> +    /// Read the state file.
> +    /// If the state file could not be read or does not exist, the default (empty) state
> +    /// is returned.
> +    pub fn read_state(&self) -> State {
> +        fn do_read_state(path: &Path) -> Result<State, Error> {
> +            match std::fs::read(path) {
> +                Ok(content) => serde_json::from_slice(&content).map_err(|err| err.into()),
> +                Err(err) if err.kind() == ErrorKind::NotFound => Ok(Default::default()),
> +                Err(err) => Err(err.into()),
> +            }
> +        }
> +
> +        let path = self.base_path.join(STATE_FILENAME);
> +        do_read_state(&path).unwrap_or_else(|err| {
> +            log::error!("could not read state file: {err:#}");
> +            Default::default()
> +        })
> +    }
> +
> +    fn get_tasks_impl<'a>(
> +        &self,
> +        mode: GetTasks,
> +        lock: &'a TaskCacheLock,
> +    ) -> Result<TaskArchiveIterator<'a>, Error> {
> +        let journal_file = self.base_path.join(WAL_FILENAME);
> +        let active_path = self.base_path.join(ACTIVE_FILENAME);
> +
> +        match mode {
> +            GetTasks::All => {
> +                let mut archive_files = self.archive_files(lock)?;
> +                archive_files.reverse();
> +
> +                if active_path.exists() {
> +                    archive_files.push(ArchiveFile {
> +                        path: self.base_path.join(ACTIVE_FILENAME),
> +                        compressed: false,
> +                        starttime: 0,
> +                    });
> +                }
> +
> +                TaskArchiveIterator::new(Some(journal_file), archive_files, lock)
> +            }
> +            GetTasks::Active => {
> +                let mut archive_files = Vec::new();
> +
> +                if active_path.exists() {
> +                    archive_files.push(ArchiveFile {
> +                        path: self.base_path.join(ACTIVE_FILENAME),
> +                        compressed: false,
> +                        starttime: 0,
> +                    });
> +                }
> +
> +                TaskArchiveIterator::new(None, archive_files, lock)
> +            }
> +            #[cfg(test)]
> +            GetTasks::Archived => {
> +                let mut files = self.archive_files(lock)?;
> +                files.reverse();
> +
> +                TaskArchiveIterator::new(Some(journal_file), files, lock)
> +            }
> +        }
> +    }
> +
> +    /// Returns a list of existing archive files, together with their respective
> +    /// cut-off timestamp. The result is sorted ascending by cut-off timestamp (most recent one
> +    /// first).
> +    /// The task archive should be locked for reading when calling this function.
> +    fn archive_files(&self, _lock: &TaskCacheLock) -> Result<Vec<ArchiveFile>, Error> {
> +        let mut names = Vec::new();
> +
> +        for entry in std::fs::read_dir(&self.base_path)? {
> +            let entry = entry?;
> +
> +            let path = entry.path();
> +
> +            if let Some(file) = Self::parse_archive_filename(&path) {
> +                names.push(file);
> +            }
> +        }
> +
> +        names.sort_by_key(|e| -e.starttime);
> +
> +        Ok(names)
> +    }
> +
> +    fn parse_archive_filename(path: &Path) -> Option<ArchiveFile> {
> +        let filename = path.file_name()?.to_str()?;
> +        let filename = filename.strip_prefix(ARCHIVE_FILENAME_PREFIX)?;
> +
> +        if let Some(starttime) = filename.strip_suffix(ZSTD_EXTENSION_WITH_DOT) {
> +            let starttime: i64 = starttime.parse().ok()?;
> +
> +            Some(ArchiveFile {
> +                path: path.to_path_buf(),
> +                compressed: true,
> +                starttime,
> +            })
> +        } else {
> +            let starttime: i64 = filename.parse().ok()?;
> +
> +            Some(ArchiveFile {
> +                path: path.to_path_buf(),
> +                compressed: false,
> +                starttime,
> +            })
> +        }
> +    }
> +}
> +
> +/// Comparison function for sorting tasks.
> +/// The tasks are compared based on the task's start time, falling
> +/// back to the task's UPID as a secondary criterion in case the
> +/// start times are equal.
> +fn compare_tasks(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
> +    a.starttime
> +        .cmp(&b.starttime)
> +        .then_with(|| b.upid.to_string().cmp(&a.upid.to_string()))
> +}
> +
> +/// Comparison function for sorting tasks, reversed
> +/// The tasks are compared based on the task's start time, falling
> +/// back to the task's UPID as a secondary criterion in case the
> +/// start times are equal.
> +fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
> +    compare_tasks(a, b).reverse()

super minor thing, but wouldn't it also be simply possible to do
compare_tasks(b, a)

here ? it will *probably* compile to the same code anyway so no big deal

> +}
> +
> +/// Iterator over the task archive.
> +pub struct TaskArchiveIterator<'a> {
> +    inner: Box<dyn Iterator<Item = TaskCacheItem>>,
> +
> +    /// Lock for this archive. This contains the lock in case we
> +    /// need to keep the archive locked while iterating over it.
> +    _lock: &'a TaskCacheLock,
> +}
> +
> +impl<'a> TaskArchiveIterator<'a> {
> +    /// Create a new task archive iterator.
> +    ///
> +    /// `files` should be sorted with the most recent archive file *last*.
> +    fn new(
> +        journal: Option<PathBuf>,
> +        files: Vec<ArchiveFile>,
> +        lock: &'a TaskCacheLock,
> +    ) -> Result<Self, Error> {
> +        let inner = InnerTaskArchiveIterator::new(files)
> +            .filter_map(|res| match res {
> +                Ok(task) => Some(task),
> +                Err(err) => {
> +                    log::error!("could not read task from archive file: {err:#}");
> +                    None
> +                }
> +            })
> +            .peekable();
> +
> +        if let Some(journal) = journal {
> +            let journal_reader = Box::new(BufReader::new(File::open(journal)?));
> +            let journal_task_iterator = JournalIterator::new(journal_reader).peekable();
> +            let merge_task_iter = MergeTaskIterator::new(journal_task_iterator, inner);
> +
> +            Ok(Self {
> +                inner: Box::new(merge_task_iter),
> +                _lock: lock,
> +            })
> +        } else {
> +            Ok(Self {
> +                inner: Box::new(inner),
> +                _lock: lock,
> +            })
> +        }
> +    }
> +}
> +
> +impl Iterator for TaskArchiveIterator<'_> {
> +    type Item = TaskCacheItem;
> +
> +    fn next(&mut self) -> Option<Self::Item> {
> +        self.inner.next()
> +    }
> +}
> +
> +struct InnerTaskArchiveIterator {
> +    /// Archive files to read.
> +    files: Vec<ArchiveFile>,
> +    /// Archive iterator we are currently using, if any
> +    current: Option<ArchiveIterator>,
> +}
> +
> +impl InnerTaskArchiveIterator {
> +    /// Create a new task archive iterator.
> +    pub fn new(files: Vec<ArchiveFile>) -> Self {
> +        Self {
> +            files,
> +            current: None,
> +        }
> +    }
> +}
> +
> +impl Iterator for InnerTaskArchiveIterator {
> +    type Item = Result<TaskCacheItem, Error>;
> +
> +    fn next(&mut self) -> Option<Self::Item> {
> +        loop {
> +            match &mut self.current {
> +                Some(current) => {
> +                    let next = current.next();
> +                    if next.is_some() {
> +                        return next;
> +                    } else {
> +                        self.current = None;
> +                    }
> +                }
> +                None => 'inner: loop {
> +                    // Returns `None` if no more files are available, stopping iteration.
> +                    let next_file = self.files.pop()?;
> +
> +                    match next_file.iter() {
> +                        Ok(iter) => {
> +                            self.current = Some(iter);
> +                            break 'inner;
> +                        }
> +                        Err(err) => {
> +                            log::error!("could not create archive iterator while iteration over task archive files, skipping: {err:#}")
> +                        }
> +                    }
> +                },
> +            }
> +        }
> +    }
> +}
> +
> +/// Archive file.
> +#[derive(Clone, Debug)]
> +struct ArchiveFile {
> +    /// The path to the archive file.
> +    path: PathBuf,
> +    /// This archive file is compressed using zstd.
> +    compressed: bool,
> +    /// The archive's lowest permitted starttime (seconds since UNIX epoch).
> +    starttime: i64,
> +}
> +
> +impl ArchiveFile {
> +    /// Create an [`ArchiveIterator`] for this file.
> +    fn iter(&self) -> Result<ArchiveIterator, Error> {
> +        let fd = File::open(&self.path)
> +            .with_context(|| format!("failed to open archive file {}", self.path.display()))?;
> +
> +        let iter = if self.compressed {
> +            let reader = zstd::stream::read::Decoder::new(fd).with_context(|| {
> +                format!(
> +                    "failed to create zstd decoder for archive file {}",
> +                    self.path.display()
> +                )
> +            })?;
> +            ArchiveIterator::new(Box::new(BufReader::new(reader)))
> +        } else {
> +            ArchiveIterator::new(Box::new(BufReader::new(fd)))
> +        };
> +
> +        Ok(iter)
> +    }
> +
> +    fn compress(&mut self, options: CreateOptions) -> Result<(), Error> {
> +        let uncompressed_file_path = &self.path;
> +
> +        let (temp_file, temp_file_path) =
> +            proxmox_sys::fs::make_tmp_file(uncompressed_file_path, options)
> +                .context("failed to create temporary file")?;
> +
> +        let uncompressed_file =
> +            File::open(uncompressed_file_path).context("failed to open uncompressed file")?;
> +
> +        zstd::stream::copy_encode(
> +            uncompressed_file,
> +            temp_file,
> +            zstd::DEFAULT_COMPRESSION_LEVEL,
> +        )
> +        .context("zstd::stream::copy_encode failed")?;
> +
> +        let mut new_path_for_compressed = uncompressed_file_path.clone();
> +        new_path_for_compressed.set_extension(format!("{}.{ZSTD_EXTENSION}", self.starttime));
> +
> +        std::fs::rename(&temp_file_path, &new_path_for_compressed)
> +            .context("failed to move compressed task achive file")?;
> +        std::fs::remove_file(uncompressed_file_path)
> +            .context("failed to remove uncompressed archive file")?;
> +
> +        self.path = new_path_for_compressed;
> +        self.compressed = true;
> +
> +        Ok(())
> +    }
> +}
> +
> +/// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
> +/// from both iterators sorted.
> +/// The two iterators are expected to be sorted descendingly based on the task's starttime and
> +/// ascendingly based on the task's UPID's string representation. This can be
> +/// achieved by using the [`compare_tasks_reverse`] function when sorting an array of tasks.
> +struct MergeTaskIterator<T: Iterator<Item = TaskCacheItem>, U: Iterator<Item = TaskCacheItem>> {
> +    left: Peekable<T>,
> +    right: Peekable<U>,
> +}
> +
> +impl<T, U> MergeTaskIterator<T, U>
> +where
> +    T: Iterator<Item = TaskCacheItem>,
> +    U: Iterator<Item = TaskCacheItem>,
> +{
> +    /// Create a new merging iterator.
> +    fn new(left: Peekable<T>, right: Peekable<U>) -> Self {
> +        Self { left, right }
> +    }
> +}
> +
> +impl<T, U> Iterator for MergeTaskIterator<T, U>
> +where
> +    T: Iterator<Item = TaskCacheItem>,
> +    U: Iterator<Item = TaskCacheItem>,
> +{
> +    type Item = T::Item;
> +
> +    fn next(&mut self) -> Option<T::Item> {
> +        let order = match (self.left.peek(), self.right.peek()) {
> +            (Some(l), Some(r)) => Some(compare_tasks(l, r)),
> +            (Some(_), None) => Some(Ordering::Greater),
> +            (None, Some(_)) => Some(Ordering::Less),
> +            (None, None) => None,
> +        };
> +
> +        match order {
> +            Some(Ordering::Greater) => self.left.next(),
> +            Some(Ordering::Less) => self.right.next(),
> +            Some(Ordering::Equal) => {
> +                // Dedup by consuming the other iterator as well
> +                let _ = self.right.next();
> +                self.left.next()
> +            }
> +            None => None,
> +        }
> +    }
> +}
> +
> +/// Iterator for a single task archive file.
> +///
> +/// This iterator implements `Iterator<Item = Result<TaskCacheItem, Error>`. When iterating,
> +/// tasks are read line by line, without leading the entire archive file into memory.
> +struct ArchiveIterator {
> +    iter: Lines<Box<dyn BufRead>>,
> +}
> +
> +impl ArchiveIterator {
> +    /// Create a new iterator.
> +    pub fn new(reader: Box<dyn BufRead>) -> Self {
> +        let lines = reader.lines();
> +
> +        Self { iter: lines }
> +    }
> +}
> +
> +impl Iterator for ArchiveIterator {
> +    type Item = Result<TaskCacheItem, Error>;
> +
> +    fn next(&mut self) -> Option<Self::Item> {
> +        self.iter.next().map(|result| {
> +            result
> +                .and_then(|line| Ok(serde_json::from_str(&line)?))
> +                .map_err(Into::into)
> +        })
> +    }
> +}
> +
> +/// Iterator for journal files. This iterator uses [`ArchiveIterator`] internally, but will eagerly
> +/// load all tasks into memory to sort and deduplicate them.
> +struct JournalIterator {
> +    inner: Box<dyn Iterator<Item = TaskCacheItem>>,
> +}
> +
> +impl JournalIterator {
> +    fn new(file: Box<dyn BufRead>) -> Self {
> +        let iter = ArchiveIterator::new(file);
> +
> +        let mut tasks: Vec<TaskCacheItem> = iter
> +            .flat_map(|task| match task {
> +                Ok(task) => Some(task),
> +                Err(err) => {
> +                    log::error!("could not read task while iterating over archive file: {err:#}");
> +                    None
> +                }
> +            })
> +            .collect();
> +
> +        tasks.sort_by(compare_tasks_reverse);
> +        tasks.dedup();
> +
> +        Self {
> +            inner: Box::new(tasks.into_iter()),
> +        }
> +    }
> +}
> +
> +impl Iterator for JournalIterator {
> +    type Item = TaskCacheItem;
> +
> +    fn next(&mut self) -> Option<Self::Item> {
> +        self.inner.next()
> +    }
> +}
> +
> +#[cfg(test)]
> +mod tests {
> +    use std::io::Cursor;
> +
> +    use crate::test_support::temp::NamedTempDir;
> +
> +    use super::*;
> +
> +    #[test]
> +    fn archive_filename() {
> +        let a = TaskCache::parse_archive_filename(&PathBuf::from("/tmp/archive.10000")).unwrap();
> +
> +        assert_eq!(a.path, PathBuf::from("/tmp/archive.10000"));
> +        assert_eq!(a.starttime, 10000);
> +        assert!(!a.compressed);
> +
> +        let a = TaskCache::parse_archive_filename(&PathBuf::from("/tmp/archive.1234.zst")).unwrap();
> +
> +        assert_eq!(a.path, PathBuf::from("/tmp/archive.1234.zst"));
> +        assert_eq!(a.starttime, 1234);
> +        assert!(a.compressed);
> +    }
> +
> +    #[test]
> +    fn archive_iterator() -> Result<(), Error> {
> +        let file = r#"
> +            {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root at pam:","status":"OK","endtime":12345, "starttime": 1234}
> +            {"upid":"pbs-remote!UPID:pbs:000002B2:00000158:00000000:674D828C:logrotate::root at pam:","status":"OK","endtime":12345, "starttime": 1234}
> +            invalid"#
> +            .trim_start();
> +
> +        let cursor = Box::new(Cursor::new(file.as_bytes()));
> +        let mut iter = ArchiveIterator::new(cursor);
> +
> +        assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pve-remote");
> +        assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pbs-remote");
> +        assert!(iter.next().unwrap().is_err());
> +        assert!(iter.next().is_none());
> +
> +        Ok(())
> +    }
> +
> +    fn task(starttime: i64, ended: bool) -> TaskCacheItem {
> +        let (status, endtime) = if ended {
> +            (Some("OK".into()), Some(starttime + 10))
> +        } else {
> +            (None, None)
> +        };
> +
> +        TaskCacheItem {
> +            upid: format!(
> +                "pve-remote!UPID:pve:00039E4D:002638B8:{starttime:08X}:stopall::root at pam:"
> +            )
> +            .parse()
> +            .unwrap(),
> +            starttime,
> +            status,
> +            endtime,
> +        }
> +    }
> +
> +    fn assert_starttimes(cache: &WritableTaskCache, starttimes: &[i64]) {
> +        let tasks: Vec<i64> = cache
> +            .get_tasks(GetTasks::All)
> +            .unwrap()
> +            .map(|task| task.starttime)
> +            .collect();
> +
> +        assert_eq!(&tasks, starttimes);
> +    }
> +
> +    fn add_tasks(cache: &WritableTaskCache, tasks: Vec<TaskCacheItem>) -> Result<(), Error> {
> +        let mut node_map = NodeFetchSuccessMap::default();
> +        node_map.set_node_success("pve-remote".to_string(), "pve".to_string());
> +
> +        cache.update(tasks, &node_map, HashSet::new())
> +    }
> +
> +    const DEFAULT_MAX_SIZE: u64 = 10000;
> +
> +    #[test]
> +    fn test_add_tasks() -> Result<(), Error> {
> +        let tmp_dir = NamedTempDir::new()?;
> +        let cache = TaskCache::new(
> +            tmp_dir.path(),
> +            CreateOptions::new(),
> +            3,
> +            1,
> +            0,
> +            DEFAULT_MAX_SIZE,
> +        )
> +        .unwrap()
> +        .write()?;
> +
> +        cache.new_file(1000, false)?;
> +        assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 1);
> +
> +        add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
> +
> +        assert_eq!(
> +            cache.read_state().cutoff_timestamp("pve-remote", "pve"),
> +            Some(1001)
> +        );
> +
> +        cache.rotate(1500)?;
> +
> +        assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 2);
> +
> +        add_tasks(&cache, vec![task(1500, true), task(1501, true)])?;
> +        add_tasks(&cache, vec![task(1200, true), task(1300, true)])?;
> +
> +        assert_eq!(
> +            cache.read_state().cutoff_timestamp("pve-remote", "pve"),
> +            Some(1501),
> +        );
> +
> +        cache.rotate(2000)?;
> +        assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 3);
> +
> +        add_tasks(&cache, vec![task(2000, true)])?;
> +        add_tasks(&cache, vec![task(1502, true)])?;
> +        add_tasks(&cache, vec![task(1002, true)])?;
> +
> +        // These are before the cut-off of 1000, so they will be discarded.
> +        // add_tasks(&cache, vec![task(800, true), task(900, true)])?;
> +
> +        // This one should be deduped
> +        add_tasks(&cache, vec![task(1000, true)])?;
> +
> +        assert_starttimes(
> +            &cache,
> +            &[2000, 1502, 1501, 1500, 1300, 1200, 1002, 1001, 1000],
> +        );
> +
> +        cache.rotate(2500)?;
> +
> +        assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 3);
> +
> +        assert_starttimes(&cache, &[2000, 1502, 1501, 1500]);
> +
> +        cache.rotate(3000)?;
> +        assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 3);
> +
> +        assert_starttimes(&cache, &[2000]);
> +
> +        Ok(())
> +    }
> +
> +    #[test]
> +    fn test_active_tasks_are_migrated_to_archive() -> Result<(), Error> {
> +        let tmp_dir = NamedTempDir::new()?;
> +        let cache = TaskCache::new(
> +            tmp_dir.path(),
> +            CreateOptions::new(),
> +            3,
> +            1,
> +            0,
> +            DEFAULT_MAX_SIZE,
> +        )
> +        .unwrap()
> +        .write()?;
> +
> +        cache.new_file(1000, false)?;
> +        add_tasks(&cache, vec![task(1000, false), task(1001, false)])?;
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
> +
> +        add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
> +
> +        assert_starttimes(&cache, &[1001, 1000]);
> +
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
> +
> +        Ok(())
> +    }
> +
> +    #[test]
> +    fn test_init() -> Result<(), Error> {
> +        let tmp_dir = NamedTempDir::new()?;
> +        let cache = TaskCache::new(
> +            tmp_dir.path(),
> +            CreateOptions::new(),
> +            3,
> +            1,
> +            100,
> +            DEFAULT_MAX_SIZE,
> +        )
> +        .unwrap()
> +        .write()?;
> +
> +        cache.init(1000)?;
> +        assert_eq!(cache.cache.archive_files(&cache.lock)?.len(), 3);
> +
> +        add_tasks(
> +            &cache,
> +            vec![task(1050, true), task(950, true), task(850, true)],
> +        )?;
> +
> +        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 3);
> +
> +        Ok(())
> +    }
> +
> +    fn add_finished_tracked(cache: &WritableTaskCache, starttime: i64) -> Result<(), Error> {
> +        let t = task(starttime, true);
> +        let upid = t.upid.clone();
> +
> +        let mut node_map = NodeFetchSuccessMap::default();
> +        node_map.set_node_success("pve-remote".to_string(), "pve".to_string());
> +
> +        cache.update(vec![t], &node_map, HashSet::from_iter([upid]))
> +    }
> +
> +    #[test]
> +    fn test_tracking_tasks() -> Result<(), Error> {
> +        let tmp_dir = NamedTempDir::new()?;
> +        let cache = TaskCache::new(
> +            tmp_dir.path(),
> +            CreateOptions::new(),
> +            3,
> +            1,
> +            100,
> +            DEFAULT_MAX_SIZE,
> +        )
> +        .unwrap()
> +        .write()?;
> +
> +        cache.init(1000)?;
> +
> +        cache.add_tracked_task(task(1050, false))?;
> +
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
> +        cache.add_tracked_task(task(1060, false))?;
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
> +
> +        assert_eq!(cache.read_state().tracked_tasks().count(), 2);
> +
> +        // Mark first task as finished
> +        add_finished_tracked(&cache, 1050)?;
> +
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
> +        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 1);
> +        assert_eq!(cache.read_state().tracked_tasks().count(), 1);
> +
> +        // Mark second task as finished
> +        add_finished_tracked(&cache, 1060)?;
> +
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
> +        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 2);
> +        assert_eq!(cache.read_state().tracked_tasks().count(), 0);
> +
> +        Ok(())
> +    }
> +
> +    #[test]
> +    fn journal_is_applied_if_max_size_exceeded() -> Result<(), Error> {
> +        let tmp_dir = NamedTempDir::new()?;
> +
> +        // Should be *just* enough to fit a single task, which means that we apply the journal
> +        // after adding a second one.
> +        const ENOUGH_FOR_SINGLE_TASK: u64 = 200;
> +
> +        let cache = TaskCache::new(
> +            tmp_dir.path(),
> +            CreateOptions::new(),
> +            3,
> +            1,
> +            100,
> +            ENOUGH_FOR_SINGLE_TASK,
> +        )
> +        .unwrap()
> +        .write()?;
> +
> +        add_tasks(&cache, vec![task(1000, true)])?;
> +        assert!(cache.journal_size()? > 0);
> +
> +        add_tasks(&cache, vec![task(1000, true)])?;
> +
> +        assert_eq!(cache.journal_size()?, 0);
> +
> +        Ok(())
> +    }
> +}





More information about the pdm-devel mailing list