[pdm-devel] [PATCH proxmox-datacenter-manager v2 1/4] remote tasks: implement improved cache for remote tasks

Wolfgang Bumiller w.bumiller at proxmox.com
Wed Apr 16 15:15:55 CEST 2025


looks mostly fine, some minor things noted inline...

On Fri, Apr 11, 2025 at 01:01:14PM +0200, Lukas Wagner wrote:
> This commit adds a new implementation for a cache for remote tasks, one
> that should improve performance characteristics in for pretty much all
> use cases.
> 
> In general storage works pretty similar to the task archive we already
> have for (local) PDM tasks.
> 
> root at pdm-dev:/var/cache/proxmox-datacenter-manager/remote-tasks# ls -l
> total 40
> -rw-r--r-- 1 www-data www-data     0 Mar 13 13:18 active
> -rw-r--r-- 1 www-data www-data  1676 Mar 11 14:51 archive.1741355462
> -rw-r--r-- 1 www-data www-data     0 Mar 11 14:51 archive.1741441862
> -rw-r--r-- 1 www-data www-data  2538 Mar 11 14:51 archive.1741528262
> -rw-r--r-- 1 www-data www-data  8428 Mar 11 15:07 archive.1741614662
> -rw-r--r-- 1 www-data www-data 11740 Mar 13 10:18 archive.1741701062
> -rw-r--r-- 1 www-data www-data  3364 Mar 13 13:18 archive.1741788270
> -rw-r--r-- 1 www-data www-data   287 Mar 13 13:18 state
> 
> Tasks are stored in the 'active' and multiple 'archive' files.
> Running tasks are placed into the 'active' file, tasks that are finished
> are persisted into one of the archive files.
> The archive files are suffixed with a UNIX timestamp which serves
> as a lower-bound for start times for tasks stored in this file.
> Encoding this lower-bound in the file name instead of using a more
> traditional increasing file index (.1, .2, .3) gives us the benefit
> that we can decide in which file a newly arrived tasks belongs from
> a single readdir call, without even reading the file itself.
> 
> The size of the entire archive can be controlled by doing file
> 'rotation', where the archive file with the oldest timestamp is deleted
> and a new file with the current timestamp is added.
> If 'rotation' happens at fixed intervals (e.g. once daily), this gives
> us a configurable number of days of task history.
> There is no direct cap for the total number
> of tasks, but this could be easily added by checking the size of the
> most recent archive file and by rotating early if a threshold is
> exceeded.
> 
> The format inside the files is also similar to the local task archive,
> with one task corresponding to one line in the file.
> One key difference is that here each line is a JSON object, that is to
> make it easier to add additional data later, if needed.
> The JSON object contains the tasks UPID, status, endtime and starttime
> (the starttime is technically also encoded in the UPID, but having it as
> a separate value simplified a couple of things)
> Each file is sorted by the task's start time, the youngest task coming first.
> 
> One key difference between this task cache and the local task archive is
> that we need to handle tasks coming in out-of-order, e.g. if remotes
> were not reachable for a certain time. To maintain the ordering
> in the file, we have to merge the newly arrived tasks into the existing
> task file. This was implemented in a way that avoids reading the entire
> file into memory at once, exploiting the fact that the contents of the
> existing file are already sorted. This allows to do a
> zipper/merge-sort like approach (see MergeTaskIterator for
> details). The existing file is only read line-by-line and finally
> replaced atomically.
> 
> The cache also has a separate state file, containing additional
> information, e.g. cut-off timestamps for the polling task.
> Some of the data in the statefile is technically also contained in the
> archive files, but reading the state file is much faster.
> 
> This commit also adds an elaborate suite of unit tests for this new
> cache. While adding some additional work up front, they paid off themselves
> during development quite quickly, since the overall approach for the
> cache changed a couple of times. The test suite gave me confidence that
> the design changes didn't screw anything up, catching a couple of bugs
> in the process.
> 
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> Reviewed-by: Max Carrara <m.carrara at proxmox.com>
> ---
>  server/src/remote_tasks/mod.rs        |   2 +
>  server/src/remote_tasks/task_cache.rs | 964 ++++++++++++++++++++++++++
>  2 files changed, 966 insertions(+)
>  create mode 100644 server/src/remote_tasks/task_cache.rs
> 
> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
> index 7ded5408..2062f2b7 100644
> --- a/server/src/remote_tasks/mod.rs
> +++ b/server/src/remote_tasks/mod.rs
> @@ -18,6 +18,8 @@ use tokio::task::JoinHandle;
>  
>  use crate::{api::pve, task_utils};
>  
> +mod task_cache;
> +
>  /// Get tasks for all remotes
>  // FIXME: filter for privileges
>  pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
> diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
> new file mode 100644
> index 00000000..628a9a40
> --- /dev/null
> +++ b/server/src/remote_tasks/task_cache.rs
> @@ -0,0 +1,964 @@
> +//! Task cache implementation, based on rotating files.
> +
> +use std::{
> +    cmp::Ordering,
> +    collections::{HashMap, HashSet},
> +    fs::File,
> +    io::{BufRead, BufReader, BufWriter, Lines, Write},
> +    iter::Peekable,
> +    path::{Path, PathBuf},
> +    time::Duration,
> +};
> +
> +use anyhow::Error;
> +use proxmox_sys::fs::CreateOptions;
> +use serde::{Deserialize, Serialize};
> +
> +use pdm_api_types::RemoteUpid;
> +
> +/// Item which can be put into the task cache.
> +#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
> +pub struct TaskCacheItem {
> +    /// The task's UPID
> +    pub upid: RemoteUpid,
> +    /// The time at which the task was started (seconds since the UNIX epoch).
> +    /// Technically this is also contained within the UPID, duplicating it here
> +    /// allows us to directly access it when sorting in new tasks, without having
> +    /// to parse the UPID.
> +    pub starttime: i64,
> +    #[serde(skip_serializing_if = "Option::is_none")]
> +    /// The task's status.
> +    pub status: Option<String>,
> +    #[serde(skip_serializing_if = "Option::is_none")]
> +    /// The task's endtime (seconds since the UNIX epoch).
> +    pub endtime: Option<i64>,
> +}
> +
> +/// State needed for task polling.
> +#[derive(Serialize, Deserialize, Default)]
> +#[serde(rename_all = "kebab-case")]
> +pub struct State {
> +    /// Map of remote -> most recent task starttime (UNIX epoch) in the archive.
> +    /// This can be used as a cut-off when requesting new task data.
> +    pub most_recent_archive_starttime: HashMap<String, i64>,
> +    /// Oldest running task. Useful as another cut-off for fetching tasks.
> +    /// The timestamp is based on seconds since the UNIX epoch.
> +    pub oldest_active_task: HashMap<String, i64>,
> +    /// Tracked tasks per remote.
> +    pub tracked_tasks: HashMap<String, HashSet<RemoteUpid>>,
> +}
> +
> +/// Cache for remote tasks.
> +#[derive(Clone)]
> +pub struct TaskCache {
> +    /// Path where the cache's files should be placed.
> +    base_path: PathBuf,
> +    /// File permissions for the cache's files.
> +    create_options: CreateOptions,
> +}
> +
> +/// Tasks that should be added to the cache via [`TaskCache::add_tasks`].
> +pub struct AddTasks {
> +    /// Update most recent archived task in state file.
> +    pub update_most_recent_archive_timestamp: bool,
> +    /// The tasks to add.
> +    pub tasks: Vec<TaskCacheItem>,
> +}
> +
> +/// Lock for the cache.
> +#[allow(dead_code)]
> +pub struct TaskCacheLock(File);
> +
> +/// Which tasks to fetch from the archive.
> +pub enum GetTasks {
> +    /// Get all tasks, finished and running.
> +    All,
> +    /// Only get running (active) tasks.
> +    Active,
> +    /// Only get finished (archived) tasks.
> +    #[allow(dead_code)]
> +    Archived,
> +}
> +
> +impl TaskCache {
> +    /// Create a new task cache instance.
> +    ///
> +    /// Remember to call `init` or `new_file` to create initial archive files before
> +    /// adding any tasks.
> +    pub fn new<P: AsRef<Path>>(path: P, create_options: CreateOptions) -> Result<Self, Error> {
> +        Ok(Self {
> +            base_path: path.as_ref().into(),
> +            create_options,
> +        })
> +    }
> +
> +    /// Create initial task archives that can be backfilled with the
> +    /// recent task history from a remote.
> +    ///
> +    /// This function only has an effect if there are no archive files yet.
> +    pub fn init(&self, now: i64, number_of_files: u64, period_per_file: u64) -> Result<(), Error> {
> +        let _lock = self.lock(true)?;
> +
> +        if self.archive_files()?.is_empty() {
> +            for i in 0..number_of_files {
> +                self.new_file(now - (i * period_per_file) as i64)?;
> +            }
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Start a new archive file with a given timestamp.
> +    /// `now` is supposed to be a UNIX timestamp (seconds).
> +    fn new_file(&self, now: i64) -> Result<(), Error> {
> +        let new_path = self.base_path.join(format!("archive.{now}"));
> +        let mut file = File::create(&new_path)?;
> +        self.create_options.apply_to(&mut file, &new_path)?;
> +
> +        Ok(())
> +    }
> +
> +    /// Lock the cache.
> +    fn lock(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
> +        let lockfile = self.base_path.join(".lock");
> +
> +        let fd = proxmox_sys::fs::open_file_locked(
> +            lockfile,
> +            Duration::from_secs(15),
> +            exclusive,
> +            self.create_options.clone(),
> +        )?;
> +
> +        Ok(TaskCacheLock(fd))
> +    }
> +
> +    /// Rotate task archive if the the newest archive file is older than `rotate_after`.
> +    ///
> +    /// The oldest archive files are removed if the total number of archive files exceeds
> +    /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds).
> +    ///
> +    /// This function requests an exclusive lock, don't call if you already hold a lock.
> +    pub fn rotate(&self, now: i64, rotate_after: u64, max_files: u64) -> Result<bool, Error> {
> +        let _lock = self.lock(true)?;
> +        let mut did_rotate = false;
> +
> +        let mut bounds = self.archive_files()?;
> +
> +        match bounds.first() {
> +            Some(bound) => {
> +                if now > bound.starttime && now - bound.starttime > rotate_after as i64 {
> +                    self.new_file(now)?;
> +                    did_rotate = true;
> +                }
> +            }
> +            None => {
> +                self.new_file(now)?;
> +                did_rotate = true;
> +            }
> +        }
> +
> +        while bounds.len() >= max_files as usize {
> +            // Unwrap is safe because of the length check above
> +            let to_remove = bounds.pop().unwrap();
> +            std::fs::remove_file(&to_remove.file)?;
> +        }
> +
> +        Ok(did_rotate)
> +    }
> +
> +    /// Add new tasks to the task archive.
> +    ///
> +    /// Running tasks (tasks without an endtime) are placed into the 'active' file in the
> +    /// task cache base directory. Finished tasks are sorted into `archive.<startime>` archive
> +    /// files, where `<starttimes>` denotes the lowest permitted start time timestamp for a given
> +    /// archive file. If a task which was added as running previously is added again, this time in
> +    /// a finished state, it will be removed from the `active` file and also sorted into
> +    /// one of the archive files.
> +    /// Same goes for the list of tracked tasks; the entry in the state file will be removed.
> +    ///
> +    /// Crash consistency:
> +    ///
> +    /// The state file, which contains the cut-off timestamps for future task fetching, is updated at the
> +    /// end after all tasks have been added into the archive. Adding tasks is an idempotent
> +    /// operation; adding the *same* task multiple times does not lead to duplicated entries in the
> +    /// task archive. Individual archive files are updated atomically, but since
> +    /// adding tasks can involve updating multiple archive files, the archive could end up
> +    /// in a partially-updated, inconsistent state in case of a crash.
> +    /// However, since the state file with the cut-off timestamps is updated last,
> +    /// the consistency of the archive should be restored at the next update cycle of the archive.
> +    pub fn add_tasks(&self, mut added: HashMap<String, AddTasks>) -> Result<(), Error> {
> +        let lock = self.lock(true)?;
> +
> +        // Get a flat `Vec` of all new tasks
> +        let tasks: Vec<_> = added
> +            .iter_mut()
> +            .flat_map(|(_, tasks)| std::mem::take(&mut tasks.tasks))
> +            .collect();
> +
> +        let update_state_for_remote: HashMap<String, bool> = added
> +            .into_iter()
> +            .map(|(remote, add_tasks)| (remote, add_tasks.update_most_recent_archive_timestamp))
> +            .collect();
> +
> +        let mut task_iter = self.get_tasks_with_lock(GetTasks::Active, lock)?;
> +
> +        let mut active_tasks =
> +            HashSet::from_iter(task_iter.flat_map(|active_task| match active_task {
> +                Ok(task) => Some((task.upid, task.starttime)),
> +                Err(err) => {
> +                    log::error!("failed to read task cache entry from active file: {err}");
> +                    None
> +                }
> +            }));
> +
> +        // Consume the iterator to get back the lock. The lock is held
> +        // until _lock is finally dropped at the end of the function.
> +        let _lock = task_iter.into_lock();
> +
> +        active_tasks.extend(
> +            tasks
> +                .iter()
> +                .filter(|task| task.endtime.is_none())
> +                .map(|a| (a.upid.clone(), a.starttime)),
> +        );
> +
> +        let mut state = self.read_state();
> +        let mut new_finished_tasks = tasks
> +            .into_iter()
> +            .filter(|task| task.endtime.is_some())
> +            .collect::<Vec<_>>();
> +
> +        new_finished_tasks.sort_by(compare_tasks_reverse);
> +        self.merge_tasks_into_archive(
> +            new_finished_tasks,
> +            &mut active_tasks,
> +            update_state_for_remote,
> +            &mut state,
> +        )?;
> +        self.update_oldest_active(&active_tasks, &mut state);
> +
> +        let mut active: Vec<TaskCacheItem> = active_tasks
> +            .into_iter()
> +            .map(|(upid, starttime)| TaskCacheItem {
> +                upid,
> +                starttime,
> +                status: None,
> +                endtime: None,
> +            })
> +            .collect();
> +
> +        active.sort_by(compare_tasks_reverse);
> +        self.write_active_tasks(active.into_iter())?;
> +        self.write_state(state)?;
> +
> +        Ok(())
> +    }
> +
> +    /// Update the timestamp of the oldest running task in `state`.
> +    fn update_oldest_active(&self, active_tasks: &HashSet<(RemoteUpid, i64)>, state: &mut State) {
> +        // Update the state with timestamp of the oldest running task,
> +        // we also use this as cut-off when fetching tasks, so we
> +        // for sure know when a task finishes.
> +        let mut oldest_active_task_per_remote = HashMap::new();
> +        for (upid, startime) in active_tasks.iter() {
> +            oldest_active_task_per_remote
> +                .entry(upid.remote().to_owned())
> +                .and_modify(|time| *time = (*startime).min(*time))
> +                .or_insert(*startime);
> +        }
> +        state.oldest_active_task = oldest_active_task_per_remote;
> +    }
> +
> +    /// Merge a list of *finished* tasks into the remote task archive files.
> +    /// The list of task in `tasks` *must* be sorted by their timestamp and UPID (descending by
> +    /// timestamp, ascending by UPID).
> +    ///
> +    /// The task archive should be locked when calling this.
> +    fn merge_tasks_into_archive(
> +        &self,
> +        tasks: Vec<TaskCacheItem>,
> +        active_tasks: &mut HashSet<(RemoteUpid, i64)>,
> +        update_state_for_remote: HashMap<String, bool>,
> +        state: &mut State,
> +    ) -> Result<(), Error> {
> +        debug_assert!(tasks
> +            .iter()
> +            .is_sorted_by(|a, b| compare_tasks(a, b).is_ge()));
> +
> +        let files = self.archive_files()?;
> +
> +        let mut files = files.iter().peekable();
> +
> +        let mut current = files.next();
> +        let mut next = files.peek();
> +
> +        let mut tasks_for_current_file = Vec::new();
> +
> +        // Tasks are sorted youngest to oldest (biggest starttime first)
> +        for task in tasks {
> +            active_tasks.remove(&(task.upid.clone(), task.starttime));

^ As mention off list, rust nowadays actually understands temporary
partial moves, so we could avoid the clone with:

    let upid_starttime = (task.upid, task.starttime);
    active_tasks.remove(&upid_starttime);
    task.upid = upid_starttime.0;

> +
> +            if let Some(tracked_tasks) = state.tracked_tasks.get_mut(task.upid.remote()) {
> +                tracked_tasks.remove(&task.upid);
> +            }
> +
> +            if let Some(true) = update_state_for_remote.get(task.upid.remote()) {
> +                // Update the most recent startime per remote, the task polling logic uses it as a
> +                // cut-off.
> +                state
> +                    .most_recent_archive_starttime
> +                    .entry(task.upid.remote().to_owned())
> +                    .and_modify(|time| *time = (task.starttime).max(*time))
> +                    .or_insert(task.starttime);
> +            }
> +
> +            // Skip ahead until we have found the correct file.
> +            while next.is_some() {
> +                if let Some(current) = current {
> +                    if task.starttime >= current.starttime {
> +                        break;
> +                    }
> +                    // The next entry's cut-off is larger then the task's start time, that means
> +                    // we want to finalized the current file by merging all tasks that
> +                    // should be stored in it...
> +                    self.merge_single_archive_file(
> +                        std::mem::take(&mut tasks_for_current_file),
> +                        &current.file,
> +                    )?;
> +                }
> +
> +                // ... and the the `current` file to the next entry.
> +                current = files.next();
> +                next = files.peek();
> +            }
> +
> +            if let Some(current) = current {
> +                if task.starttime < current.starttime {
> +                    continue;
> +                }
> +            }
> +            tasks_for_current_file.push(task);
> +        }
> +
> +        // Merge tasks for the last file.
> +        if let Some(current) = current {
> +            self.merge_single_archive_file(tasks_for_current_file, &current.file)?;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Add a new tracked task.
> +    ///
> +    /// This will insert the task in the list of tracked tasks in the state file,
> +    /// as well as create an entry in the `active` file.
> +    ///
> +    /// This function will request an exclusive lock for the task cache,
> +    /// do not call if you are already holding a lock.
> +    pub fn add_tracked_task(&self, task: TaskCacheItem) -> Result<(), Error> {
> +        let lock = self.lock(true)?;
> +
> +        let mut tasks = Vec::new();
> +        let mut task_iter = self.get_tasks_with_lock(GetTasks::Active, lock)?;
> +
> +        for task in &mut task_iter {
> +            match task {
> +                Ok(task) => tasks.push(task),
> +                Err(err) => {
> +                    log::error!(
> +                        "could not read existing task cache entry from 'active' file: {err}"
> +                    );
> +                }
> +            }
> +        }
> +
> +        tasks.push(task.clone());
> +        tasks.sort_by(compare_tasks_reverse);
> +
> +        let _lock = task_iter.into_lock();
> +
> +        let mut state = self.read_state();
> +
> +        state
> +            .oldest_active_task
> +            .entry(task.upid.remote().to_owned())
> +            .and_modify(|a| *a = (*a).min(task.starttime))
> +            .or_insert(task.starttime);
> +
> +        let tracked_per_remote = state
> +            .tracked_tasks
> +            .entry(task.upid.remote().to_owned())
> +            .or_default();
> +        tracked_per_remote.insert(task.upid);
> +
> +        self.write_active_tasks(tasks.into_iter())?;
> +        self.write_state(state)?;
> +
> +        Ok(())
> +    }
> +
> +    /// Iterate over cached tasks.
> +    ///
> +    /// This function will request a non-exclusive read-lock, don't call if
> +    /// you already hold a lock for this cache. See [`Self::get_tasks_with_lock`].
> +    pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator, Error> {
> +        let lock = self.lock(false)?;
> +        self.get_tasks_with_lock(mode, lock)
> +    }
> +
> +    /// Iterate over cached tasks.
> +    ///
> +    /// This function requires you to pass a lock. If you want to continue to hold the lock
> +    /// after iterating, you can consume the iterator by calling
> +    /// [`TaskArchiveIterator::into_lock`], yielding the original lock.
> +    pub fn get_tasks_with_lock(
> +        &self,
> +        mode: GetTasks,
> +        lock: TaskCacheLock,
> +    ) -> Result<TaskArchiveIterator, Error> {
> +        match mode {
> +            GetTasks::All => {
> +                let mut files = vec![ArchiveFile {
> +                    starttime: 0,

^ this is immediately thrown out

> +                    file: self.base_path.join("active"),
> +                }];
> +
> +                let archive_files = self.archive_files()?;
> +                files.extend(archive_files);

^ you could do the `.map` here in the extend call instead, that would
shorten the entire match arm to 3 lines.

> +
> +                Ok(TaskArchiveIterator::new(
> +                    Box::new(files.into_iter().map(|pair| pair.file)),
> +                    lock,
> +                ))
> +            }
> +            GetTasks::Active => {
> +                let files = vec![ArchiveFile {
> +                    starttime: 0,

^ Same here, here we don't even have a reason :)

> +                    file: self.base_path.join("active"),
> +                }];
> +                Ok(TaskArchiveIterator::new(
> +                    Box::new(files.into_iter().map(|pair| pair.file)),
> +                    lock,
> +                ))
> +            }
> +            GetTasks::Archived => {
> +                let files = self.archive_files()?;
> +
> +                Ok(TaskArchiveIterator::new(
> +                    Box::new(files.into_iter().map(|pair| pair.file)),
> +                    lock,
> +                ))
> +            }
> +        }
> +    }
> +
> +    /// Write the provided tasks to the 'active' file.
> +    ///
> +    /// The tasks are first written to a temporary file, which is then used
> +    /// to atomically replace the original.
> +    fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
> +        let (fd, path) = proxmox_sys::fs::make_tmp_file(
> +            self.base_path.join("active"),
> +            self.create_options.clone(),
> +        )?;
> +        let mut fd = BufWriter::new(fd);
> +
> +        Self::write_tasks(&mut fd, tasks)?;
> +
> +        if let Err(err) = fd.flush() {
> +            log::error!("could not flush 'active' file: {err}");
> +        }
> +        drop(fd);
> +
> +        std::fs::rename(path, self.base_path.join("active"))?;

^ if the rename fails we should `unlink(&path)` (and if *that* fails
just log its error)

At some point we need a *stateful* tempfile helper which also gets the
destination path and does the whole rename-or-unlink-log-error dance in
a `.commit()`...

> +
> +        Ok(())
> +    }
> +
> +    /// Read the state file.
> +    /// If the state file could not be read or does not exist, the default (empty) state
> +    /// is returned.
> +    /// A lock is only necessary if the returned state is modified and later passed
> +    /// to [`Self::write_state`]. In case of a read-only access no lock is necessary, as
> +    /// the state file is always replaced atomically.
> +    pub fn read_state(&self) -> State {
> +        fn do_read_state(path: &Path) -> Result<State, Error> {
> +            let data = proxmox_sys::fs::file_read_optional_string(path)?;

We could also skip the utf-8 check and use `serde_json::from_slice()`
instead...
(since we were concerned about performance?)

> +            match data {
> +                Some(data) => Ok(serde_json::from_str(&data)?),
> +                None => Ok(Default::default()),
> +            }
> +        }
> +
> +        let path = self.base_path.join("state");
> +
> +        do_read_state(&path).unwrap_or_else(|err| {
> +            log::error!("could not read state file: {err}");
> +            Default::default()
> +        })
> +    }
> +
> +    /// Write the state file.
> +    /// The task archive should be locked for writing when calling this function.
> +    pub fn write_state(&self, state: State) -> Result<(), Error> {
> +        let path = self.base_path.join("state");
> +
> +        let data = serde_json::to_vec_pretty(&state)?;
> +
> +        proxmox_sys::fs::replace_file(path, &data, self.create_options.clone(), true)?;
> +
> +        Ok(())
> +    }
> +
> +    /// Returns a list of existing archive files, together with their respective
> +    /// cut-off timestamp. The result is sorted ascending by cut-off timestamp (most recent one
> +    /// first).
> +    /// The task archive should be locked for reading when calling this function.
> +    fn archive_files(&self) -> Result<Vec<ArchiveFile>, Error> {
> +        let mut names = Vec::new();
> +
> +        for entry in std::fs::read_dir(&self.base_path)? {
> +            let entry = entry?;
> +
> +            if let Some(endtime) = entry
> +                .path()
> +                .file_name()
> +                .and_then(|s| s.to_str())
> +                .and_then(|s| s.strip_prefix("archive."))
> +            {
> +                match endtime.parse() {
> +                    Ok(starttime) => {
> +                        names.push(ArchiveFile {
> +                            starttime,
> +                            file: entry.path(),
> +                        });
> +                    }
> +                    Err(err) => log::error!("could not parse archive timestamp: {err}"),
> +                }
> +            }
> +        }
> +
> +        names.sort_by_key(|e| -e.starttime);
> +
> +        Ok(names)
> +    }
> +
> +    /// Merge `tasks` with an existing archive file.
> +    /// This function assumes that `tasks` and the pre-existing contents of the archive
> +    /// file are both sorted descending by startime (most recent tasks come first).
> +    /// The task archive must be locked when calling this function.
> +    fn merge_single_archive_file(
> +        &self,
> +        tasks: Vec<TaskCacheItem>,
> +        file: &Path,
> +    ) -> Result<(), Error> {
> +        if tasks.is_empty() {
> +            return Ok(());
> +        }
> +
> +        let (fd, new_path) = proxmox_sys::fs::make_tmp_file(file, self.create_options.clone())?;
> +        let mut fd = BufWriter::new(fd);
> +
> +        if file.exists() {

^ Bad pattern.
Use `File::open` directly and deal with `err.kind() == NotFound` instead.

> +            let archive_reader = BufReader::new(File::open(file)?);
> +            let archive_iter = ArchiveIterator::new(archive_reader)
> +                .flat_map(|item| match item {
> +                    Ok(item) => Some(item),
> +                    Err(err) => {
> +                        log::error!("could not read task cache item while merging: {err}");
> +                        None
> +                    }
> +                })
> +                .peekable();
> +            let task_iter = tasks.into_iter().peekable();
> +
> +            Self::write_tasks(&mut fd, MergeTaskIterator::new(archive_iter, task_iter))?;
> +        } else {
> +            Self::write_tasks(&mut fd, tasks.into_iter())?;
> +        }
> +
> +        if let Err(err) = fd.flush() {
> +            log::error!("could not flush BufWriter for {file:?}: {err}");
> +        }
> +        drop(fd);
> +
> +        if let Err(err) = std::fs::rename(&new_path, file) {
> +            log::error!("could not replace archive file {new_path:?}: {err}");

^ as above, should also attempt to unlink(new_path).

> +        }
> +
> +        Ok(())
> +    }
> +
> +    /// Write an iterator of [`TaskCacheItem`] to a something that implements [`Write`].
> +    /// The individual items are encoded as JSON followed by a newline.
> +    /// The task archive should be locked when calling this function.
> +    fn write_tasks(
> +        writer: &mut impl Write,
> +        tasks: impl Iterator<Item = TaskCacheItem>,
> +    ) -> Result<(), Error> {
> +        for task in tasks {
> +            serde_json::to_writer(&mut *writer, &task)?;
> +            writeln!(writer)?;
> +        }
> +
> +        Ok(())
> +    }
> +}
> +
> +/// Comparison function for sorting tasks.
> +/// The tasks are compared based on the task's start time, falling
> +/// back to the task's UPID as a secondary criterion in case the
> +/// start times are equal.
> +pub fn compare_tasks(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
> +    a.starttime
> +        .cmp(&b.starttime)
> +        .then_with(|| b.upid.to_string().cmp(&a.upid.to_string()))
> +}
> +
> +/// Comparison function for sorting tasks, reversed
> +/// The tasks are compared based on the task's start time, falling
> +/// back to the task's UPID as a secondary criterion in case the
> +/// start times are equal.
> +pub fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
> +    compare_tasks(a, b).reverse()
> +}
> +
> +/// Iterator over the task archive.
> +pub struct TaskArchiveIterator {
> +    /// Archive files to read.
> +    files: Box<dyn Iterator<Item = PathBuf>>,
> +    /// Archive iterator we are currently using, if any
> +    current: Option<ArchiveIterator<BufReader<File>>>,
> +    /// Lock for this archive.
> +    lock: TaskCacheLock,
> +}
> +
> +impl TaskArchiveIterator {
> +    /// Create a new task archive iterator.
> +    pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: TaskCacheLock) -> Self {
> +        Self {
> +            files,
> +            current: None,
> +            lock,
> +        }
> +    }
> +
> +    /// Return the task archive lock, consuming `self`.
> +    pub fn into_lock(self) -> TaskCacheLock {
> +        self.lock
> +    }
> +}
> +
> +impl Iterator for &mut TaskArchiveIterator {
> +    type Item = Result<TaskCacheItem, Error>;
> +
> +    fn next(&mut self) -> Option<Self::Item> {
> +        loop {
> +            match &mut self.current {
> +                Some(current) => {
> +                    let next = current.next();
> +                    if next.is_some() {
> +                        return next;
> +                    } else {
> +                        self.current = None;
> +                    }
> +                }
> +                None => 'inner: loop {
> +                    // Returns `None` if no more files are available, stopping iteration.
> +                    let next_file = self.files.next()?;
> +
> +                    match File::open(&next_file) {
> +                        Ok(file) => {
> +                            let archive_reader = BufReader::new(file);
> +                            let archive_iter = ArchiveIterator::new(archive_reader);
> +                            self.current = Some(archive_iter);
> +                            break 'inner;
> +                        }
> +                        Err(err) => {
> +                            log::error!("could not open {next_file:?} while iteration over task archive files, skipping: {err}")
> +                        }
> +                    }
> +                },
> +            }
> +        }
> +    }
> +}
> +
> +/// Archive file.
> +#[derive(Clone, Debug)]
> +struct ArchiveFile {
> +    /// The path to the archive file.
> +    file: PathBuf,
> +    /// The archive's lowest permitted starttime (seconds since UNIX epoch).
> +    starttime: i64,
> +}
> +
> +/// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
> +/// from both iterators sorted.
> +/// The two iterators are expected to be sorted descendingly based on the task's starttime and
> +/// ascendingly based on the task's UPID's string representation. This can be
> +/// achieved by using the [`compare_tasks_reverse`] function when sorting an array of tasks.
> +pub struct MergeTaskIterator<T: Iterator<Item = TaskCacheItem>, U: Iterator<Item = TaskCacheItem>> {
> +    left: Peekable<T>,
> +    right: Peekable<U>,
> +}
> +
> +impl<T, U> MergeTaskIterator<T, U>
> +where
> +    T: Iterator<Item = TaskCacheItem>,
> +    U: Iterator<Item = TaskCacheItem>,
> +{
> +    /// Create a new merging iterator.
> +    pub fn new(left: Peekable<T>, right: Peekable<U>) -> Self {
> +        Self { left, right }
> +    }
> +}
> +
> +impl<T, U> Iterator for MergeTaskIterator<T, U>
> +where
> +    T: Iterator<Item = TaskCacheItem>,
> +    U: Iterator<Item = TaskCacheItem>,
> +{
> +    type Item = T::Item;
> +
> +    fn next(&mut self) -> Option<T::Item> {
> +        let order = match (self.left.peek(), self.right.peek()) {
> +            (Some(l), Some(r)) => Some(compare_tasks(l, r)),
> +            (Some(_), None) => Some(Ordering::Greater),
> +            (None, Some(_)) => Some(Ordering::Less),
> +            (None, None) => None,
> +        };
> +
> +        match order {
> +            Some(Ordering::Greater) => self.left.next(),
> +            Some(Ordering::Less) => self.right.next(),
> +            Some(Ordering::Equal) => {
> +                // Dedup by consuming the other iterator as well
> +                let _ = self.right.next();
> +                self.left.next()
> +            }
> +            None => None,
> +        }
> +    }
> +}
> +
> +/// Iterator for a single task archive file.
> +///
> +/// This iterator implements `Iterator<Item = Result<TaskCacheItem, Error>`. When iterating,
> +/// tasks are read line by line, without leading the entire archive file into memory.
> +pub struct ArchiveIterator<T> {
> +    iter: Lines<T>,
> +}
> +
> +impl<T: BufRead> ArchiveIterator<T> {
> +    /// Create a new iterator.
> +    pub fn new(file: T) -> Self {
> +        let reader = file.lines();
> +
> +        Self { iter: reader }
> +    }
> +}
> +
> +impl<T: BufRead> Iterator for ArchiveIterator<T> {
> +    type Item = Result<TaskCacheItem, Error>;
> +
> +    fn next(&mut self) -> Option<Self::Item> {
> +        self.iter.next().map(|result| {
> +            result
> +                .and_then(|line| Ok(serde_json::from_str(&line)?))
> +                .map_err(Into::into)
> +        })
> +    }
> +}
> +
> +#[cfg(test)]
> +mod tests {
> +    use std::io::Cursor;
> +
> +    use crate::test_support::temp::NamedTempDir;
> +
> +    use super::*;
> +
> +    #[test]
> +    fn archive_iterator() -> Result<(), Error> {
> +        let file = r#"
> +            {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root at pam:","status":"OK","endtime":12345, "starttime": 1234}
> +            {"upid":"pbs-remote!UPID:pbs:000002B2:00000158:00000000:674D828C:logrotate::root at pam:","status":"OK","endtime":12345, "starttime": 1234}
> +            invalid"#
> +            .trim_start();
> +
> +        let cursor = Cursor::new(file.as_bytes());
> +        let mut iter = ArchiveIterator::new(cursor);
> +
> +        assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pve-remote");
> +        assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pbs-remote");
> +        assert!(iter.next().unwrap().is_err());
> +        assert!(iter.next().is_none());
> +
> +        Ok(())
> +    }
> +
> +    fn task(starttime: i64, ended: bool) -> TaskCacheItem {
> +        let (status, endtime) = if ended {
> +            (Some("OK".into()), Some(starttime + 10))
> +        } else {
> +            (None, None)
> +        };
> +
> +        TaskCacheItem {
> +            upid: format!(
> +                "pve-remote!UPID:pve:00039E4D:002638B8:{starttime:08X}:stopall::root at pam:"
> +            )
> +            .parse()
> +            .unwrap(),
> +            starttime,
> +            status,
> +            endtime,
> +        }
> +    }
> +
> +    fn assert_starttimes(cache: &TaskCache, starttimes: &[i64]) {
> +        let tasks: Vec<i64> = cache
> +            .get_tasks(GetTasks::All)
> +            .unwrap()
> +            .map(|task| task.unwrap().starttime)
> +            .collect();
> +
> +        assert_eq!(&tasks, starttimes);
> +    }
> +
> +    fn add_tasks(cache: &TaskCache, tasks: Vec<TaskCacheItem>) -> Result<(), Error> {
> +        let param = AddTasks {
> +            update_most_recent_archive_timestamp: true,
> +            tasks,
> +        };
> +        let mut a = HashMap::new();
> +        a.insert("pve-remote".into(), param);
> +
> +        cache.add_tasks(a)
> +    }
> +
> +    #[test]
> +    fn test_add_tasks() -> Result<(), Error> {
> +        let tmp_dir = NamedTempDir::new()?;
> +        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
> +
> +        cache.new_file(1000)?;
> +        assert_eq!(cache.archive_files()?.len(), 1);
> +
> +        add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
> +
> +        cache.rotate(1500, 0, 3)?;
> +        assert_eq!(cache.archive_files()?.len(), 2);
> +
> +        add_tasks(&cache, vec![task(1500, true), task(1501, true)])?;
> +        add_tasks(&cache, vec![task(1200, true), task(1300, true)])?;
> +
> +        cache.rotate(2000, 0, 3)?;
> +        assert_eq!(cache.archive_files()?.len(), 3);
> +
> +        add_tasks(&cache, vec![task(2000, true)])?;
> +        add_tasks(&cache, vec![task(1502, true)])?;
> +        add_tasks(&cache, vec![task(1002, true)])?;
> +
> +        // These are before the cut-off of 1000, so they will be discarded.
> +        add_tasks(&cache, vec![task(800, true), task(900, true)])?;
> +
> +        // This one should be deduped
> +        add_tasks(&cache, vec![task(1000, true)])?;
> +
> +        assert_starttimes(
> +            &cache,
> +            &[2000, 1502, 1501, 1500, 1300, 1200, 1002, 1001, 1000],
> +        );
> +
> +        cache.rotate(2500, 0, 3)?;
> +
> +        assert_eq!(cache.archive_files()?.len(), 3);
> +
> +        assert_starttimes(&cache, &[2000, 1502, 1501, 1500]);
> +
> +        cache.rotate(3000, 0, 3)?;
> +        assert_eq!(cache.archive_files()?.len(), 3);
> +
> +        assert_starttimes(&cache, &[2000]);
> +
> +        Ok(())
> +    }
> +
> +    #[test]
> +    fn test_active_tasks_are_migrated_to_archive() -> Result<(), Error> {
> +        let tmp_dir = NamedTempDir::new()?;
> +        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
> +
> +        cache.new_file(1000)?;
> +        add_tasks(&cache, vec![task(1000, false), task(1001, false)])?;
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
> +
> +        let state = cache.read_state();
> +        assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1000);
> +
> +        add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
> +
> +        assert_starttimes(&cache, &[1001, 1000]);
> +
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
> +
> +        Ok(())
> +    }
> +
> +    #[test]
> +    fn test_init() -> Result<(), Error> {
> +        let tmp_dir = NamedTempDir::new()?;
> +        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
> +
> +        cache.init(1000, 3, 100)?;
> +        assert_eq!(cache.archive_files()?.len(), 3);
> +
> +        add_tasks(
> +            &cache,
> +            vec![
> +                task(1050, true),
> +                task(950, true),
> +                task(850, true),
> +                task(750, true), // This one is discarded
> +            ],
> +        )?;
> +
> +        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 3);
> +
> +        Ok(())
> +    }
> +
> +    #[test]
> +    fn test_tracking_tasks() -> Result<(), Error> {
> +        let tmp_dir = NamedTempDir::new()?;
> +        let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
> +
> +        cache.init(1000, 3, 100)?;
> +
> +        cache.add_tracked_task(task(1050, false))?;
> +
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
> +        cache.add_tracked_task(task(1060, false))?;
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
> +
> +        let state = cache.read_state();
> +        assert_eq!(state.tracked_tasks.get("pve-remote").unwrap().len(), 2);
> +        assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1050);
> +
> +        // Mark first task as finished
> +        add_tasks(&cache, vec![task(1050, true)])?;
> +
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
> +        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 1);
> +
> +        // Mark second task as finished
> +        add_tasks(&cache, vec![task(1060, true)])?;
> +
> +        assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
> +        assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 2);
> +
> +        Ok(())
> +    }
> +}
> -- 
> 2.39.5




More information about the pdm-devel mailing list