[pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted
Hannes Laimer
h.laimer at proxmox.com
Thu Jan 16 07:45:39 CET 2025
When a datastore is mounted, spawn a new task to run all sync jobs
marked with `run-on-mount`. These jobs run sequentially and include
any job for which the mounted datastore is:
- The source or target in a local push/pull job
- The source in a push job to a remote datastore
- The target in a pull job from a remote datastore
Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
src/api2/admin/datastore.rs | 90 ++++++++++++++++++++++++++++++++++---
src/api2/admin/sync.rs | 2 +-
src/server/sync.rs | 7 +--
3 files changed, 90 insertions(+), 9 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index f5d80d610..21b58391d 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -41,8 +41,8 @@ use pbs_api_types::{
DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus,
GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode,
MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState,
- BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
- BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
+ SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
+ BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA,
PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA,
@@ -2500,6 +2500,51 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> {
Ok(())
}
+async fn do_sync_jobs( // runs the given sync jobs one after another; returns Err only when an abort is requested
+ jobs_to_run: Vec<SyncJobConfig>,
+ worker: Arc<WorkerTask>,
+) -> Result<(), Error> {
+ let count = jobs_to_run.len();
+ info!( // log up front how many jobs will run and their ids
+ "will run {} sync jobs: {}",
+ count,
+ jobs_to_run
+ .iter()
+ .map(|j| j.id.clone())
+ .collect::<Vec<String>>()
+ .join(", ")
+ );
+
+ for (i, job_config) in jobs_to_run.into_iter().enumerate() {
+ if worker.abort_requested() { // re-checked before every job so no new job is started after an abort request
+ bail!("aborted due to user request");
+ }
+ let job_id = job_config.id.clone();
+ let Ok(job) = Job::new("syncjob", &job_id) else { // if Job::new fails the job is skipped and nothing is logged
+ continue;
+ };
+ let auth_id = Authid::root_auth_id().clone(); // every job is started with the root auth id
+ info!("[{}/{count}] starting '{job_id}'...", i + 1);
+ match crate::server::do_sync_job(
+ job,
+ job_config,
+ &auth_id,
+ Some("mount".to_string()), // passed as do_sync_job's `schedule` argument
+ false, // to_stdout
+ ) {
+ Ok((_upid, handle)) => {
+ tokio::select! { // wait until the job finishes or an abort is requested, whichever comes first
+ sync_done = handle.fuse() => if let Err(err) = sync_done { warn!("could not wait for job to finish: {err}"); },
+ _abort = worker.abort_future() => bail!("aborted due to user request"), // NOTE(review): bails without touching the already-started sync job; whether that job keeps running is not visible here - confirm
+ };
+ }
+ Err(err) => warn!("unable to start sync job {job_id} - {err}"), // a job that cannot be started is only logged; the remaining jobs still run
+ }
+ }
+
+ Ok(())
+}
+
#[api(
protected: true,
input: {
@@ -2531,12 +2576,47 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
- let upid = WorkerTask::new_thread(
+ let upid = WorkerTask::spawn(
"mount-device",
- Some(store),
+ Some(store.clone()),
auth_id.to_string(),
to_stdout,
- move |_worker| do_mount_device(datastore),
+ move |_worker| async move {
+ do_mount_device(datastore.clone())?;
+ let Ok((sync_config, _digest)) = pbs_config::sync::config() else {
+ warn!("unable to read sync job config, won't run any sync jobs");
+ return Ok(());
+ };
+ let Ok(list) = sync_config.convert_to_typed_array("sync") else {
+ warn!("unable to parse sync job config, won't run any sync jobs");
+ return Ok(());
+ };
+ let jobs_to_run: Vec<SyncJobConfig> = list
+ .into_iter()
+ .filter(|job: &SyncJobConfig| {
+ // add job iff any of these apply
+ // - the job is local and we are source or target
+ // - we are the source of a push to a remote
+ // - we are the target of a pull from a remote
+ //
+ // `job.store == datastore.name` iff we are the target for pull from remote or we
+ // are the source for push to remote, therefore we don't have to check for the
+ // direction of the job.
+ job.remote.is_none() && job.remote_store == datastore.name
+ || job.store == datastore.name
+ })
+ .collect();
+ if !jobs_to_run.is_empty() {
+ let _ = WorkerTask::spawn(
+ "mount-sync-jobs",
+ Some(store),
+ auth_id.to_string(),
+ false,
+ move |worker| async move { do_sync_jobs(jobs_to_run, worker).await },
+ );
+ }
+ Ok(())
+ },
)?;
Ok(json!(upid))
diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index 6722ebea0..01dea5126 100644
--- a/src/api2/admin/sync.rs
+++ b/src/api2/admin/sync.rs
@@ -161,7 +161,7 @@ pub fn run_sync_job(
let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
- let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
+ let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
Ok(upid_str)
}
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 0bd7a7a85..2a290a8ee 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -11,6 +11,7 @@ use anyhow::{bail, format_err, Context, Error};
use futures::{future::FutureExt, select};
use http::StatusCode;
use serde_json::json;
+use tokio::task::JoinHandle;
use tracing::{info, warn};
use proxmox_human_byte::HumanByte;
@@ -598,7 +599,7 @@ pub fn do_sync_job(
auth_id: &Authid,
schedule: Option<String>,
to_stdout: bool,
-) -> Result<String, Error> {
+) -> Result<(String, JoinHandle<()>), Error> {
let job_id = format!(
"{}:{}:{}:{}:{}",
sync_job.remote.as_deref().unwrap_or("-"),
@@ -614,7 +615,7 @@ pub fn do_sync_job(
bail!("can't sync to same datastore");
}
- let upid_str = WorkerTask::spawn(
+ let (upid_str, handle) = WorkerTask::spawn_with_handle(
&worker_type,
Some(job_id.clone()),
auth_id.to_string(),
@@ -728,5 +729,5 @@ pub fn do_sync_job(
},
)?;
- Ok(upid_str)
+ Ok((upid_str, handle))
}
--
2.39.5
More information about the pbs-devel
mailing list