[pbs-devel] [PATCH proxmox-backup v4 2/7] api: admin: run configured sync jobs when a datastore is mounted

Christian Ebner c.ebner at proxmox.com
Thu Jul 17 15:27:52 CEST 2025


On 7/16/25 4:52 PM, Hannes Laimer wrote:
> When a datastore is mounted, spawn a new task to run all sync jobs
> marked with `run-on-mount`. These jobs run sequentially and include
> any job for which the mounted datastore is:
> 
> - The source or target in a local push/pull job

just noticed this now, but there are no local push jobs, so this should 
state pull only.

> - The source in a push job to a remote datastore
> - The target in a pull job from a remote datastore
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
> new in v4:
> - wrap do_mount_device in .spawn_blocking(|| ...)
> 
>   src/api2/admin/datastore.rs | 115 ++++++++++++++++++++++++++++++++++--
>   1 file changed, 109 insertions(+), 6 deletions(-)
> 
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index e24bc1c1..753772f9 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -44,8 +44,8 @@ use pbs_api_types::{
>       DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus,
>       GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode,
>       MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState,
> -    BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
> -    BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
> +    SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
> +    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
>       IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA,
>       PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
>       PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA,
> @@ -68,7 +68,7 @@ use pbs_datastore::{
>       DataStore, LocalChunkReader, StoreProgress,
>   };
>   use pbs_tools::json::required_string_param;
> -use proxmox_rest_server::{formatter, WorkerTask};
> +use proxmox_rest_server::{formatter, worker_is_active, WorkerTask};
>   
>   use crate::api2::backup::optional_ns_param;
>   use crate::api2::node::rrd::create_value_from_rrd;
> @@ -2495,6 +2495,63 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> {
>       Ok(())
>   }
>   
> +async fn do_sync_jobs(
> +    jobs_to_run: Vec<SyncJobConfig>,
> +    worker: Arc<WorkerTask>,
> +) -> Result<(), Error> {
> +    let count = jobs_to_run.len();
> +    info!(
> +        "will run {count} sync jobs: {}",
> +        jobs_to_run
> +            .iter()
> +            .map(|job| job.id.as_str())
> +            .collect::<Vec<&str>>()
> +            .join(", ")
> +    );
> +
> +    let client = crate::client_helpers::connect_to_localhost()
> +        .with_context(|| format!("Failed to connect to localhost for starting sync jobs"))?;

nit: this is a 'static &str only, so there is no need for the string
allocation via format! and the with_context.

Should be a `.context("Failed to connect to localhost for starting sync jobs")?;`

> +    for (i, job_config) in jobs_to_run.into_iter().enumerate() {
> +        if worker.abort_requested() {
> +            bail!("aborted due to user request");
> +        }
> +        let job_id = &job_config.id;
> +        let Ok(result) = client
> +            .post(format!("api2/json/admin/sync/{job_id}/run").as_str(), None)
> +            .await
> +        else {
> +            warn!("unable to start sync job {job_id}");

nit: this could include the actual error message with context for ease 
of debugging!

> +            continue;
> +        };
> +        info!("[{}/{count}] starting '{job_id}'...", i + 1);

nit: moving the above log output to right before the API call that starts
the job makes more sense, I think.

so all in all:
```
        let job_id = &job_config.id;
        info!("[{}/{count}] starting '{job_id}'...", i + 1);
        let result = match client
            .post(format!("api2/json/admin/sync/{job_id}/run").as_str(), None)
            .await
        {
            Ok(result) => result,
            Err(err) => {
                warn!("unable to start sync job {job_id}: {err:#}");
                continue;
            }
        };
        let Some(upid_str) = result["data"].as_str() else {
            warn!(
                "could not receive UPID of started job (may be running, just can't track it here)"
            );
            continue;
        };
```

> +        let Some(upid_str) = result["data"].as_str() else {
> +            warn!(
> +                "could not receive UPID of started job (may be running, just can't track it here)"
> +            );
> +            continue;
> +        };
> +        let upid: UPID = upid_str.parse()?;
> +
> +        let sleep_duration = core::time::Duration::from_secs(1);
> +        let mut status_retries = 1;
> +        loop {
> +            if worker.abort_requested() {
> +                bail!("aborted due to user request, already started job will finish");
> +            }
> +            match worker_is_active(&upid).await {
> +                Ok(true) => tokio::time::sleep(sleep_duration).await,
> +                Ok(false) => break,
> +                Err(_) if status_retries > 3 => break,
> +                Err(err) => {
> +                    warn!("could not get job status: {err} ({status_retries}/3)");
> +                    status_retries += 1;
> +                }
> +            }
> +        }
> +    }
> +    Ok(())
> +}
> +
>   #[api(
>       protected: true,
>       input: {
> @@ -2526,12 +2583,58 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er
>       let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>       let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>   
> -    let upid = WorkerTask::new_thread(
> +    let upid = WorkerTask::spawn(
>           "mount-device",
> -        Some(store),
> +        Some(store.clone()),
>           auth_id.to_string(),
>           to_stdout,
> -        move |_worker| do_mount_device(datastore),
> +        move |_worker| async move {
> +            let name = datastore.name.clone();
> +            let log_context = LogContext::current();
> +            tokio::task::spawn_blocking(|| {
> +                if let Some(log_context) = log_context {
> +                    log_context.sync_scope(|| do_mount_device(datastore))
> +                } else {
> +                    do_mount_device(datastore)
> +                }
> +            })
> +            .await??;
> +            let Ok((sync_config, _digest)) = pbs_config::sync::config() else {
> +                warn!("unable to read sync job config, won't run any sync jobs");
> +                return Ok(());
> +            };
> +            let Ok(list) = sync_config.convert_to_typed_array("sync") else {
> +                warn!("unable to parse sync job config, won't run any sync jobs");
> +                return Ok(());
> +            };
> +            let mut jobs_to_run: Vec<SyncJobConfig> = list
> +                .into_iter()
> +                .filter(|job: &SyncJobConfig| {
> +                    // add job iff (running on mount is enabled and) any of these apply
> +                    //   - the jobs is local and we are source or target
> +                    //   - we are the source of a push to a remote
> +                    //   - we are the target of a pull from a remote
> +                    //
> +                    // `job.store == datastore.name` iff we are the target for pull from remote or we
> +                    // are the source for push to remote, therefore we don't have to check for the
> +                    // direction of the job.
> +                    job.run_on_mount.unwrap_or(false)
> +                        && (job.remote.is_none() && job.remote_store == name || job.store == name)
> +                })
> +                .collect();
> +            jobs_to_run.sort_by(|j1, j2| j1.id.cmp(&j2.id));
> +            if !jobs_to_run.is_empty() {
> +                info!("starting {} sync jobs", jobs_to_run.len());
> +                let _ = WorkerTask::spawn(
> +                    "mount-sync-jobs",
> +                    Some(store),
> +                    auth_id.to_string(),
> +                    false,
> +                    move |worker| async move { do_sync_jobs(jobs_to_run, worker).await },
> +                );
> +            }
> +            Ok(())
> +        },
>       )?;
>   
>       Ok(json!(upid))




