[pbs-devel] [PATCH proxmox-backup v3 3/7] api: admin: run configured sync jobs when a datastore is mounted

Hannes Laimer h.laimer at proxmox.com
Wed Jul 16 14:14:00 CEST 2025



On 04.07.25 13:30, Christian Ebner wrote:
> a few comments inline.
> 
> On 6/4/25 14:30, Hannes Laimer wrote:
>> When a datastore is mounted, spawn a new task to run all sync jobs
>> marked with `run-on-mount`. These jobs run sequentially and include
>> any job for which the mounted datastore is:
>>
>> - The source or target in a local push/pull job
>> - The source in a push job to a remote datastore
>> - The target in a pull job from a remote datastore
>>
>> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
>> ---
>>   src/api2/admin/datastore.rs | 106 ++++++++++++++++++++++++++++++++++--
>>   1 file changed, 100 insertions(+), 6 deletions(-)
>>
>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
>> index 39249448..68bb2a1f 100644
>> --- a/src/api2/admin/datastore.rs
>> +++ b/src/api2/admin/datastore.rs
>> @@ -42,8 +42,8 @@ use pbs_api_types::{
>>       DataStoreConfig, DataStoreListItem, DataStoreMountStatus, 
>> DataStoreStatus,
>>       GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, 
>> KeepOptions, MaintenanceMode,
>>       MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, 
>> SnapshotVerifyState,
>> -    BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, 
>> BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
>> -    BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, 
>> DATASTORE_SCHEMA,
>> +    SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, 
>> BACKUP_NAMESPACE_SCHEMA,
>> +    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, 
>> CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
>>       IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, 
>> MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA,
>>       PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, 
>> PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
>>       PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, 
>> UPID, UPID_SCHEMA,
>> @@ -66,7 +66,7 @@ use pbs_datastore::{
>>       DataStore, LocalChunkReader, StoreProgress,
>>   };
>>   use pbs_tools::json::required_string_param;
>> -use proxmox_rest_server::{formatter, WorkerTask};
>> +use proxmox_rest_server::{formatter, worker_is_active, WorkerTask};
>>   use crate::api2::backup::optional_ns_param;
>>   use crate::api2::node::rrd::create_value_from_rrd;
>> @@ -2510,6 +2510,63 @@ pub fn do_mount_device(datastore: 
>> DataStoreConfig) -> Result<(), Error> {
>>       Ok(())
>>   }
>> +async fn do_sync_jobs(
>> +    jobs_to_run: Vec<SyncJobConfig>,
>> +    worker: Arc<WorkerTask>,
>> +) -> Result<(), Error> {
>> +    let count = jobs_to_run.len();
>> +    info!(
>> +        "will run {} sync jobs: {}",
>> +        count,
> 
> nit, `count` can be in-lined
> 
>> +        jobs_to_run
>> +            .iter()
>> +            .map(|job| job.id.as_str())
>> +            .collect::<Vec<&str>>()
>> +            .join(", ")
>> +    );
>> +
>> +    for (i, job_config) in jobs_to_run.into_iter().enumerate() {
>> +        if worker.abort_requested() {
>> +            bail!("aborted due to user request");
>> +        }
>> +        let job_id = &job_config.id;
>> +        let client = crate::client_helpers::connect_to_localhost()?;
> 
> nit: Adding some error context here would be good, so it is clear that a 
> possible connection error stems from the client.
> 
> nit: Further, the whole client instantiation can be moved outside of the 
> sync jobs loop, so it does not need to be reconstructed on every iteration.
> 
>> +        let Ok(result) = client
>> +            .post(format!("api2/json/admin/sync/{job_id}/ 
>> run").as_str(), None)
>> +            .await
>> +        else {
>> +            warn!("unable to start sync job {job_id}");
>> +            continue;
>> +        };
>> +        info!("[{}/{count}] starting '{job_id}'...", i + 1);
>> +        let Some(upid_str) = &result["data"].as_str() else {
> 
> nit: there is no need for the leading `&` borrow, as_str() already 
> returns an `Option<&str>`
> 
>> +            warn!(
>> +                "could not recieve UPID of started job (may be 
>> runnig, just can't track it here)"
> 
> nit: 2 typos: should be `receive` and `running`
> 
>> +            );
>> +            continue;
>> +        };
>> +        let upid: UPID = upid_str.parse()?;
>> +
>> +        let sleep_duration = core::time::Duration::new(1, 0);
> 
> nit: this could use `Duration::from_secs(1)` instead, which inherently 
> documents the argument's time unit
> 
>> +        let mut status_retries = 1;
>> +        loop {
>> +            if worker.abort_requested() {
>> +                bail!("aborted due to user request, already started 
>> job will finish");
>> +            }
>> +            match worker_is_active(&upid).await {
>> +                Ok(true) => tokio::time::sleep(sleep_duration).await,
>> +                Ok(false) => break,
>> +                Err(_) if status_retries > 3 => break,
>> +                Err(err) => {
>> +                    warn!("could not get job status: {err} ({}/3)", 
>> status_retries);
> 
> nit: the max retry count could be defined as a constant, with the same 
> constant also used for the warning output, to guarantee consistency. 
> Further, `status_retries` can be in-lined.
> 
>> +                    status_retries += 1;
>> +                }
>> +            }
>> +        }
>> +    }
>> +    Ok(())
>> +}
>> +
>>   #[api(
>>       protected: true,
>>       input: {
>> @@ -2541,12 +2598,49 @@ pub fn mount(store: String, rpcenv: &mut dyn 
>> RpcEnvironment) -> Result<Value, Er
>>       let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>>       let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>> -    let upid = WorkerTask::new_thread(
>> +    let upid = WorkerTask::spawn(
> 
> question: already mentioned last time, but still not sure if I 
> understand why spawn() instead of new_thread() is okay here. After all 
> this would execute the whole mount code on the current thread, and 
> potentially block that? So shouldn't do_mount_device() be non-blocking 
> for this change to be okay?
> 

Actually, you are right. But we still need the tokio runtime context, and
we don't get that with new_thread(). I think just wrapping the call to
the mount function in a .spawn_blocking(|| ...) should solve this. Will
fix this in v4.

>>           "mount-device",
>> -        Some(store),
>> +        Some(store.clone()),
>>           auth_id.to_string(),
>>           to_stdout,
>> -        move |_worker| do_mount_device(datastore),
>> +        move |_worker| async move {
>> +            do_mount_device(datastore.clone())?;
>> +            let Ok((sync_config, _digest)) = 
>> pbs_config::sync::config() else {
>> +                warn!("unable to read sync job config, won't run any 
>> sync jobs");
>> +                return Ok(());
>> +            };
>> +            let Ok(list) = sync_config.convert_to_typed_array("sync") 
>> else {
>> +                warn!("unable to parse sync job config, won't run any 
>> sync jobs");
>> +                return Ok(());
>> +            };
>> +            let jobs_to_run: Vec<SyncJobConfig> = list
>> +                .into_iter()
>> +                .filter(|job: &SyncJobConfig| {
>> +                    // add job iff (running on mount is enabled and) 
>> any of these apply
>> +                    //   - the jobs is local and we are source or target
>> +                    //   - we are the source of a push to a remote
>> +                    //   - we are the target of a pull from a remote
>> +                    //
>> +                    // `job.store == datastore.name` iff we are the 
>> target for pull from remote or we
>> +                    // are the source for push to remote, therefore 
>> we don't have to check for the
>> +                    // direction of the job.
>> +                    job.run_on_mount.unwrap_or(false)
>> +                        && (job.remote.is_none() && job.remote_store 
>> == datastore.name
>> +                            || job.store == datastore.name)
>> +                })
>> +                .collect();
> 
> note: it would make sense to sort the list of sync jobs by their sync id 
> here, so that the execution ordering can be inferred. This should be 
> documented in the user documentation as well.
> 
>> +            if !jobs_to_run.is_empty() {
>> +                info!("starting {} sync jobs", jobs_to_run.len());
>> +                let _ = WorkerTask::spawn(
>> +                    "mount-sync-jobs",
>> +                    Some(store),
>> +                    auth_id.to_string(),
>> +                    false,
>> +                    move |worker| async move 
>> { do_sync_jobs(jobs_to_run, worker).await },
>> +                );
>> +            }
>> +            Ok(())
>> +        },
>>       )?;
>>       Ok(json!(upid))
> 





More information about the pbs-devel mailing list