[pbs-devel] [PATCH proxmox-backup v2 4/8] api: admin: run configured sync jobs when a datastore is mounted
Christian Ebner
c.ebner at proxmox.com
Fri May 30 12:02:36 CEST 2025
please see some comments inline
On 5/15/25 14:41, Hannes Laimer wrote:
> When a datastore is mounted, spawn a new task to run all sync jobs
> marked with `run-on-mount`. These jobs run sequentially and include
> any job for which the mounted datastore is:
>
> - The source or target in a local push/pull job
> - The source in a push job to a remote datastore
> - The target in a pull job from a remote datastore
>
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
> src/api2/admin/datastore.rs | 91 +++++++++++++++++++++++++++++++++++--
> src/api2/admin/sync.rs | 2 +-
> src/server/sync.rs | 7 +--
> 3 files changed, 91 insertions(+), 9 deletions(-)
>
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 392494488..8463adb6a 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -42,8 +42,8 @@ use pbs_api_types::{
> DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus,
> GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode,
> MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState,
> - BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
> - BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
> + SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
> + BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
> IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA,
> PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
> PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA,
> @@ -2510,6 +2510,51 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> {
> Ok(())
> }
>
> +async fn do_sync_jobs(
> + jobs_to_run: Vec<SyncJobConfig>,
> + worker: Arc<WorkerTask>,
> +) -> Result<(), Error> {
> + let count = jobs_to_run.len();
> + info!(
> + "will run {} sync jobs: {}",
> + count,
> + jobs_to_run
> + .iter()
> + .map(|j| j.id.clone())
> + .collect::<Vec<String>>()
> + .join(", ")
> + );
nit:
the above can be rewritten without cloning the job ids, and with `count` inlined, as:
```
info!(
"will run {count} sync jobs: {}",
jobs_to_run
.iter()
.map(|sync_job_config| sync_job_config.id.as_str())
.collect::<Vec<&str>>()
.join(", ")
);
```
> +
> + for (i, job_config) in jobs_to_run.into_iter().enumerate() {
> + if worker.abort_requested() {
> + bail!("aborted due to user request");
> + }
> + let job_id = job_config.id.clone();
> + let Ok(job) = Job::new("syncjob", &job_id) else {
nit: this should log an error/warning as well as the status progress, since
this will fail if the job lock cannot be acquired. That is something of
interest for debugging.
> + continue;
> + };
> + let auth_id = Authid::root_auth_id().clone();
nit: auth_id does not need to be cloned here ...
> + info!("[{}/{count}] starting '{job_id}'...", i + 1);
> + match crate::server::do_sync_job(
> + job,
> + job_config,
> + &auth_id,
... since it is only passed by reference here.
> + Some("mount".to_string()),
> + false,
> + ) {
> + Ok((_upid, handle)) => {
> + tokio::select! {
> + sync_done = handle.fuse() => if let Err(err) = sync_done { warn!("could not wait for job to finish: {err}"); },
question: should this be logged as an error rather than a warning ...
> + _abort = worker.abort_future() => bail!("aborted due to user request"),
> + };
> + }
> + Err(err) => warn!("unable to start sync job {job_id} - {err}"),
... same here?
> + }
> + }
> +
> + Ok(())
> +}
> +
> #[api(
> protected: true,
> input: {
> @@ -2541,12 +2586,48 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er
> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>
> - let upid = WorkerTask::new_thread(
> + let upid = WorkerTask::spawn(
question: is it okay to run this on the same thread here?
`do_mount_device` performs some blocking calls, after all?
> "mount-device",
> - Some(store),
> + Some(store.clone()),
> auth_id.to_string(),
> to_stdout,
> - move |_worker| do_mount_device(datastore),
> + move |_worker| async move {
> + do_mount_device(datastore.clone())?;
> + let Ok((sync_config, _digest)) = pbs_config::sync::config() else {
> + warn!("unable to read sync job config, won't run any sync jobs");
> + return Ok(());
> + };
> + let Ok(list) = sync_config.convert_to_typed_array("sync") else {
> + warn!("unable to parse sync job config, won't run any sync jobs");
> + return Ok(());
> + };
> + let jobs_to_run: Vec<SyncJobConfig> = list
> + .into_iter()
> + .filter(|job: &SyncJobConfig| {
> + // add job iff (running on mount is enabled and) any of these apply
> + // - the job is local and we are the source or target
> + // - we are the source of a push to a remote
> + // - we are the target of a pull from a remote
> + //
> + // `job.store == datastore.name` iff we are the target for pull from remote or we
> + // are the source for push to remote, therefore we don't have to check for the
> + // direction of the job.
> + job.run_on_mount.unwrap_or(false)
> + && (job.remote.is_none() && job.remote_store == datastore.name
> + || job.store == datastore.name)
> + })
> + .collect();
> + if !jobs_to_run.is_empty() {
comment: an additional info log line in the mount task log would be nice, so
one can also see from it that some sync jobs were triggered.
> + let _ = WorkerTask::spawn(
> + "mount-sync-jobs",
> + Some(store),
> + auth_id.to_string(),
> + false,
> + move |worker| async move { do_sync_jobs(jobs_to_run, worker).await },
> + );
> + }
> + Ok(())
> + },
comment: all of the above is executed within an API endpoint flagged as
protected! This, however, leads to the sync jobs being executed by the
proxmox-backup-proxy; all the synced contents are therefore written and
owned by the root user instead of the backup user.
> )?;
>
> Ok(json!(upid))
> diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
> index 6722ebea0..01dea5126 100644
> --- a/src/api2/admin/sync.rs
> +++ b/src/api2/admin/sync.rs
> @@ -161,7 +161,7 @@ pub fn run_sync_job(
>
> let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>
> - let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
> + let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
>
> Ok(upid_str)
> }
> diff --git a/src/server/sync.rs b/src/server/sync.rs
> index 09814ef0c..c45a8975e 100644
> --- a/src/server/sync.rs
> +++ b/src/server/sync.rs
> @@ -12,6 +12,7 @@ use futures::{future::FutureExt, select};
> use hyper::http::StatusCode;
> use pbs_config::BackupLockGuard;
> use serde_json::json;
> +use tokio::task::JoinHandle;
> use tracing::{info, warn};
>
> use proxmox_human_byte::HumanByte;
> @@ -598,7 +599,7 @@ pub fn do_sync_job(
> auth_id: &Authid,
> schedule: Option<String>,
> to_stdout: bool,
> -) -> Result<String, Error> {
> +) -> Result<(String, JoinHandle<()>), Error> {
> let job_id = format!(
> "{}:{}:{}:{}:{}",
> sync_job.remote.as_deref().unwrap_or("-"),
> @@ -614,7 +615,7 @@ pub fn do_sync_job(
> bail!("can't sync to same datastore");
> }
>
> - let upid_str = WorkerTask::spawn(
> + let (upid_str, handle) = WorkerTask::spawn_with_handle(
> &worker_type,
> Some(job_id.clone()),
> auth_id.to_string(),
> @@ -730,7 +731,7 @@ pub fn do_sync_job(
> },
> )?;
>
> - Ok(upid_str)
> + Ok((upid_str, handle))
> }
>
> pub(super) fn ignore_not_verified_or_encrypted(
More information about the pbs-devel
mailing list