[pdm-devel] [PATCH proxmox-datacenter-manager v5 1/6] remote tasks: implement improved cache for remote tasks

Lukas Wagner l.wagner at proxmox.com
Thu Jul 3 10:05:42 CEST 2025


On  2025-05-14 16:08, Dominik Csapak wrote:
> you write below that you use the oldest active task as a cutoff
> (i guess in a later patch), but that has maybe some bad consequences:
> 
> if there is a long running task (e.g. multiple days; not that uncommon
> for e.g. pbs), we'd revisit old tasks over and over again for that
> remote, hitting the task archive on disk again every time.
> 

Yes, I think you are right, my approach is a bit flawed for tasks that run for a very long time.
I chose this approach because it simplifies the overall fetching of finished tasks and the polling of
tracked tasks. But with your input, and some time having passed since I implemented this, I can see
why this might be problematic.
I guess I'll go back to polling tracked tasks individually. I just dug through
the /nodes/<node>/tasks/<upid>/status API implementation and was pleasantly surprised
by how efficient that endpoint is, so I guess it should be no problem at all to
keep hitting that one repeatedly, even if there is a larger number of running, tracked tasks for a
single node.

> could it maybe make more sense to either:
> * just request finished tasks so we don't have to deal with running
>   tasks we did not track
> * track the active tasks that got synced, maybe with a lower
>   interval than the ones we started?
> 
> This way we can avoid hitting the on disk task list (even if
> it's just read calls) for remotes with long running tasks.
> 
> What do you think?
> 

I think for now I'd tend towards the first option, only requesting finished tasks.
The second one, tracking 'foreign' active tasks with some intermediate polling
interval can always be added later.

> 
> Also some comments inline
> 
> On 5/12/25 13:41, Lukas Wagner wrote:
>> This commit adds a new implementation for a cache for remote tasks, one
>> that should improve performance characteristics for pretty much all
>> use cases.
>>
>> In general storage works pretty similar to the task archive we already
>> have for (local) PDM tasks.
>>
>> root at pdm-dev:/var/cache/proxmox-datacenter-manager/remote-tasks# ls -l
>> total 40
>> -rw-r--r-- 1 www-data www-data     0 Mar 13 13:18 active
>> -rw-r--r-- 1 www-data www-data  1676 Mar 11 14:51 archive.1741355462
>> -rw-r--r-- 1 www-data www-data     0 Mar 11 14:51 archive.1741441862
>> -rw-r--r-- 1 www-data www-data  2538 Mar 11 14:51 archive.1741528262
>> -rw-r--r-- 1 www-data www-data  8428 Mar 11 15:07 archive.1741614662
>> -rw-r--r-- 1 www-data www-data 11740 Mar 13 10:18 archive.1741701062
>> -rw-r--r-- 1 www-data www-data  3364 Mar 13 13:18 archive.1741788270
>> -rw-r--r-- 1 www-data www-data   287 Mar 13 13:18 state
>>
>> Tasks are stored in the 'active' and multiple 'archive' files. Running
>> tasks are placed into the 'active' file, tasks that are finished are
>> persisted into one of the archive files. The archive files are suffixed
>> with a UNIX timestamp which serves as a lower-bound for start times for
>> tasks stored in this file. Encoding this lower-bound in the file name
>> instead of using a more traditional increasing file index (.1, .2, .3)
>> gives us the benefit that we can decide in which file a newly arrived
>> task belongs from a single readdir call, without even reading the file
>> itself.
>>
>> The size of the entire archive can be controlled by doing file
>> 'rotation', where the archive file with the oldest timestamp is deleted
>> and a new file with the current timestamp is added. If 'rotation'
>> happens at fixed intervals (e.g. once daily), this gives us a
>> configurable number of days of task history. There is no direct cap for
>> the total number of tasks, but this could be easily added by checking
>> the size of the most recent archive file and by rotating early if a
>> threshold is exceeded.
>>
>> The format inside the files is also similar to the local task archive,
>> with one task corresponding to one line in the file. One key difference
>> is that here each line is a JSON object, which makes it easier to
>> add additional data later, if needed. The JSON object contains the task's
>> UPID, status, endtime and starttime (the starttime is technically also
>> encoded in the UPID, but having it as a separate value simplified a
>> couple of things). Each file is sorted by the task's start time, the
>> youngest task coming first.
>>
>> One key difference between this task cache and the local task archive is
>> that we need to handle tasks coming in out-of-order, e.g. if remotes
>> were not reachable for a certain time. To maintain the ordering in the
>> file, we have to merge the newly arrived tasks into the existing task
>> file. This was implemented in a way that avoids reading the entire file
>> into memory at once, exploiting the fact that the contents of the
>> existing file are already sorted. This allows for a zipper/merge-sort
>> like approach (see MergeTaskIterator for details). The existing file is
>> only read line-by-line and finally replaced atomically.
>>
>> The cache also has a separate state file, containing additional
>> information, e.g. cut-off timestamps for the polling task. Some of the
>> data in the statefile is technically also contained in the archive
>> files, but reading the state file is much faster.
>>
>> This commit also adds an elaborate suite of unit tests for this new
>> cache. While they meant some additional work up front, they paid for
>> themselves quite quickly during development, since the overall approach
>> for the cache changed a couple of times. The test suite gave me
>> confidence that the design changes didn't screw anything up, catching a
>> couple of bugs in the process.
>>
>> Finally, some concrete numbers about the performance improvements.
>> Benchmarking was done using the 'fake-remote' feature. There were 100
>> remotes, 10 PVE nodes per remote. The task cache contained
>> about 1.5 million tasks.
>>                                                before         after
>> list of active tasks (*):                     ~1.3s         ~300µs
>> list of 500 tasks, offset 0 (**):             ~1.3s        ~1450µs
>> list of 500 tasks, offset 1 million (***):    ~1.3s         ~175ms
>> Size on disk:                                 ~500MB        ~200MB
>>
>> (*):  Requested by the UI every 3s
>> (**): Requested by the UI when visiting Remotes > Tasks
>> (***): E.g. when scrolling towards the bottom of 'Remotes > Tasks'
>>
>> In the old implementation, the archive file was *always* fully
>> deserialized and loaded into RAM, this is the reason why the time needed
>> is pretty identical for all scenarios. The new implementation reads the
>> archive files only line by line, and only 500 tasks were loaded into RAM
>> at the same time. The higher the offset, the more archive lines/files we
>> have to scan, which increases the time needed to access the data. The
>> tasks are sorted descending by starttime, as a result the requests get
>> slower the further you go back in history.
>>
>> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
>> ---
>>
>> Notes:
>>      Changes since v3:
>>        - Included benchmark results in the commit message
>>        - Drop `pub` for TaskCache::write_state
>>      Changes since v2:
>>        - Incorporate feedback from Wolfgang (thx!)
>>           - eliminate one .clone()
>>           - get_tasks_with_lock: directly create iterator over PathBuf
>>             instead of first creating ArchiveFile vec and the mapping
>>           - Fixed TOCTOU race condition (checked Path::exists instead of
>>             trying the actual operation and reacting to ErrorKind::NotFound)
>>           - handle error when replacing 'active' file
>>           - unlink temp file when replacing the archive file did not work
>>           - avoid UTF8 decoding where not really necessary
>>        - Added a couple of .context()/.with_context() for better error
>>          messages
>>        - Extracted the file names into consts
>>        - Fixed some clippy issues (.clone() for CreateOptions - when
>>          this series was written, CreateOptions were not Copy yet)
>>
>>   server/src/remote_tasks/mod.rs        |    2 +
>>   server/src/remote_tasks/task_cache.rs | 1022 +++++++++++++++++++++++++
>>   2 files changed, 1024 insertions(+)
>>   create mode 100644 server/src/remote_tasks/task_cache.rs
>>
>> diff --git a/server/src/remote_tasks/mod.rs b/server/src/remote_tasks/mod.rs
>> index 234ffa76..7c8e31ef 100644
>> --- a/server/src/remote_tasks/mod.rs
>> +++ b/server/src/remote_tasks/mod.rs
>> @@ -18,6 +18,8 @@ use tokio::task::JoinHandle;
>>   use crate::{api::pve, task_utils};
>>
>> +mod task_cache;
>> +
>>   /// Get tasks for all remotes
>>   // FIXME: filter for privileges
>>   pub async fn get_tasks(max_age: i64, filters: TaskFilters) -> Result<Vec<TaskListItem>, Error> {
>> diff --git a/server/src/remote_tasks/task_cache.rs b/server/src/remote_tasks/task_cache.rs
>> new file mode 100644
>> index 00000000..f8dd821e
>> --- /dev/null
>> +++ b/server/src/remote_tasks/task_cache.rs
>> @@ -0,0 +1,1022 @@
>> +//! Task cache implementation, based on rotating files.
>> +
>> +use std::{
>> +    cmp::Ordering,
>> +    collections::{HashMap, HashSet},
>> +    fs::File,
>> +    io::{BufRead, BufReader, BufWriter, ErrorKind, Lines, Write},
>> +    iter::Peekable,
>> +    path::{Path, PathBuf},
>> +    time::Duration,
>> +};
>> +
>> +use anyhow::{Context, Error};
>> +use proxmox_sys::fs::CreateOptions;
>> +use serde::{Deserialize, Serialize};
>> +
>> +use pdm_api_types::RemoteUpid;
>> +
>> +/// Filename for the file containing running tasks.
>> +const ACTIVE_FILENAME: &str = "active";
>> +/// Filename prefix for archive files.
>> +const ARCHIVE_FILENAME_PREFIX: &str = "archive.";
>> +/// Filename for the state file.
>> +const STATE_FILENAME: &str = "state";
>> +/// Filename of the archive lockfile.
>> +const LOCKFILE_FILENAME: &str = ".lock";
>> +
>> +/// Item which can be put into the task cache.
>> +#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
>> +pub struct TaskCacheItem {
>> +    /// The task's UPID
>> +    pub upid: RemoteUpid,
>> +    /// The time at which the task was started (seconds since the UNIX epoch).
>> +    /// Technically this is also contained within the UPID, duplicating it here
>> +    /// allows us to directly access it when sorting in new tasks, without having
>> +    /// to parse the UPID.
>> +    pub starttime: i64,
> 
> instead of saving the starttime here, couldn't we also use the parsed
> UPID struct + remote instead?
> 
> not sure about the memory implications, but then we could avoid things
> like you having to partially move the upid out of a struct and back again
> for removing out of a hashset
> 
> (though it's also fine as it is for me)

I'd rather keep this as is, as it keeps the archive files smaller.

> 
>> +    #[serde(skip_serializing_if = "Option::is_none")]
>> +    /// The task's status.
>> +    pub status: Option<String>,
>> +    #[serde(skip_serializing_if = "Option::is_none")]
>> +    /// The task's endtime (seconds since the UNIX epoch).
>> +    pub endtime: Option<i64>,
>> +}
>> +
>> +/// State needed for task polling.
>> +#[derive(Serialize, Deserialize, Default)]
>> +#[serde(rename_all = "kebab-case")]
>> +pub struct State {
>> +    /// Map of remote -> most recent task starttime (UNIX epoch) in the archive.
>> +    /// This can be used as a cut-off when requesting new task data.
>> +    pub most_recent_archive_starttime: HashMap<String, i64>,
>> +    /// Oldest running task. Useful as another cut-off for fetching tasks.
>> +    /// The timestamp is based on seconds since the UNIX epoch.
>> +    pub oldest_active_task: HashMap<String, i64>,
>> +    /// Tracked tasks per remote.
>> +    pub tracked_tasks: HashMap<String, HashSet<RemoteUpid>>,
>> +}
> 
> wouldn't it be possible to just save the cutoff time?
> we just have to first increase the time for the finished tasks
> and afterwards reduce it again if there is still an active task?
> 
> also would it be possible to combine the hashmaps into one?
> e.g. HashMap<String, (i64, HashSet<String>)> ?
> 
> that way we would reduce the number of times we save the remote in the file
> which can add up if we have many remotes (e.g. hundreds)
> 
> so instead of
> 
> {
>     "most-recent-archive-starttime": {
>         "remote-1": 123123,
>         "remote-2": 123123,
>     },
>      "oldest_active_task" : {
>         "remote-1": 232323,
>         "remote-2": 232323,
>     },
>     "tracked-tasks": {
>         "remote-1": [
>             "remote-1!<UPID1>",
>             "remote-1!<UPID2>"
>         ]
>     }
> }
> 
> it would be:
> 
> {
>     "state": {
>         "remote-1": [
>             123123,
>             ["<UPID1>", "<UPID2>"]
>         ],
>         "remote-2": [
>             123123,
>             []
>         ]
>     }
> }

Some of your points will not be relevant any more once I change the polling approach as described above,
but I'll see if I can make the state file a bit more 'compact' with the new approach.
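
For illustration, a minimal sketch of what a per-remote layout along the lines of the suggestion could
look like. `RemoteState`, `cutoff` and `combine` are hypothetical names, and the serde derives are left
out to keep the sketch free of dependencies; the layout that ends up in the next version may differ.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical per-remote entry: one cut-off timestamp plus the tracked tasks.
#[derive(Debug, Default, PartialEq)]
pub struct RemoteState {
    /// Cut-off timestamp (seconds since the UNIX epoch) for fetching tasks.
    pub cutoff: i64,
    /// UPIDs of tracked tasks, stored without the remote prefix.
    pub tracked_tasks: HashSet<String>,
}

/// Fold two per-remote maps into one map, so that each remote name
/// appears only once as a key.
pub fn combine(
    cutoffs: HashMap<String, i64>,
    mut tracked: HashMap<String, HashSet<String>>,
) -> HashMap<String, RemoteState> {
    let mut state: HashMap<String, RemoteState> = cutoffs
        .into_iter()
        .map(|(remote, cutoff)| {
            // Remotes without tracked tasks get an empty set.
            let tracked_tasks = tracked.remove(&remote).unwrap_or_default();
            (remote, RemoteState { cutoff, tracked_tasks })
        })
        .collect();

    // Remotes that only have tracked tasks so far keep the default cut-off of 0.
    for (remote, tracked_tasks) in tracked {
        state.entry(remote).or_default().tracked_tasks = tracked_tasks;
    }

    state
}
```

With hundreds of remotes, each remote name is then written once per state file instead of once per map.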

> 
>> +
>> +/// Cache for remote tasks.
>> +#[derive(Clone)]
>> +pub struct TaskCache {
>> +    /// Path where the cache's files should be placed.
>> +    base_path: PathBuf,
>> +    /// File permissions for the cache's files.
>> +    create_options: CreateOptions,
>> +}
>> +
>> +/// Tasks that should be added to the cache via [`TaskCache::add_tasks`].
>> +pub struct AddTasks {
>> +    /// Update the remote's entry in the statefile which contains the
>> +    /// timestamp of the most recent *finished* task from the task archive.
>> +    pub update_most_recent_archive_timestamp: bool,
>> +    /// The tasks to add.
>> +    pub tasks: Vec<TaskCacheItem>,
>> +}
>> +
>> +/// Lock for the cache.
>> +#[allow(dead_code)]
>> +pub struct TaskCacheLock(File);
>> +
>> +/// Which tasks to fetch from the archive.
>> +pub enum GetTasks {
>> +    /// Get all tasks, finished and running.
>> +    All,
>> +    /// Only get running (active) tasks.
>> +    Active,
>> +    #[cfg(test)] // Used by tests, might be used by production code in the future
>> +    /// Only get finished (archived) tasks.
>> +    Archived,
>> +}
>> +
>> +impl TaskCache {
>> +    /// Create a new task cache instance.
>> +    ///
>> +    /// Remember to call `init` or `new_file` to create initial archive files before
>> +    /// adding any tasks.
> 
> Not sure if we use this kind of pattern somewhere already, but if you use different
> types for different methods?
> 
> 
> e.g. having a TaskCache and an WritableTaskCache where only the second has
> add_tasks and you can only reach it by using init/new_file ?
> 
> then it cannot happen that add is called falsely
> 
> Same with the .lock() calls
> 
> If there would be a LockedTaskCache that can do everything that you can do with a lock
> there is no need to manually handle the lock anywhere outside this module
> (probably needs two variants, one for reading and one for writing)
> 

Might be a good idea, thanks. I'd prefer to do that in a follow-up series once this is applied, since this is mostly
a stylistic choice.
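
A rough sketch of how such a split could look, under the assumption that a `std::sync::Mutex` stands in
for both the on-disk files and the `.lock` file. `WritableTaskCache`, `write` and `add_task` are
hypothetical names here, not part of the patch.

```rust
use std::sync::{Mutex, MutexGuard};

/// Handle without a lock. It offers no way to add tasks directly.
pub struct TaskCache {
    // Stand-in for the archive files plus the lock file of the real cache.
    tasks: Mutex<Vec<String>>,
}

/// Handle that proves the lock is held. `add_task` exists only on this type.
pub struct WritableTaskCache<'a> {
    guard: MutexGuard<'a, Vec<String>>,
}

impl TaskCache {
    pub fn new() -> Self {
        Self {
            tasks: Mutex::new(Vec::new()),
        }
    }

    /// The only way to obtain a writable handle is to take the lock.
    pub fn write(&self) -> WritableTaskCache<'_> {
        WritableTaskCache {
            // A poisoned mutex means another holder panicked; for this
            // sketch we simply continue with the inner data.
            guard: self.tasks.lock().unwrap_or_else(|err| err.into_inner()),
        }
    }
}

impl WritableTaskCache<'_> {
    /// Add a task while the lock is held.
    pub fn add_task(&mut self, upid: &str) {
        self.guard.push(upid.to_string());
    }

    /// Number of tasks currently stored.
    pub fn len(&self) -> usize {
        self.guard.len()
    }
}
```

The lock is released when the `WritableTaskCache` is dropped, so callers outside the module never handle
the lock themselves. A read-only variant could wrap a shared lock in the same way.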

> 
>> +    pub fn new<P: AsRef<Path>>(path: P, create_options: CreateOptions) -> Result<Self, Error> {
>> +        Ok(Self {
>> +            base_path: path.as_ref().into(),
>> +            create_options,
>> +        })
>> +    }
>> +
>> +    /// Create initial task archives that can be backfilled with the
>> +    /// recent task history from a remote.
>> +    ///
>> +    /// This function only has an effect if there are no archive files yet.
>> +    pub fn init(&self, now: i64, number_of_files: u64, period_per_file: u64) -> Result<(), Error> {
>> +        let _lock = self.lock(true).context("failed to lock archive")?;
>> +
>> +        if self.archive_files()?.is_empty() {
>> +            for i in 0..number_of_files {
>> +                self.new_file(now - (i * period_per_file) as i64)?;
>> +            }
>> +        }
>> +
>> +        Ok(())
>> +    }
>> +
>> +    /// Start a new archive file with a given timestamp.
>> +    /// `now` is supposed to be a UNIX timestamp (seconds).
>> +    fn new_file(&self, now: i64) -> Result<(), Error> {
>> +        let new_path = self
>> +            .base_path
>> +            .join(format!("{ARCHIVE_FILENAME_PREFIX}{now}"));
>> +        let mut file = File::create(&new_path)?;
>> +        self.create_options.apply_to(&mut file, &new_path)?;
>> +
>> +        Ok(())
>> +    }
>> +
>> +    /// Lock the cache.
>> +    fn lock(&self, exclusive: bool) -> Result<TaskCacheLock, Error> {
>> +        let lockfile = self.base_path.join(LOCKFILE_FILENAME);
>> +
>> +        let fd = proxmox_sys::fs::open_file_locked(
>> +            lockfile,
>> +            Duration::from_secs(15),
> 
> 
> just for my curiosity (not a critique): is there a special reason
> why you set the timeout manually to 15 seconds instead of leaving it to
> the function to use the default?
> 
> reason for my question is: if you often ran into timeouts with just 10 seconds (afair the default),
> chances are users in the wild will run into it with 15 seconds too, so we might want to either use
> a *much* bigger timeout, or make it somehow configurable.

No good reason actually, I might have copied that line from somewhere and then didn't change it. At least I cannot
remember any reason why I chose 15 seconds. I'll change it to the default. Thanks!
> 
>> +            exclusive,
>> +            self.create_options,
>> +        )?;
>> +
>> +        Ok(TaskCacheLock(fd))
>> +    }
>> +
>> +    /// Rotate task archive if the newest archive file is older than `rotate_after`.
>> +    ///
>> +    /// The oldest archive files are removed if the total number of archive files exceeds
>> +    /// `max_files`. `now` is supposed to be a UNIX timestamp (seconds).
>> +    ///
>> +    /// This function requests an exclusive lock, don't call if you already hold a lock.
>> +    pub fn rotate(&self, now: i64, rotate_after: u64, max_files: u64) -> Result<bool, Error> {
>> +        let _lock = self.lock(true).context("failed to lock archive")?;
>> +        let mut did_rotate = false;
>> +
>> +        let mut bounds = self.archive_files()?;
>> +
>> +        match bounds.first() {
>> +            Some(bound) => {
>> +                if now > bound.starttime && now - bound.starttime > rotate_after as i64 {
>> +                    self.new_file(now)?;
>> +                    did_rotate = true;
>> +                }
>> +            }
>> +            None => {
>> +                self.new_file(now)?;
>> +                did_rotate = true;
>> +            }
>> +        }
>> +
>> +        while bounds.len() >= max_files as usize {
>> +            // Unwrap is safe because of the length check above
>> +            let to_remove = bounds.pop().unwrap();
>> +            std::fs::remove_file(&to_remove.file)
>> +                .with_context(|| format!("failed to remove {}", to_remove.file.display()))?;
>> +        }
>> +
>> +        Ok(did_rotate)
>> +    }
>> +
>> +    /// Add new tasks to the task archive.
>> +    ///
>> +    /// Running tasks (tasks without an endtime) are placed into the 'active' file in the
>> +    /// task cache base directory. Finished tasks are sorted into `archive.<starttime>` archive
>> +    /// files, where `<starttime>` denotes the lowest permitted start time timestamp for a given
>> +    /// archive file. If a task which was added as running previously is added again, this time in
>> +    /// a finished state, it will be removed from the `active` file and also sorted into
>> +    /// one of the archive files.
>> +    /// Same goes for the list of tracked tasks; the entry in the state file will be removed.
>> +    ///
>> +    /// Crash consistency:
>> +    ///
>> +    /// The state file, which contains the cut-off timestamps for future task fetching, is updated at the
>> +    /// end after all tasks have been added into the archive. Adding tasks is an idempotent
>> +    /// operation; adding the *same* task multiple times does not lead to duplicated entries in the
>> +    /// task archive. Individual archive files are updated atomically, but since
>> +    /// adding tasks can involve updating multiple archive files, the archive could end up
>> +    /// in a partially-updated, inconsistent state in case of a crash.
>> +    /// However, since the state file with the cut-off timestamps is updated last,
>> +    /// the consistency of the archive should be restored at the next update cycle of the archive.
>> +    pub fn add_tasks(&self, mut added: HashMap<String, AddTasks>) -> Result<(), Error> {
>> +        let lock = self.lock(true).context("failed to lock archive")?;
>> +
>> +        // Get a flat `Vec` of all new tasks
>> +        let tasks: Vec<_> = added
>> +            .iter_mut()
>> +            .flat_map(|(_, tasks)| std::mem::take(&mut tasks.tasks))
>> +            .collect();
>> +
>> +        let update_state_for_remote: HashMap<String, bool> = added
>> +            .into_iter()
>> +            .map(|(remote, add_tasks)| (remote, add_tasks.update_most_recent_archive_timestamp))
>> +            .collect();
>> +
>> +        let mut task_iter = self
>> +            .get_tasks_with_lock(GetTasks::Active, lock)
>> +            .context("failed to create archive iterator for active tasks")?;
>> +
>> +        let mut active_tasks =
>> +            HashSet::from_iter(task_iter.flat_map(|active_task| match active_task {
> 
> i think you want to use 'filter_map' here? even though it seems to do the same thing in
> this instance, i think 'filter_map' better expresses what you want to do

True, filter_map is a better fit. Thanks!
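
For reference, a small sketch of the pattern with `filter_map`. `parse_line` and `collect_active` are
hypothetical helpers and plain `(String, i64)` tuples stand in for `TaskCacheItem`. `flat_map` yields
the same items here because `Option` implements `IntoIterator`, but `filter_map` states the intent directly.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for deserializing one line of the 'active' file.
/// Line format assumed for this sketch: "<upid> <starttime>".
fn parse_line(line: &str) -> Result<(String, i64), String> {
    let (upid, starttime) = line
        .split_once(' ')
        .ok_or_else(|| format!("malformed line: {line:?}"))?;
    let starttime = starttime
        .parse::<i64>()
        .map_err(|err| format!("bad starttime in {line:?}: {err}"))?;
    Ok((upid.to_string(), starttime))
}

/// Collect all entries that could be parsed; log and skip the rest.
fn collect_active(lines: &[&str]) -> HashSet<(String, i64)> {
    lines
        .iter()
        .map(|line| parse_line(line))
        .filter_map(|entry| match entry {
            Ok(task) => Some(task),
            Err(err) => {
                eprintln!("failed to read task cache entry: {err}");
                None
            }
        })
        .collect()
}
```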

> 
>> +                Ok(task) => Some((task.upid, task.starttime)),
>> +                Err(err) => {
>> +                    log::error!("failed to read task cache entry from active file: {err}");
>> +                    None
>> +                }
>> +            }));
> 
> does it make a big difference that you save the starttime separately here?
> 
> FWICT, this makes the remove code later a bit more complicated, and
> would only need to be parsed in `update_oldest_active`. Since i don't assume too many concurrent
> active tasks in one set of updates, i don't think the parsing should be very bad here...

Well, the rationale for having the starttime in TaskCacheItem as a separate field is that I can sort tasks
in the MergeTaskIterator without parsing the UPID.
In this function here, I deserialize TaskCacheItems and get the starttime from that, so I don't see why I
should discard the info I already have only to parse it again from the UPID.
But maybe there is some misunderstanding here.
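
To make the point about sorting concrete, here is a minimal sketch of a merge over two inputs that are
each sorted by starttime, newest first. It is not the MergeTaskIterator from the patch; `Task` and
`MergeNewestFirst` are hypothetical names, and the comparison only touches the `starttime` field, so the
UPID string is never parsed.

```rust
use std::iter::Peekable;

/// Simplified stand-in for `TaskCacheItem`.
#[derive(Clone, Debug, PartialEq)]
pub struct Task {
    pub upid: String,
    pub starttime: i64,
}

/// Merges two iterators that are each sorted by `starttime`, newest first.
pub struct MergeNewestFirst<A: Iterator<Item = Task>, B: Iterator<Item = Task>> {
    left: Peekable<A>,
    right: Peekable<B>,
}

impl<A: Iterator<Item = Task>, B: Iterator<Item = Task>> MergeNewestFirst<A, B> {
    pub fn new(left: A, right: B) -> Self {
        Self {
            left: left.peekable(),
            right: right.peekable(),
        }
    }
}

impl<A: Iterator<Item = Task>, B: Iterator<Item = Task>> Iterator for MergeNewestFirst<A, B> {
    type Item = Task;

    fn next(&mut self) -> Option<Task> {
        // Look at the head of both inputs and take the newer one.
        // Only the `starttime` field is compared.
        let take_left = match (self.left.peek(), self.right.peek()) {
            (Some(l), Some(r)) => l.starttime >= r.starttime,
            (Some(_), None) => true,
            (None, _) => false,
        };

        if take_left {
            self.left.next()
        } else {
            self.right.next()
        }
    }
}
```

Only one item per input is held at a time, which is what allows the existing file to be read line by line.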

> 
>> +
>> +        // Consume the iterator to get back the lock. The lock is held
>> +        // until _lock is finally dropped at the end of the function.
>> +        let _lock = task_iter.into_lock();
>> +
>> +        let mut new_finished_tasks = Vec::new();
>> +
>> +        for task in tasks {
>> +            if task.endtime.is_none() {
>> +                active_tasks.insert((task.upid, task.starttime));
>> +            } else {
>> +                new_finished_tasks.push(task);
>> +            }
>> +        }
>> +


-- 
- Lukas




