[pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted

Christian Ebner c.ebner at proxmox.com
Tue Feb 4 15:33:24 CET 2025


This patch needs to be rebased on current master.

High level comment:
Mixing the mount API handler with the execution logic for the sync jobs
is not the best approach here, I think. This mixes unrelated code and
makes it harder to extend this further, e.g. to also run verify, prune
or garbage collection jobs in the future.

Just an idea: Maybe it would be possible to let the mount worker task 
signal the mount event via e.g. a std::sync::mpsc::channel() or the 
command socket (already used to request datastore cache updates on 
unmount) to schedule a managing task in the proxmox-backup-proxy?

Further comments inline.

On 1/16/25 07:45, Hannes Laimer wrote:
> When a datastore is mounted, spawn a new task to run all sync jobs
> marked with `run-on-mount`. These jobs run sequentially and include
> any job for which the mounted datastore is:
> 
> - The source or target in a local push/pull job
> - The source in a push job to a remote datastore
> - The target in a pull job from a remote datastore
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
>   src/api2/admin/datastore.rs | 90 ++++++++++++++++++++++++++++++++++---
>   src/api2/admin/sync.rs      |  2 +-
>   src/server/sync.rs          |  7 +--
>   3 files changed, 90 insertions(+), 9 deletions(-)
> 
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index f5d80d610..21b58391d 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -41,8 +41,8 @@ use pbs_api_types::{
>       DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus,
>       GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode,
>       MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState,
> -    BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
> -    BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
> +    SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
> +    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
>       IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA,
>       PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
>       PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA,
> @@ -2500,6 +2500,51 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> {
>       Ok(())
>   }
>   
> +async fn do_sync_jobs(

nit: I would say it makes more sense to place this helper function in
`src/api2/admin/sync.rs`, as it purely handles the execution of
sync jobs.

> +    jobs_to_run: Vec<SyncJobConfig>,
> +    worker: Arc<WorkerTask>,
> +) -> Result<(), Error> {
> +    let count = jobs_to_run.len();
> +    info!(
> +        "will run {} sync jobs: {}",
> +        count,

nit: this variable can be inlined into the format string.

> +        jobs_to_run
> +            .iter()
> +            .map(|j| j.id.clone())
> +            .collect::<Vec<String>>()
> +            .join(", ")

nit: not sure if it is required to list all sync jobs a priori,
especially since that line might get rather long if many sync jobs are
to be executed? Also, dropping the list would avoid the double iteration
and the cloning of the job ids.

> +    );
> +
> +    for (i, job_config) in jobs_to_run.into_iter().enumerate() {
> +        if worker.abort_requested() {
> +            bail!("aborted due to user request");
> +        }
> +        let job_id = job_config.id.clone();
> +        let Ok(job) = Job::new("syncjob", &job_id) else {
> +            continue;
> +        };
> +        let auth_id = Authid::root_auth_id().clone();
> +        info!("[{}/{count}] starting '{job_id}'...", i + 1);
> +        match crate::server::do_sync_job(
> +            job,
> +            job_config,
> +            &auth_id,
> +            Some("mount".to_string()),
> +            false,
> +        ) {
> +            Ok((_upid, handle)) => {
> +                tokio::select! {
> +                    sync_done = handle.fuse() => if let Err(err) = sync_done { warn!("could not wait for job to finish: {err}"); },
> +                    _abort = worker.abort_future() => bail!("aborted due to user request"),
> +                };
> +            }
> +            Err(err) => warn!("unable to start sync job {job_id} - {err}"),
> +        }
> +    }
> +
> +    Ok(())
> +}
> +
>   #[api(
>       protected: true,
>       input: {
> @@ -2531,12 +2576,47 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er
>       let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>       let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>   
> -    let upid = WorkerTask::new_thread(
> +    let upid = WorkerTask::spawn(
>           "mount-device",
> -        Some(store),
> +        Some(store.clone()),
>           auth_id.to_string(),
>           to_stdout,
> -        move |_worker| do_mount_device(datastore),
> +        move |_worker| async move {
> +            do_mount_device(datastore.clone())?;
> +            let Ok((sync_config, _digest)) = pbs_config::sync::config() else {
> +                warn!("unable to read sync job config, won't run any sync jobs");
> +                return Ok(());
> +            };
> +            let Ok(list) = sync_config.convert_to_typed_array("sync") else {
> +                warn!("unable to parse sync job config, won't run any sync jobs");
> +                return Ok(());
> +            };
> +            let jobs_to_run: Vec<SyncJobConfig> = list
> +                .into_iter()
> +                .filter(|job: &SyncJobConfig| {
> +                    // add job iff any of these apply
> +                    //   - the jobs is local and we are source or target
> +                    //   - we are the source of a push to a remote
> +                    //   - we are the target of a pull from a remote
> +                    //
> +                    // `job.store == datastore.name` iff we are the target for pull from remote or we
> +                    // are the source for push to remote, therefore we don't have to check for the
> +                    // direction of the job.

I think a check for the `run_on_mount` flag is missing here, as the flag
is never read in the rest of the patch series?

> +                    job.remote.is_none() && job.remote_store == datastore.name
> +                        || job.store == datastore.name
> +                })
> +                .collect();
> +            if !jobs_to_run.is_empty() {
> +                let _ = WorkerTask::spawn(
> +                    "mount-sync-jobs",
> +                    Some(store),
> +                    auth_id.to_string(),
> +                    false,
> +                    move |worker| async move { do_sync_jobs(jobs_to_run, worker).await },
> +                );
> +            }
> +            Ok(())
> +        },
>       )?;
>   
>       Ok(json!(upid))
> diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
> index 6722ebea0..01dea5126 100644
> --- a/src/api2/admin/sync.rs
> +++ b/src/api2/admin/sync.rs
> @@ -161,7 +161,7 @@ pub fn run_sync_job(
>   
>       let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>   
> -    let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
> +    let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
>   
>       Ok(upid_str)
>   }
> diff --git a/src/server/sync.rs b/src/server/sync.rs
> index 0bd7a7a85..2a290a8ee 100644
> --- a/src/server/sync.rs
> +++ b/src/server/sync.rs
> @@ -11,6 +11,7 @@ use anyhow::{bail, format_err, Context, Error};
>   use futures::{future::FutureExt, select};
>   use http::StatusCode;
>   use serde_json::json;
> +use tokio::task::JoinHandle;
>   use tracing::{info, warn};
>   
>   use proxmox_human_byte::HumanByte;
> @@ -598,7 +599,7 @@ pub fn do_sync_job(
>       auth_id: &Authid,
>       schedule: Option<String>,
>       to_stdout: bool,
> -) -> Result<String, Error> {
> +) -> Result<(String, JoinHandle<()>), Error> {
>       let job_id = format!(
>           "{}:{}:{}:{}:{}",
>           sync_job.remote.as_deref().unwrap_or("-"),
> @@ -614,7 +615,7 @@ pub fn do_sync_job(
>           bail!("can't sync to same datastore");
>       }
>   
> -    let upid_str = WorkerTask::spawn(
> +    let (upid_str, handle) = WorkerTask::spawn_with_handle(
>           &worker_type,
>           Some(job_id.clone()),
>           auth_id.to_string(),
> @@ -728,5 +729,5 @@ pub fn do_sync_job(
>           },
>       )?;
>   
> -    Ok(upid_str)
> +    Ok((upid_str, handle))
>   }





More information about the pbs-devel mailing list