[pbs-devel] [PATCH proxmox-backup v3 3/7] api: admin: run configured sync jobs when a datastore is mounted

Hannes Laimer h.laimer at proxmox.com
Wed Jun 4 14:30:50 CEST 2025


When a datastore is mounted, spawn a new task to run all sync jobs
marked with `run-on-mount`. These jobs run sequentially and include
any job for which the mounted datastore is:

- The source or target in a local push/pull job
- The source in a push job to a remote datastore
- The target in a pull job from a remote datastore

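For clarity, the selection of jobs boils down to the following predicate
(standalone sketch only; the helper name and free function form are made up
here, the field names are the ones used in the patch below):

  use pbs_api_types::SyncJobConfig;

  // illustrative sketch, not part of the patch itself
  fn runs_on_mount(job: &SyncJobConfig, mounted_store: &str) -> bool {
      job.run_on_mount.unwrap_or(false)
          && (
              // local job where the mounted datastore is the sync source
              (job.remote.is_none() && job.remote_store == mounted_store)
              // mounted datastore is the job's local store: target of a pull,
              // or source of a push to a remote
              || job.store == mounted_store
          )
  }
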
Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
 src/api2/admin/datastore.rs | 106 ++++++++++++++++++++++++++++++++++--
 1 file changed, 100 insertions(+), 6 deletions(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 39249448..68bb2a1f 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -42,8 +42,8 @@ use pbs_api_types::{
     DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus,
     GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode,
     MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState,
-    BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
-    BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
+    SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
+    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
     IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA,
     PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
     PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA,
@@ -66,7 +66,7 @@ use pbs_datastore::{
     DataStore, LocalChunkReader, StoreProgress,
 };
 use pbs_tools::json::required_string_param;
-use proxmox_rest_server::{formatter, WorkerTask};
+use proxmox_rest_server::{formatter, worker_is_active, WorkerTask};
 
 use crate::api2::backup::optional_ns_param;
 use crate::api2::node::rrd::create_value_from_rrd;
@@ -2510,6 +2510,63 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> {
     Ok(())
 }
 
+async fn do_sync_jobs(
+    jobs_to_run: Vec<SyncJobConfig>,
+    worker: Arc<WorkerTask>,
+) -> Result<(), Error> {
+    let count = jobs_to_run.len();
+    info!(
+        "will run {} sync jobs: {}",
+        count,
+        jobs_to_run
+            .iter()
+            .map(|job| job.id.as_str())
+            .collect::<Vec<&str>>()
+            .join(", ")
+    );
+
+    for (i, job_config) in jobs_to_run.into_iter().enumerate() {
+        if worker.abort_requested() {
+            bail!("aborted due to user request");
+        }
+        let job_id = &job_config.id;
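+        // connect to the local daemon and trigger the sync job via its run endpoint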
+        let client = crate::client_helpers::connect_to_localhost()?;
+        let Ok(result) = client
+            .post(format!("api2/json/admin/sync/{job_id}/run").as_str(), None)
+            .await
+        else {
+            warn!("unable to start sync job {job_id}");
+            continue;
+        };
+        info!("[{}/{count}] starting '{job_id}'...", i + 1);
+        let Some(upid_str) = &result["data"].as_str() else {
+            warn!(
+                "could not recieve UPID of started job (may be runnig, just can't track it here)"
+            );
+            continue;
+        };
+        let upid: UPID = upid_str.parse()?;
+
+        let sleep_duration = core::time::Duration::new(1, 0);
+        let mut status_retries = 1;
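+        // poll the started job's task status so the jobs effectively run one after another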
+        loop {
+            if worker.abort_requested() {
+                bail!("aborted due to user request, already started job will finish");
+            }
+            match worker_is_active(&upid).await {
+                Ok(true) => tokio::time::sleep(sleep_duration).await,
+                Ok(false) => break,
+                Err(_) if status_retries > 3 => break,
+                Err(err) => {
+                    warn!("could not get job status: {err} ({}/3)", status_retries);
+                    status_retries += 1;
+                }
+            }
+        }
+    }
+    Ok(())
+}
+
 #[api(
     protected: true,
     input: {
@@ -2541,12 +2598,49 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let upid = WorkerTask::new_thread(
+    let upid = WorkerTask::spawn(
         "mount-device",
-        Some(store),
+        Some(store.clone()),
         auth_id.to_string(),
         to_stdout,
-        move |_worker| do_mount_device(datastore),
+        move |_worker| async move {
+            do_mount_device(datastore.clone())?;
+            let Ok((sync_config, _digest)) = pbs_config::sync::config() else {
+                warn!("unable to read sync job config, won't run any sync jobs");
+                return Ok(());
+            };
+            let Ok(list) = sync_config.convert_to_typed_array("sync") else {
+                warn!("unable to parse sync job config, won't run any sync jobs");
+                return Ok(());
+            };
+            let jobs_to_run: Vec<SyncJobConfig> = list
+                .into_iter()
+                .filter(|job: &SyncJobConfig| {
+                    // add job iff (running on mount is enabled and) any of these apply
+                    //   - the job is local and we are source or target
+                    //   - we are the source of a push to a remote
+                    //   - we are the target of a pull from a remote
+                    //
+                    // `job.store == datastore.name` iff we are the target for pull from remote or we
+                    // are the source for push to remote, therefore we don't have to check for the
+                    // direction of the job.
+                    job.run_on_mount.unwrap_or(false)
+                        && (job.remote.is_none() && job.remote_store == datastore.name
+                            || job.store == datastore.name)
+                })
+                .collect();
+            if !jobs_to_run.is_empty() {
+                info!("starting {} sync jobs", jobs_to_run.len());
+                let _ = WorkerTask::spawn(
+                    "mount-sync-jobs",
+                    Some(store),
+                    auth_id.to_string(),
+                    false,
+                    move |worker| async move { do_sync_jobs(jobs_to_run, worker).await },
+                );
+            }
+            Ok(())
+        },
     )?;
 
     Ok(json!(upid))
-- 
2.39.5