[pdm-devel] [PATCH proxmox-datacenter-manager v2 1/4] remote tasks: implement improved cache for remote tasks
Wolfgang Bumiller
w.bumiller@proxmox.com
Wed Apr 16 15:15:55 CEST 2025
looks mostly fine, some minor things noted inline...
On Fri, Apr 11, 2025 at 01:01:14PM +0200, Lukas Wagner wrote:
> This commit adds a new implementation for a cache for remote tasks, one
> that should improve performance characteristics for pretty much all
> use cases.
>
> In general, storage works pretty similarly to the task archive we already
> have for (local) PDM tasks.
>
> root at pdm-dev:/var/cache/proxmox-datacenter-manager/remote-tasks# ls -l
> total 40
> -rw-r--r-- 1 www-data www-data 0 Mar 13 13:18 active
> -rw-r--r-- 1 www-data www-data 1676 Mar 11 14:51 archive.1741355462
> -rw-r--r-- 1 www-data www-data 0 Mar 11 14:51 archive.1741441862
> -rw-r--r-- 1 www-data www-data 2538 Mar 11 14:51 archive.1741528262
> -rw-r--r-- 1 www-data www-data 8428 Mar 11 15:07 archive.1741614662
> -rw-r--r-- 1 www-data www-data 11740 Mar 13 10:18 archive.1741701062
> -rw-r--r-- 1 www-data www-data 3364 Mar 13 13:18 archive.1741788270
> -rw-r--r-- 1 www-data www-data 287 Mar 13 13:18 state
>
> Tasks are stored in the 'active' and multiple 'archive' files.
> Running tasks are placed into the 'active' file, tasks that are finished
> are persisted into one of the archive files.
> The archive files are suffixed with a UNIX timestamp which serves
> as a lower-bound for start times for tasks stored in this file.
> Encoding this lower-bound in the file name instead of using a more
> traditional increasing file index (.1, .2, .3) gives us the benefit
> that we can decide in which file a newly arrived task belongs from
> a single readdir call, without even reading the file itself.
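(as a reader aid: with the sorted cut-off list that `archive_files()`
below returns, picking the destination file is just a "first cut-off
<= starttime" scan; untested sketch, not part of the patch:

    // `cutoffs` sorted descending (most recent first); returns the
    // cut-off of the file the task belongs into, or None if the task
    // is older than the oldest archive and gets discarded.
    fn pick_archive(cutoffs: &[i64], starttime: i64) -> Option<i64> {
        cutoffs.iter().copied().find(|&cutoff| starttime >= cutoff)
    }

)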
>
> The size of the entire archive can be controlled by doing file
> 'rotation', where the archive file with the oldest timestamp is deleted
> and a new file with the current timestamp is added.
> If 'rotation' happens at fixed intervals (e.g. once daily), this gives
> us a configurable number of days of task history.
> There is no direct cap for the total number
> of tasks, but this could be easily added by checking the size of the
> most recent archive file and by rotating early if a threshold is
> exceeded.
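(for scale: rotating once a day, i.e. `rotate_after = 86400`, with
`max_files = 31` keeps roughly a month of task history)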
>
> The format inside the files is also similar to the local task archive,
> with one task corresponding to one line in the file.
> One key difference is that here each line is a JSON object, which makes
> it easier to add additional data later, if needed.
> The JSON object contains the task's UPID, status, endtime and starttime
> (the starttime is technically also encoded in the UPID, but having it as
> a separate value simplifies a couple of things).
> Each file is sorted by the tasks' start times, with the youngest task coming first.
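For reference, a line in an archive file then looks like the entries
used in the tests further down, e.g.:

    {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root@pam:","status":"OK","endtime":12345,"starttime":1234}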
>
> One key difference between this task cache and the local task archive is
> that we need to handle tasks coming in out-of-order, e.g. if remotes
> were not reachable for a certain time. To maintain the ordering
> in the file, we have to merge the newly arrived tasks into the existing
> task file. This was implemented in a way that avoids reading the entire
> file into memory at once, exploiting the fact that the contents of the
> existing file are already sorted. This allows us to use a
> zipper/merge-sort-like approach (see MergeTaskIterator for
> details). The existing file is only read line-by-line and finally
> replaced atomically.
>
> The cache also has a separate state file, containing additional
> information, e.g. cut-off timestamps for the polling task.
> Some of the data in the statefile is technically also contained in the
> archive files, but reading the state file is much faster.
>
> This commit also adds an elaborate suite of unit tests for this new
> cache. While they added some additional work up front, they paid for
> themselves quite quickly during development, since the overall approach for the
> cache changed a couple of times. The test suite gave me confidence that
> the design changes didn't screw anything up, catching a couple of bugs
> in the process.
>
> Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
> Reviewed-by: Max Carrara <m.carrara@proxmox.com>
> ---
> server/src/remote_tasks/mod.rs | 2 +
> server/src/remote_tasks/task_cache.rs | 964 ++++++++++++++++++++++++++
> 2 files changed, 966 insertions(+)
> create mode 100644 server/src/remote_tasks/task_cache.rs
>
> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
> index 7ded5408..2062f2b7 100644
> --- a/server/src/remote_tasks/mod.rs
> +++ b/server/src/remote_tasks/mod.rs
> @@ -18,6 +18,8 @@ use tokio::task::JoinHandle;
>
> use crate::{api::pve, task_utils};
>
> +mod task_cache;
> +
> /// Get tasks for all remotes
> // FIXME: filter for privileges
> pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
> diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
> new file mode 100644
> index 00000000..628a9a40
> --- /dev/null
> +++ b/server/src/remote_tasks/task_cache.rs
> @@ -0,0 +1,964 @@
> +//! Task cache implementation, based on rotating files.
> +
> +use std::{
> + cmp::Ordering,
> + collections::{HashMap, HashSet},
> + fs::File,
> + io::{BufRead, BufReader, BufWriter, Lines, Write},
> + iter::Peekable,
> + path::{Path, PathBuf},
> + time::Duration,
> +};
> +
> +use anyhow::Error;
> +use proxmox_sys::fs::CreateOptions;
> +use serde::{Deserialize, Serialize};
> +
> +use pdm_api_types::RemoteUpid;
> +
> +/// Item which can be put into the task cache.
> +#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
> +pub struct TaskCacheItem {
> + /// The task's UPID
> + pub upid: RemoteUpid,
> + /// The time at which the task was started (seconds since the UNIX epoch).
> + /// Technically this is also contained within the UPID, but duplicating it
> + /// here allows us to directly access it when sorting in new tasks, without
> + /// having to parse the UPID.
> + pub starttime: i64,
> + #[serde(skip_serializing_if = "Option::is_none")]
> + /// The task's status.
> + pub status: Option<String>,
> + #[serde(skip_serializing_if = "Option::is_none")]
> + /// The task's endtime (seconds since the UNIX epoch).
> + pub endtime: Option<i64>,
> +}
> +
> +/// State needed for task polling.
> +#[derive(Serialize, Deserialize, Default)]
> +#[serde(rename_all = "kebab-case")]
> +pub struct State {
> + /// Map of remote -> most recent task starttime (UNIX epoch) in the archive.
> + /// This can be used as a cut-off when requesting new task data.
> + pub most_recent_archive_starttime: HashMap<String, i64>,
> + /// Oldest running task. Useful as another cut-off for fetching tasks.
> + /// The timestamp is based on seconds since the UNIX epoch.
> + pub oldest_active_task: HashMap<String, i64>,
> + /// Tracked tasks per remote.
> + pub tracked_tasks: HashMap<String, HashSet<RemoteUpid>>,
> +}
> +
> +/// Cache for remote tasks.
> +#[derive(Clone)]
> +pub struct TaskCache {
> + /// Path where the cache's files should be placed.
> + base_path: PathBuf,
> + /// File permissions for the cache's files.
> + create_options: CreateOptions,
> +}
> +
> +/// Tasks that should be added to the cache via [`TaskCache::add_tasks`].
> +pub struct AddTasks {
> + /// Update most recent archived task in state file.
> + pub update_most_recent_archive_timestamp: bool,
> + /// The tasks to add.
> + pub tasks: Vec<TaskCacheItem>,
> +}
> +
> +/// Lock for the cache.
> +#[allow(dead_code)]
> +pub struct TaskCacheLock(File);
> +
> +/// Which tasks to fetch from the archive.
> +pub enum GetTasks {
> + /// Get all tasks, finished and running.
> + All,
> + /// Only get running (active) tasks.
> + Active,
> + /// Only get finished (archived) tasks.
> + #[allow(dead_code)]
> + Archived,
> +}
> +
> +impl TaskCache {
> + /// Create a new task cache instance.
> + ///
> + /// Remember to call `init` or `new_file` to create initial archive files before
> + /// adding any tasks.
> + pub fn new<P: AsRef<Path>>(path: P, create_options: CreateOptions) -> Result<Self, Error> {
> + Ok(Self {
> + base_path: path.as_ref().into(),
> + create_options,
> + })
> + }
> +
> + /// Create initial task archives that can be backfilled with the
> + /// recent task history from a remote.
> + ///
> + /// This function only has an effect if there are no archive files yet.
> + pub fn init(&self, now: i64, number_of_files: u64, period_per_file: u64) -> Result<(), Error> {
> + let _lock = self.lock(true)?;
> +
> + if self.archive_files()?.is_empty() {
> + for i in 0..number_of_files {
> + self.new_file(now - (i * period_per_file) as i64)?;
> + }
> + }
> +
> + Ok(())
> + }
> +
> + /// Start a new archive file with a given timestamp.
> + /// `now` is supposed to be a UNIX timestamp (seconds).
> + fn new_file(&self, now: i64) -> Result<(), Error> {
> + let new_path = self.base_path.join(format!("archive.{now}"));
> + let mut file = File::create(&new_path)?;
> + self.create_options.apply_to(&mut file, &new_path)?;
> +
> + Ok(())
> + }
> +
> + /// Lock the cache.
> + fn lock(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
> + let lockfile = self.base_path.join(".lock");
> +
> + let fd = proxmox_sys::fs::open_file_locked(
> + lockfile,
> + Duration::from_secs(15),
> + exclusive,
> + self.create_options.clone(),
> + )?;
> +
> + Ok(TaskCacheLock(fd))
> + }
> +
> + /// Rotate task archive if the newest archive file is older than `rotate_after`.
> + ///
> + /// The oldest archive files are removed if the total number of archive files exceeds
> + /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds).
> + ///
> + /// This function requests an exclusive lock; don't call it if you already hold a lock.
> + pub fn rotate(&self, now: i64, rotate_after: u64, max_files: u64) -> Result<bool, Error> {
> + let _lock = self.lock(true)?;
> + let mut did_rotate = false;
> +
> + let mut bounds = self.archive_files()?;
> +
> + match bounds.first() {
> + Some(bound) => {
> + if now > bound.starttime && now - bound.starttime > rotate_after as i64 {
> + self.new_file(now)?;
> + did_rotate = true;
> + }
> + }
> + None => {
> + self.new_file(now)?;
> + did_rotate = true;
> + }
> + }
> +
> + while bounds.len() >= max_files as usize {
> + // Unwrap is safe because of the length check above
> + let to_remove = bounds.pop().unwrap();
> + std::fs::remove_file(&to_remove.file)?;
> + }
> +
> + Ok(did_rotate)
> + }
> +
> + /// Add new tasks to the task archive.
> + ///
> + /// Running tasks (tasks without an endtime) are placed into the 'active' file in the
> + /// task cache base directory. Finished tasks are sorted into `archive.<starttime>` archive
> + /// files, where `<starttime>` denotes the lowest permitted start time for a given
> + /// archive file. If a task which was added as running previously is added again, this time in
> + /// a finished state, it will be removed from the `active` file and also sorted into
> + /// one of the archive files.
> + /// Same goes for the list of tracked tasks; the entry in the state file will be removed.
> + ///
> + /// Crash consistency:
> + ///
> + /// The state file, which contains the cut-off timestamps for future task fetching, is updated at the
> + /// end after all tasks have been added into the archive. Adding tasks is an idempotent
> + /// operation; adding the *same* task multiple times does not lead to duplicated entries in the
> + /// task archive. Individual archive files are updated atomically, but since
> + /// adding tasks can involve updating multiple archive files, the archive could end up
> + /// in a partially-updated, inconsistent state in case of a crash.
> + /// However, since the state file with the cut-off timestamps is updated last,
> + /// the consistency of the archive should be restored at the next update cycle of the archive.
> + pub fn add_tasks(&self, mut added: HashMap<String, AddTasks>) -> Result<(), Error> {
> + let lock = self.lock(true)?;
> +
> + // Get a flat `Vec` of all new tasks
> + let tasks: Vec<_> = added
> + .iter_mut()
> + .flat_map(|(_, tasks)| std::mem::take(&mut tasks.tasks))
> + .collect();
> +
> + let update_state_for_remote: HashMap<String, bool> = added
> + .into_iter()
> + .map(|(remote, add_tasks)| (remote, add_tasks.update_most_recent_archive_timestamp))
> + .collect();
> +
> + let mut task_iter = self.get_tasks_with_lock(GetTasks::Active, lock)?;
> +
> + let mut active_tasks =
> + HashSet::from_iter(task_iter.flat_map(|active_task| match active_task {
> + Ok(task) => Some((task.upid, task.starttime)),
> + Err(err) => {
> + log::error!("failed to read task cache entry from active file: {err}");
> + None
> + }
> + }));
> +
> + // Consume the iterator to get back the lock. The lock is held
> + // until _lock is finally dropped at the end of the function.
> + let _lock = task_iter.into_lock();
> +
> + active_tasks.extend(
> + tasks
> + .iter()
> + .filter(|task| task.endtime.is_none())
> + .map(|a| (a.upid.clone(), a.starttime)),
> + );
> +
> + let mut state = self.read_state();
> + let mut new_finished_tasks = tasks
> + .into_iter()
> + .filter(|task| task.endtime.is_some())
> + .collect::<Vec<_>>();
> +
> + new_finished_tasks.sort_by(compare_tasks_reverse);
> + self.merge_tasks_into_archive(
> + new_finished_tasks,
> + &mut active_tasks,
> + update_state_for_remote,
> + &mut state,
> + )?;
> + self.update_oldest_active(&active_tasks, &mut state);
> +
> + let mut active: Vec<TaskCacheItem> = active_tasks
> + .into_iter()
> + .map(|(upid, starttime)| TaskCacheItem {
> + upid,
> + starttime,
> + status: None,
> + endtime: None,
> + })
> + .collect();
> +
> + active.sort_by(compare_tasks_reverse);
> + self.write_active_tasks(active.into_iter())?;
> + self.write_state(state)?;
> +
> + Ok(())
> + }
> +
> + /// Update the timestamp of the oldest running task in `state`.
> + fn update_oldest_active(&self, active_tasks: &HashSet<(RemoteUpid, i64)>, state: &mut State) {
> + // Update the state with the timestamp of the oldest running task.
> + // We also use this as a cut-off when fetching tasks, so that we
> + // are sure to notice when a task finishes.
> + let mut oldest_active_task_per_remote = HashMap::new();
> + for (upid, startime) in active_tasks.iter() {
> + oldest_active_task_per_remote
> + .entry(upid.remote().to_owned())
> + .and_modify(|time| *time = (*startime).min(*time))
> + .or_insert(*startime);
> + }
> + state.oldest_active_task = oldest_active_task_per_remote;
> + }
> +
> + /// Merge a list of *finished* tasks into the remote task archive files.
> + /// The list of tasks in `tasks` *must* be sorted by their timestamp and UPID (descending by
> + /// timestamp, ascending by UPID).
> + ///
> + /// The task archive should be locked when calling this.
> + fn merge_tasks_into_archive(
> + &self,
> + tasks: Vec<TaskCacheItem>,
> + active_tasks: &mut HashSet<(RemoteUpid, i64)>,
> + update_state_for_remote: HashMap<String, bool>,
> + state: &mut State,
> + ) -> Result<(), Error> {
> + debug_assert!(tasks
> + .iter()
> + .is_sorted_by(|a, b| compare_tasks(a, b).is_ge()));
> +
> + let files = self.archive_files()?;
> +
> + let mut files = files.iter().peekable();
> +
> + let mut current = files.next();
> + let mut next = files.peek();
> +
> + let mut tasks_for_current_file = Vec::new();
> +
> + // Tasks are sorted youngest to oldest (biggest starttime first)
> + for task in tasks {
> + active_tasks.remove(&(task.upid.clone(), task.starttime));
^ As mentioned off list, rust nowadays actually understands temporary
partial moves, so we could avoid the clone with:
    let upid_starttime = (task.upid, task.starttime);
    active_tasks.remove(&upid_starttime);
    task.upid = upid_starttime.0;
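(with `for mut task in tasks` in the loop header, since the field gets
reassigned; i.e. roughly:

    for mut task in tasks {
        // move `upid` out of `task`, re-initialize the field right after
        let upid_starttime = (task.upid, task.starttime);
        active_tasks.remove(&upid_starttime);
        task.upid = upid_starttime.0;
        // ...
    }

)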
> +
> + if let Some(tracked_tasks) = state.tracked_tasks.get_mut(task.upid.remote()) {
> + tracked_tasks.remove(&task.upid);
> + }
> +
> + if let Some(true) = update_state_for_remote.get(task.upid.remote()) {
> + // Update the most recent starttime per remote, the task polling logic uses it as a
> + // cut-off.
> + state
> + .most_recent_archive_starttime
> + .entry(task.upid.remote().to_owned())
> + .and_modify(|time| *time = (task.starttime).max(*time))
> + .or_insert(task.starttime);
> + }
> +
> + // Skip ahead until we have found the correct file.
> + while next.is_some() {
> + if let Some(current) = current {
> + if task.starttime >= current.starttime {
> + break;
> + }
> + // The next entry's cut-off is larger than the task's start time, that means
> + // we want to finalize the current file by merging all tasks that
> + // should be stored in it...
> + self.merge_single_archive_file(
> + std::mem::take(&mut tasks_for_current_file),
> + &current.file,
> + )?;
> + }
> +
> + // ... and advance `current` to the next entry.
> + current = files.next();
> + next = files.peek();
> + }
> +
> + if let Some(current) = current {
> + if task.starttime < current.starttime {
> + continue;
> + }
> + }
> + tasks_for_current_file.push(task);
> + }
> +
> + // Merge tasks for the last file.
> + if let Some(current) = current {
> + self.merge_single_archive_file(tasks_for_current_file, &current.file)?;
> + }
> +
> + Ok(())
> + }
> +
> + /// Add a new tracked task.
> + ///
> + /// This will insert the task in the list of tracked tasks in the state file,
> + /// as well as create an entry in the `active` file.
> + ///
> + /// This function will request an exclusive lock for the task cache;
> + /// do not call it if you are already holding a lock.
> + pub fn add_tracked_task(&self, task: TaskCacheItem) -> Result<(), Error> {
> + let lock = self.lock(true)?;
> +
> + let mut tasks = Vec::new();
> + let mut task_iter = self.get_tasks_with_lock(GetTasks::Active, lock)?;
> +
> + for task in &mut task_iter {
> + match task {
> + Ok(task) => tasks.push(task),
> + Err(err) => {
> + log::error!(
> + "could not read existing task cache entry from 'active' file: {err}"
> + );
> + }
> + }
> + }
> +
> + tasks.push(task.clone());
> + tasks.sort_by(compare_tasks_reverse);
> +
> + let _lock = task_iter.into_lock();
> +
> + let mut state = self.read_state();
> +
> + state
> + .oldest_active_task
> + .entry(task.upid.remote().to_owned())
> + .and_modify(|a| *a = (*a).min(task.starttime))
> + .or_insert(task.starttime);
> +
> + let tracked_per_remote = state
> + .tracked_tasks
> + .entry(task.upid.remote().to_owned())
> + .or_default();
> + tracked_per_remote.insert(task.upid);
> +
> + self.write_active_tasks(tasks.into_iter())?;
> + self.write_state(state)?;
> +
> + Ok(())
> + }
> +
> + /// Iterate over cached tasks.
> + ///
> + /// This function will request a non-exclusive read-lock; don't call it if
> + /// you already hold a lock for this cache. See [`Self::get_tasks_with_lock`].
> + pub fn get_tasks(&self, mode: GetTasks) -> Result<TaskArchiveIterator, Error> {
> + let lock = self.lock(false)?;
> + self.get_tasks_with_lock(mode, lock)
> + }
> +
> + /// Iterate over cached tasks.
> + ///
> + /// This function requires you to pass a lock. If you want to continue to hold the lock
> + /// after iterating, you can consume the iterator by calling
> + /// [`TaskArchiveIterator::into_lock`], yielding the original lock.
> + pub fn get_tasks_with_lock(
> + &self,
> + mode: GetTasks,
> + lock: TaskCacheLock,
> + ) -> Result<TaskArchiveIterator, Error> {
> + match mode {
> + GetTasks::All => {
> + let mut files = vec![ArchiveFile {
> + starttime: 0,
^ this is immediately thrown out
> + file: self.base_path.join("active"),
> + }];
> +
> + let archive_files = self.archive_files()?;
> + files.extend(archive_files);
^ you could do the `.map` here in the extend call instead, that would
shorten the entire match arm to 3 lines.
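untested, but something like:

    GetTasks::All => {
        let mut files = vec![self.base_path.join("active")];
        files.extend(self.archive_files()?.into_iter().map(|pair| pair.file));
        Ok(TaskArchiveIterator::new(Box::new(files.into_iter()), lock))
    }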
> +
> + Ok(TaskArchiveIterator::new(
> + Box::new(files.into_iter().map(|pair| pair.file)),
> + lock,
> + ))
> + }
> + GetTasks::Active => {
> + let files = vec![ArchiveFile {
> + starttime: 0,
^ Same here, here we don't even have a reason :)
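e.g. simply:

    Box::new(std::iter::once(self.base_path.join("active")))

as the file iterator for this arm.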
> + file: self.base_path.join("active"),
> + }];
> + Ok(TaskArchiveIterator::new(
> + Box::new(files.into_iter().map(|pair| pair.file)),
> + lock,
> + ))
> + }
> + GetTasks::Archived => {
> + let files = self.archive_files()?;
> +
> + Ok(TaskArchiveIterator::new(
> + Box::new(files.into_iter().map(|pair| pair.file)),
> + lock,
> + ))
> + }
> + }
> + }
> +
> + /// Write the provided tasks to the 'active' file.
> + ///
> + /// The tasks are first written to a temporary file, which is then used
> + /// to atomically replace the original.
> + fn write_active_tasks(&self, tasks: impl Iterator<Item = TaskCacheItem>) -> Result<(), Error> {
> + let (fd, path) = proxmox_sys::fs::make_tmp_file(
> + self.base_path.join("active"),
> + self.create_options.clone(),
> + )?;
> + let mut fd = BufWriter::new(fd);
> +
> + Self::write_tasks(&mut fd, tasks)?;
> +
> + if let Err(err) = fd.flush() {
> + log::error!("could not flush 'active' file: {err}");
> + }
> + drop(fd);
> +
> + std::fs::rename(path, self.base_path.join("active"))?;
^ if the rename fails we should `unlink(&path)` (and if *that* fails
just log its error)
At some point we need a *stateful* tempfile helper which also gets the
destination path and does the whole rename-or-unlink-log-error dance in
a `.commit()`...
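rough sketch of what I mean (the type doesn't exist yet, names
invented):

    struct PendingFile {
        tmp_path: PathBuf,
        dest_path: PathBuf,
    }

    impl PendingFile {
        /// Rename into place; on failure, unlink the temp file and
        /// only log if the unlink fails, too.
        fn commit(self) -> std::io::Result<()> {
            if let Err(err) = std::fs::rename(&self.tmp_path, &self.dest_path) {
                if let Err(unlink_err) = std::fs::remove_file(&self.tmp_path) {
                    log::error!("failed to unlink {:?}: {unlink_err}", self.tmp_path);
                }
                return Err(err);
            }
            Ok(())
        }
    }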
> +
> + Ok(())
> + }
> +
> + /// Read the state file.
> + /// If the state file could not be read or does not exist, the default (empty) state
> + /// is returned.
> + /// A lock is only necessary if the returned state is modified and later passed
> + /// to [`Self::write_state`]. For read-only access no lock is necessary, as
> + /// the state file is always replaced atomically.
> + pub fn read_state(&self) -> State {
> + fn do_read_state(path: &Path) -> Result<State, Error> {
> + let data = proxmox_sys::fs::file_read_optional_string(path)?;
We could also skip the utf-8 check and use `serde_json::from_slice()`
instead...
(since we were concerned about performance?)
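i.e. (assuming proxmox-sys has the bytes variant,
`file_get_optional_contents` or similar):

    fn do_read_state(path: &Path) -> Result<State, Error> {
        match proxmox_sys::fs::file_get_optional_contents(path)? {
            Some(data) => Ok(serde_json::from_slice(&data)?),
            None => Ok(Default::default()),
        }
    }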
> + match data {
> + Some(data) => Ok(serde_json::from_str(&data)?),
> + None => Ok(Default::default()),
> + }
> + }
> +
> + let path = self.base_path.join("state");
> +
> + do_read_state(&path).unwrap_or_else(|err| {
> + log::error!("could not read state file: {err}");
> + Default::default()
> + })
> + }
> +
> + /// Write the state file.
> + /// The task archive should be locked for writing when calling this function.
> + pub fn write_state(&self, state: State) -> Result<(), Error> {
> + let path = self.base_path.join("state");
> +
> + let data = serde_json::to_vec_pretty(&state)?;
> +
> + proxmox_sys::fs::replace_file(path, &data, self.create_options.clone(), true)?;
> +
> + Ok(())
> + }
> +
> + /// Returns a list of existing archive files, together with their respective
> + /// cut-off timestamp. The result is sorted descending by cut-off timestamp
> + /// (most recent one first).
> + /// The task archive should be locked for reading when calling this function.
> + fn archive_files(&self) -> Result<Vec<ArchiveFile>, Error> {
> + let mut names = Vec::new();
> +
> + for entry in std::fs::read_dir(&self.base_path)? {
> + let entry = entry?;
> +
> + if let Some(timestamp) = entry
> + .path()
> + .file_name()
> + .and_then(|s| s.to_str())
> + .and_then(|s| s.strip_prefix("archive."))
> + {
> + match timestamp.parse() {
> + Ok(starttime) => {
> + names.push(ArchiveFile {
> + starttime,
> + file: entry.path(),
> + });
> + }
> + Err(err) => log::error!("could not parse archive timestamp: {err}"),
> + }
> + }
> + }
> +
> + names.sort_by_key(|e| -e.starttime);
> +
> + Ok(names)
> + }
> +
> + /// Merge `tasks` with an existing archive file.
> + /// This function assumes that `tasks` and the pre-existing contents of the archive
> + /// file are both sorted descending by starttime (most recent tasks come first).
> + /// The task archive must be locked when calling this function.
> + fn merge_single_archive_file(
> + &self,
> + tasks: Vec<TaskCacheItem>,
> + file: &Path,
> + ) -> Result<(), Error> {
> + if tasks.is_empty() {
> + return Ok(());
> + }
> +
> + let (fd, new_path) = proxmox_sys::fs::make_tmp_file(file, self.create_options.clone())?;
> + let mut fd = BufWriter::new(fd);
> +
> + if file.exists() {
^ Bad pattern.
Use `File::open` directly and deal with `err.kind() == NotFound` instead.
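i.e. roughly:

    match File::open(file) {
        Ok(existing) => {
            // merge `tasks` with the existing, already sorted contents
            let archive_reader = BufReader::new(existing);
            // ... MergeTaskIterator as below ...
        }
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            Self::write_tasks(&mut fd, tasks.into_iter())?;
        }
        Err(err) => return Err(err.into()),
    }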
> + let archive_reader = BufReader::new(File::open(file)?);
> + let archive_iter = ArchiveIterator::new(archive_reader)
> + .flat_map(|item| match item {
> + Ok(item) => Some(item),
> + Err(err) => {
> + log::error!("could not read task cache item while merging: {err}");
> + None
> + }
> + })
> + .peekable();
> + let task_iter = tasks.into_iter().peekable();
> +
> + Self::write_tasks(&mut fd, MergeTaskIterator::new(archive_iter, task_iter))?;
> + } else {
> + Self::write_tasks(&mut fd, tasks.into_iter())?;
> + }
> +
> + if let Err(err) = fd.flush() {
> + log::error!("could not flush BufWriter for {file:?}: {err}");
> + }
> + drop(fd);
> +
> + if let Err(err) = std::fs::rename(&new_path, file) {
> + log::error!("could not replace archive file {new_path:?}: {err}");
^ as above, should also attempt to unlink(new_path).
> + }
> +
> + Ok(())
> + }
> +
> + /// Write an iterator of [`TaskCacheItem`] to something that implements [`Write`].
> + /// The individual items are encoded as JSON followed by a newline.
> + /// The task archive should be locked when calling this function.
> + fn write_tasks(
> + writer: &mut impl Write,
> + tasks: impl Iterator<Item = TaskCacheItem>,
> + ) -> Result<(), Error> {
> + for task in tasks {
> + serde_json::to_writer(&mut *writer, &task)?;
> + writeln!(writer)?;
> + }
> +
> + Ok(())
> + }
> +}
> +
> +/// Comparison function for sorting tasks.
> +/// The tasks are compared based on the task's start time, falling
> +/// back to the task's UPID as a secondary criterion in case the
> +/// start times are equal.
> +pub fn compare_tasks(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
> + a.starttime
> + .cmp(&b.starttime)
> + .then_with(|| b.upid.to_string().cmp(&a.upid.to_string()))
> +}
> +
> +/// Comparison function for sorting tasks, reversed
> +/// The tasks are compared based on the task's start time, falling
> +/// back to the task's UPID as a secondary criterion in case the
> +/// start times are equal.
> +pub fn compare_tasks_reverse(a: &TaskCacheItem, b: &TaskCacheItem) -> Ordering {
> + compare_tasks(a, b).reverse()
> +}
> +
> +/// Iterator over the task archive.
> +pub struct TaskArchiveIterator {
> + /// Archive files to read.
> + files: Box<dyn Iterator<Item = PathBuf>>,
> + /// Archive iterator we are currently using, if any.
> + current: Option<ArchiveIterator<BufReader<File>>>,
> + /// Lock for this archive.
> + lock: TaskCacheLock,
> +}
> +
> +impl TaskArchiveIterator {
> + /// Create a new task archive iterator.
> + pub fn new(files: Box<dyn Iterator<Item = PathBuf>>, lock: TaskCacheLock) -> Self {
> + Self {
> + files,
> + current: None,
> + lock,
> + }
> + }
> +
> + /// Return the task archive lock, consuming `self`.
> + pub fn into_lock(self) -> TaskCacheLock {
> + self.lock
> + }
> +}
> +
> +impl Iterator for &mut TaskArchiveIterator {
> + type Item = Result<TaskCacheItem, Error>;
> +
> + fn next(&mut self) -> Option<Self::Item> {
> + loop {
> + match &mut self.current {
> + Some(current) => {
> + let next = current.next();
> + if next.is_some() {
> + return next;
> + } else {
> + self.current = None;
> + }
> + }
> + None => 'inner: loop {
> + // Returns `None` if no more files are available, stopping iteration.
> + let next_file = self.files.next()?;
> +
> + match File::open(&next_file) {
> + Ok(file) => {
> + let archive_reader = BufReader::new(file);
> + let archive_iter = ArchiveIterator::new(archive_reader);
> + self.current = Some(archive_iter);
> + break 'inner;
> + }
> + Err(err) => {
> + log::error!("could not open {next_file:?} while iterating over task archive files, skipping: {err}")
> + }
> + }
> + },
> + }
> + }
> + }
> +}
> +
> +/// Archive file.
> +#[derive(Clone, Debug)]
> +struct ArchiveFile {
> + /// The path to the archive file.
> + file: PathBuf,
> + /// The archive's lowest permitted starttime (seconds since UNIX epoch).
> + starttime: i64,
> +}
> +
> +/// Iterator that merges two _sorted_ `Iterator<Item = TaskCacheItem>`, returning the items
> +/// from both iterators sorted.
> +/// The two iterators are expected to be sorted descending by the task's starttime and
> +/// ascending by the task's UPID's string representation. This can be
> +/// achieved by using the [`compare_tasks_reverse`] function when sorting an array of tasks.
> +pub struct MergeTaskIterator<T: Iterator<Item = TaskCacheItem>, U: Iterator<Item = TaskCacheItem>> {
> + left: Peekable<T>,
> + right: Peekable<U>,
> +}
> +
> +impl<T, U> MergeTaskIterator<T, U>
> +where
> + T: Iterator<Item = TaskCacheItem>,
> + U: Iterator<Item = TaskCacheItem>,
> +{
> + /// Create a new merging iterator.
> + pub fn new(left: Peekable<T>, right: Peekable<U>) -> Self {
> + Self { left, right }
> + }
> +}
> +
> +impl<T, U> Iterator for MergeTaskIterator<T, U>
> +where
> + T: Iterator<Item = TaskCacheItem>,
> + U: Iterator<Item = TaskCacheItem>,
> +{
> + type Item = T::Item;
> +
> + fn next(&mut self) -> Option<T::Item> {
> + let order = match (self.left.peek(), self.right.peek()) {
> + (Some(l), Some(r)) => Some(compare_tasks(l, r)),
> + (Some(_), None) => Some(Ordering::Greater),
> + (None, Some(_)) => Some(Ordering::Less),
> + (None, None) => None,
> + };
> +
> + match order {
> + Some(Ordering::Greater) => self.left.next(),
> + Some(Ordering::Less) => self.right.next(),
> + Some(Ordering::Equal) => {
> + // Dedup by consuming the other iterator as well
> + let _ = self.right.next();
> + self.left.next()
> + }
> + None => None,
> + }
> + }
> +}
> +
> +/// Iterator for a single task archive file.
> +///
> +/// This iterator implements `Iterator<Item = Result<TaskCacheItem, Error>>`. When iterating,
> +/// tasks are read line by line, without loading the entire archive file into memory.
> +pub struct ArchiveIterator<T> {
> + iter: Lines<T>,
> +}
> +
> +impl<T: BufRead> ArchiveIterator<T> {
> + /// Create a new iterator.
> + pub fn new(file: T) -> Self {
> + let reader = file.lines();
> +
> + Self { iter: reader }
> + }
> +}
> +
> +impl<T: BufRead> Iterator for ArchiveIterator<T> {
> + type Item = Result<TaskCacheItem, Error>;
> +
> + fn next(&mut self) -> Option<Self::Item> {
> + self.iter.next().map(|result| {
> + result
> + .and_then(|line| Ok(serde_json::from_str(&line)?))
> + .map_err(Into::into)
> + })
> + }
> +}
> +
> +#[cfg(test)]
> +mod tests {
> + use std::io::Cursor;
> +
> + use crate::test_support::temp::NamedTempDir;
> +
> + use super::*;
> +
> + #[test]
> + fn archive_iterator() -> Result<(), Error> {
> + let file = r#"
> + {"upid":"pve-remote!UPID:pve:00039E4D:002638B8:67B4A9D1:stopall::root at pam:","status":"OK","endtime":12345, "starttime": 1234}
> + {"upid":"pbs-remote!UPID:pbs:000002B2:00000158:00000000:674D828C:logrotate::root at pam:","status":"OK","endtime":12345, "starttime": 1234}
> + invalid"#
> + .trim_start();
> +
> + let cursor = Cursor::new(file.as_bytes());
> + let mut iter = ArchiveIterator::new(cursor);
> +
> + assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pve-remote");
> + assert_eq!(iter.next().unwrap().unwrap().upid.remote(), "pbs-remote");
> + assert!(iter.next().unwrap().is_err());
> + assert!(iter.next().is_none());
> +
> + Ok(())
> + }
> +
> + fn task(starttime: i64, ended: bool) -> TaskCacheItem {
> + let (status, endtime) = if ended {
> + (Some("OK".into()), Some(starttime + 10))
> + } else {
> + (None, None)
> + };
> +
> + TaskCacheItem {
> + upid: format!(
> + "pve-remote!UPID:pve:00039E4D:002638B8:{starttime:08X}:stopall::root at pam:"
> + )
> + .parse()
> + .unwrap(),
> + starttime,
> + status,
> + endtime,
> + }
> + }
> +
> + fn assert_starttimes(cache: &TaskCache, starttimes: &[i64]) {
> + let tasks: Vec<i64> = cache
> + .get_tasks(GetTasks::All)
> + .unwrap()
> + .map(|task| task.unwrap().starttime)
> + .collect();
> +
> + assert_eq!(&tasks, starttimes);
> + }
> +
> + fn add_tasks(cache: &TaskCache, tasks: Vec<TaskCacheItem>) -> Result<(), Error> {
> + let param = AddTasks {
> + update_most_recent_archive_timestamp: true,
> + tasks,
> + };
> + let mut a = HashMap::new();
> + a.insert("pve-remote".into(), param);
> +
> + cache.add_tasks(a)
> + }
> +
> + #[test]
> + fn test_add_tasks() -> Result<(), Error> {
> + let tmp_dir = NamedTempDir::new()?;
> + let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
> +
> + cache.new_file(1000)?;
> + assert_eq!(cache.archive_files()?.len(), 1);
> +
> + add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
> +
> + cache.rotate(1500, 0, 3)?;
> + assert_eq!(cache.archive_files()?.len(), 2);
> +
> + add_tasks(&cache, vec![task(1500, true), task(1501, true)])?;
> + add_tasks(&cache, vec![task(1200, true), task(1300, true)])?;
> +
> + cache.rotate(2000, 0, 3)?;
> + assert_eq!(cache.archive_files()?.len(), 3);
> +
> + add_tasks(&cache, vec![task(2000, true)])?;
> + add_tasks(&cache, vec![task(1502, true)])?;
> + add_tasks(&cache, vec![task(1002, true)])?;
> +
> + // These are before the cut-off of 1000, so they will be discarded.
> + add_tasks(&cache, vec![task(800, true), task(900, true)])?;
> +
> + // This one should be deduped
> + add_tasks(&cache, vec![task(1000, true)])?;
> +
> + assert_starttimes(
> + &cache,
> + &[2000, 1502, 1501, 1500, 1300, 1200, 1002, 1001, 1000],
> + );
> +
> + cache.rotate(2500, 0, 3)?;
> +
> + assert_eq!(cache.archive_files()?.len(), 3);
> +
> + assert_starttimes(&cache, &[2000, 1502, 1501, 1500]);
> +
> + cache.rotate(3000, 0, 3)?;
> + assert_eq!(cache.archive_files()?.len(), 3);
> +
> + assert_starttimes(&cache, &[2000]);
> +
> + Ok(())
> + }
> +
> + #[test]
> + fn test_active_tasks_are_migrated_to_archive() -> Result<(), Error> {
> + let tmp_dir = NamedTempDir::new()?;
> + let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
> +
> + cache.new_file(1000)?;
> + add_tasks(&cache, vec![task(1000, false), task(1001, false)])?;
> + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
> +
> + let state = cache.read_state();
> + assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1000);
> +
> + add_tasks(&cache, vec![task(1000, true), task(1001, true)])?;
> +
> + assert_starttimes(&cache, &[1001, 1000]);
> +
> + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
> +
> + Ok(())
> + }
> +
> + #[test]
> + fn test_init() -> Result<(), Error> {
> + let tmp_dir = NamedTempDir::new()?;
> + let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
> +
> + cache.init(1000, 3, 100)?;
> + assert_eq!(cache.archive_files()?.len(), 3);
> +
> + add_tasks(
> + &cache,
> + vec![
> + task(1050, true),
> + task(950, true),
> + task(850, true),
> + task(750, true), // This one is discarded
> + ],
> + )?;
> +
> + assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 3);
> +
> + Ok(())
> + }
> +
> + #[test]
> + fn test_tracking_tasks() -> Result<(), Error> {
> + let tmp_dir = NamedTempDir::new()?;
> + let cache = TaskCache::new(tmp_dir.path(), CreateOptions::new()).unwrap();
> +
> + cache.init(1000, 3, 100)?;
> +
> + cache.add_tracked_task(task(1050, false))?;
> +
> + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
> + cache.add_tracked_task(task(1060, false))?;
> + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 2);
> +
> + let state = cache.read_state();
> + assert_eq!(state.tracked_tasks.get("pve-remote").unwrap().len(), 2);
> + assert_eq!(*state.oldest_active_task.get("pve-remote").unwrap(), 1050);
> +
> + // Mark first task as finished
> + add_tasks(&cache, vec![task(1050, true)])?;
> +
> + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 1);
> + assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 1);
> +
> + // Mark second task as finished
> + add_tasks(&cache, vec![task(1060, true)])?;
> +
> + assert_eq!(cache.get_tasks(GetTasks::Active)?.count(), 0);
> + assert_eq!(cache.get_tasks(GetTasks::Archived)?.count(), 2);
> +
> + Ok(())
> + }
> +}
> --
> 2.39.5