[pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted

Hannes Laimer h.laimer at proxmox.com
Thu Jan 16 07:45:39 CET 2025


When a datastore is mounted, spawn a new task to run all sync jobs
marked with `run-on-mount`. These jobs run sequentially and include
any job for which the mounted datastore is:

- The source or target in a local push/pull job
- The source in a push job to a remote datastore
- The target in a pull job from a remote datastore

Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
 src/api2/admin/datastore.rs | 90 ++++++++++++++++++++++++++++++++++---
 src/api2/admin/sync.rs      |  2 +-
 src/server/sync.rs          |  7 +--
 3 files changed, 90 insertions(+), 9 deletions(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index f5d80d610..21b58391d 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -41,8 +41,8 @@ use pbs_api_types::{
     DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus,
     GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode,
     MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState,
-    BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
-    BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
+    SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
+    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
     IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA,
     PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
     PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA,
@@ -2500,6 +2500,51 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> {
     Ok(())
 }
 
+async fn do_sync_jobs(
+    jobs_to_run: Vec<SyncJobConfig>,
+    worker: Arc<WorkerTask>,
+) -> Result<(), Error> {
+    let count = jobs_to_run.len();
+    info!(
+        "will run {} sync jobs: {}",
+        count,
+        jobs_to_run
+            .iter()
+            .map(|j| j.id.clone())
+            .collect::<Vec<String>>()
+            .join(", ")
+    );
+
+    for (i, job_config) in jobs_to_run.into_iter().enumerate() {
+        if worker.abort_requested() {
+            bail!("aborted due to user request");
+        }
+        let job_id = job_config.id.clone();
+        let Ok(job) = Job::new("syncjob", &job_id) else {
+            continue;
+        };
+        let auth_id = Authid::root_auth_id().clone();
+        info!("[{}/{count}] starting '{job_id}'...", i + 1);
+        match crate::server::do_sync_job(
+            job,
+            job_config,
+            &auth_id,
+            Some("mount".to_string()),
+            false,
+        ) {
+            Ok((_upid, handle)) => {
+                tokio::select! {
+                    sync_done = handle.fuse() => if let Err(err) = sync_done { warn!("could not wait for job to finish: {err}"); },
+                    _abort = worker.abort_future() => bail!("aborted due to user request"),
+                };
+            }
+            Err(err) => warn!("unable to start sync job {job_id} - {err}"),
+        }
+    }
+
+    Ok(())
+}
+
 #[api(
     protected: true,
     input: {
@@ -2531,12 +2576,47 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let upid = WorkerTask::new_thread(
+    let upid = WorkerTask::spawn(
         "mount-device",
-        Some(store),
+        Some(store.clone()),
         auth_id.to_string(),
         to_stdout,
-        move |_worker| do_mount_device(datastore),
+        move |_worker| async move {
+            do_mount_device(datastore.clone())?;
+            let Ok((sync_config, _digest)) = pbs_config::sync::config() else {
+                warn!("unable to read sync job config, won't run any sync jobs");
+                return Ok(());
+            };
+            let Ok(list) = sync_config.convert_to_typed_array("sync") else {
+                warn!("unable to parse sync job config, won't run any sync jobs");
+                return Ok(());
+            };
+            let jobs_to_run: Vec<SyncJobConfig> = list
+                .into_iter()
+                .filter(|job: &SyncJobConfig| {
+                    // add job iff any of these apply
+                    //   - the jobs is local and we are source or target
+                    //   - we are the source of a push to a remote
+                    //   - we are the target of a pull from a remote
+                    //
+                    // `job.store == datastore.name` iff we are the target for pull from remote or we
+                    // are the source for push to remote, therefore we don't have to check for the
+                    // direction of the job.
+                    job.remote.is_none() && job.remote_store == datastore.name
+                        || job.store == datastore.name
+                })
+                .collect();
+            if !jobs_to_run.is_empty() {
+                let _ = WorkerTask::spawn(
+                    "mount-sync-jobs",
+                    Some(store),
+                    auth_id.to_string(),
+                    false,
+                    move |worker| async move { do_sync_jobs(jobs_to_run, worker).await },
+                );
+            }
+            Ok(())
+        },
     )?;
 
     Ok(json!(upid))
diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index 6722ebea0..01dea5126 100644
--- a/src/api2/admin/sync.rs
+++ b/src/api2/admin/sync.rs
@@ -161,7 +161,7 @@ pub fn run_sync_job(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
+    let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
 
     Ok(upid_str)
 }
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 0bd7a7a85..2a290a8ee 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -11,6 +11,7 @@ use anyhow::{bail, format_err, Context, Error};
 use futures::{future::FutureExt, select};
 use http::StatusCode;
 use serde_json::json;
+use tokio::task::JoinHandle;
 use tracing::{info, warn};
 
 use proxmox_human_byte::HumanByte;
@@ -598,7 +599,7 @@ pub fn do_sync_job(
     auth_id: &Authid,
     schedule: Option<String>,
     to_stdout: bool,
-) -> Result<String, Error> {
+) -> Result<(String, JoinHandle<()>), Error> {
     let job_id = format!(
         "{}:{}:{}:{}:{}",
         sync_job.remote.as_deref().unwrap_or("-"),
@@ -614,7 +615,7 @@ pub fn do_sync_job(
         bail!("can't sync to same datastore");
     }
 
-    let upid_str = WorkerTask::spawn(
+    let (upid_str, handle) = WorkerTask::spawn_with_handle(
         &worker_type,
         Some(job_id.clone()),
         auth_id.to_string(),
@@ -728,5 +729,5 @@ pub fn do_sync_job(
         },
     )?;
 
-    Ok(upid_str)
+    Ok((upid_str, handle))
 }
-- 
2.39.5





More information about the pbs-devel mailing list