[pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted

Hannes Laimer h.laimer at proxmox.com
Fri Feb 28 09:18:53 CET 2025



On 2/4/25 15:33, Christian Ebner wrote:
> This patch needs to be rebased on current master.
> 
> High level comment:
> Mixing the mount api handler with the execution logic for the sync jobs 
> is not the best approach here I think. This mixes unrelated code and 
> makes it harder to further extend  this to e.g. also run verify, prune 
> or garbage collection in the future.
> 

The endpoint starts the task that does the mounting, and it starts the
task that manages possible sync jobs. I don't think this really changes
the logic of the sync jobs themselves. Sure, a generic
"run these jobs in sequence" interface would make sense, but currently
we can't build that: we don't have a common interface for jobs, and not
even job IDs are unique across different job types. I am not sure whether
there would be a lot of value in a more abstract job framework. The only
use case I can think of is jobs triggering each other, but I feel like
that is out of scope for this here.


> Just an idea: Maybe it would be possible to let the mount worker task 
> signal the mount event via e.g. a std::sync::mpsc::channel() or the 
> command socket (already used to request datastore cache updates on 
> unmount) to schedule a managing task in the proxmox-backup-proxy?
> 

This is a privileged endpoint, so it runs in the API process, and we
can't really use channels to reach the proxy process. And I don't think
the command socket is the right tool for this either, one reason being
that it is (technically) more of a hit-or-miss mechanism, which is not
really what we want here. Also, having the API process handle this is
IMHO not what we want.
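To illustrate the first point: a `std::sync::mpsc` channel only connects threads within a single process, so it cannot carry the mount event from the privileged API process over to proxmox-backup-proxy. A minimal, self-contained sketch of what the channel *can* do (all names here are illustrative, not from the patch):

```rust
use std::sync::mpsc;
use std::thread;

// Both ends of an mpsc channel live in one address space, so this only
// works for thread-to-thread signaling inside a single process. There is
// no way to hand the receiver to a different process such as
// proxmox-backup-proxy, which is why the channel suggestion doesn't apply.
fn signal_mount_event() -> String {
    let (tx, rx) = mpsc::channel();
    // the "mount worker" thread signals completion...
    thread::spawn(move || tx.send(String::from("mounted")).unwrap());
    // ...and a managing task in the *same* process receives it
    rx.recv().unwrap()
}

fn main() {
    println!("{}", signal_mount_event());
}
```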


> further comments inline.
> 
> On 1/16/25 07:45, Hannes Laimer wrote:
>> When a datastore is mounted, spawn a new task to run all sync jobs
>> marked with `run-on-mount`. These jobs run sequentially and include
>> any job for which the mounted datastore is:
>>
>> - The source or target in a local push/pull job
>> - The source in a push job to a remote datastore
>> - The target in a pull job from a remote datastore
>>
>> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
>> ---
>>   src/api2/admin/datastore.rs | 90 ++++++++++++++++++++++++++++++++++---
>>   src/api2/admin/sync.rs      |  2 +-
>>   src/server/sync.rs          |  7 +--
>>   3 files changed, 90 insertions(+), 9 deletions(-)
>>
>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
>> index f5d80d610..21b58391d 100644
>> --- a/src/api2/admin/datastore.rs
>> +++ b/src/api2/admin/datastore.rs
>> @@ -41,8 +41,8 @@ use pbs_api_types::{
>>       DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus,
>>       GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode,
>>       MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState,
>> -    BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
>> -    BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
>> +    SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
>> +    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
>>       IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA,
>>       PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
>>       PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA,
>> @@ -2500,6 +2500,51 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> {
>>       Ok(())
>>   }
>> +async fn do_sync_jobs(
> 
> nit: this helper function makes more sense to be placed into
> `src/api2/admin/sync.rs` I would say, as it purely handles execution of 
> sync jobs.
> 
>> +    jobs_to_run: Vec<SyncJobConfig>,
>> +    worker: Arc<WorkerTask>,
>> +) -> Result<(), Error> {
>> +    let count = jobs_to_run.len();
>> +    info!(
>> +        "will run {} sync jobs: {}",
>> +        count,
> 
> nit: this variable can be in-lined into the format string.
> 
>> +        jobs_to_run
>> +            .iter()
>> +            .map(|j| j.id.clone())
>> +            .collect::<Vec<String>>()
>> +            .join(", ")
> 
> nit: not sure if it is required to list all sync jobs a-priori, 
> especially since that line might get rather long if many sync jobs are 
> to be executed? Also, that would avoid the double iteration and id cloning.
> 
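The reviewer's nit about double iteration and id cloning could be addressed by joining borrowed ids in a single pass, sketched here with a hypothetical stand-in struct (not the real `SyncJobConfig`):

```rust
// Hypothetical, simplified stand-in for SyncJobConfig, just to
// illustrate the nit about cloning every id:
struct SyncJob {
    id: String,
}

// Build the log line from borrowed &str ids in one pass; no per-id
// String clone is needed.
fn job_id_list(jobs: &[SyncJob]) -> String {
    jobs.iter()
        .map(|j| j.id.as_str())
        .collect::<Vec<_>>()
        .join(", ")
}

fn main() {
    let jobs = vec![
        SyncJob { id: "s3-sync".into() },
        SyncJob { id: "offsite-pull".into() },
    ];
    println!("will run {} sync jobs: {}", jobs.len(), job_id_list(&jobs));
}
```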
>> +    );
>> +
>> +    for (i, job_config) in jobs_to_run.into_iter().enumerate() {
>> +        if worker.abort_requested() {
>> +            bail!("aborted due to user request");
>> +        }
>> +        let job_id = job_config.id.clone();
>> +        let Ok(job) = Job::new("syncjob", &job_id) else {
>> +            continue;
>> +        };
>> +        let auth_id = Authid::root_auth_id().clone();
>> +        info!("[{}/{count}] starting '{job_id}'...", i + 1);
>> +        match crate::server::do_sync_job(
>> +            job,
>> +            job_config,
>> +            &auth_id,
>> +            Some("mount".to_string()),
>> +            false,
>> +        ) {
>> +            Ok((_upid, handle)) => {
>> +                tokio::select! {
>> +                    sync_done = handle.fuse() => if let Err(err) = sync_done { warn!("could not wait for job to finish: {err}"); },
>> +                    _abort = worker.abort_future() => bail!("aborted due to user request"),
>> +                };
>> +            }
>> +            Err(err) => warn!("unable to start sync job {job_id} - {err}"),
>> +        }
>> +    }
>> +
>> +    Ok(())
>> +}
>> +
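The control flow of the loop above can be sketched with std types only: run the queue sequentially and check an abort flag between jobs, so a user-requested abort stops the queue cleanly. (The real code additionally races `handle.fuse()` against `worker.abort_future()` via `tokio::select!` so an abort also interrupts the wait on a running job; this sketch and its names are illustrative.)

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Sequential runner: one job after another, abort checked between jobs.
// Each queue entry here stands in for do_sync_job + waiting on its handle.
fn run_jobs(jobs: &[&str], abort: &AtomicBool) -> Result<Vec<String>, String> {
    let mut finished = Vec::new();
    for (i, job) in jobs.iter().enumerate() {
        if abort.load(Ordering::SeqCst) {
            return Err(String::from("aborted due to user request"));
        }
        finished.push(format!("[{}/{}] {job}", i + 1, jobs.len()));
    }
    Ok(finished)
}

fn main() {
    let abort = Arc::new(AtomicBool::new(false));
    for line in run_jobs(&["job-a", "job-b"], &abort).unwrap() {
        println!("{line}");
    }
}
```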
>>   #[api(
>>       protected: true,
>>       input: {
>> @@ -2531,12 +2576,47 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er
>>       let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>>       let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>> -    let upid = WorkerTask::new_thread(
>> +    let upid = WorkerTask::spawn(
>>           "mount-device",
>> -        Some(store),
>> +        Some(store.clone()),
>>           auth_id.to_string(),
>>           to_stdout,
>> -        move |_worker| do_mount_device(datastore),
>> +        move |_worker| async move {
>> +            do_mount_device(datastore.clone())?;
>> +            let Ok((sync_config, _digest)) = pbs_config::sync::config() else {
>> +                warn!("unable to read sync job config, won't run any sync jobs");
>> +                return Ok(());
>> +            };
>> +            let Ok(list) = sync_config.convert_to_typed_array("sync") else {
>> +                warn!("unable to parse sync job config, won't run any sync jobs");
>> +                return Ok(());
>> +            };
>> +            let jobs_to_run: Vec<SyncJobConfig> = list
>> +                .into_iter()
>> +                .filter(|job: &SyncJobConfig| {
>> +                    // add job iff any of these apply
>> +                    //   - the job is local and we are source or target
>> +                    //   - we are the source of a push to a remote
>> +                    //   - we are the target of a pull from a remote
>> +                    //
>> +                    // `job.store == datastore.name` iff we are the target for pull from remote or we
>> +                    // are the source for push to remote, therefore we don't have to check for the
>> +                    // direction of the job.
> 
> This misses a check for the `run_on_mount` flag here I think, as it is 
> never read in the rest of the patch series?
> 

I think you are right, I actually missed checking that...
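For the follow-up, the predicate would additionally gate on the flag; a simplified sketch with a hypothetical mirror of the relevant fields (the real field name and `SyncJobConfig` layout may differ):

```rust
// Hypothetical, simplified mirror of the relevant SyncJobConfig fields,
// including the `run-on-mount` flag the review points out was never read:
struct SyncJob {
    store: String,
    remote: Option<String>,
    remote_store: String,
    run_on_mount: bool,
}

// Amended predicate: same source/target matching as the patch, but a job
// only qualifies if it is explicitly marked to run on mount.
fn should_run_on_mount(job: &SyncJob, mounted_store: &str) -> bool {
    job.run_on_mount
        && (job.remote.is_none() && job.remote_store == mounted_store
            || job.store == mounted_store)
}

fn main() {
    let job = SyncJob {
        store: "local-ds".into(),
        remote: None,
        remote_store: "other-ds".into(),
        run_on_mount: false,
    };
    // matches the mounted store, but is not marked run-on-mount
    println!("{}", should_run_on_mount(&job, "local-ds"));
}
```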

>> +                    job.remote.is_none() && job.remote_store == datastore.name
>> +                        || job.store == datastore.name
>> +                })
>> +                .collect();
>> +            if !jobs_to_run.is_empty() {
>> +                let _ = WorkerTask::spawn(
>> +                    "mount-sync-jobs",
>> +                    Some(store),
>> +                    auth_id.to_string(),
>> +                    false,
>> +                    move |worker| async move { do_sync_jobs(jobs_to_run, worker).await },
>> +                );
>> +            }
>> +            Ok(())
>> +        },
>>       )?;
>>       Ok(json!(upid))
>> diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
>> index 6722ebea0..01dea5126 100644
>> --- a/src/api2/admin/sync.rs
>> +++ b/src/api2/admin/sync.rs
>> @@ -161,7 +161,7 @@ pub fn run_sync_job(
>>       let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>> -    let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
>> +    let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
>>       Ok(upid_str)
>>   }
>> diff --git a/src/server/sync.rs b/src/server/sync.rs
>> index 0bd7a7a85..2a290a8ee 100644
>> --- a/src/server/sync.rs
>> +++ b/src/server/sync.rs
>> @@ -11,6 +11,7 @@ use anyhow::{bail, format_err, Context, Error};
>>   use futures::{future::FutureExt, select};
>>   use http::StatusCode;
>>   use serde_json::json;
>> +use tokio::task::JoinHandle;
>>   use tracing::{info, warn};
>>   use proxmox_human_byte::HumanByte;
>> @@ -598,7 +599,7 @@ pub fn do_sync_job(
>>       auth_id: &Authid,
>>       schedule: Option<String>,
>>       to_stdout: bool,
>> -) -> Result<String, Error> {
>> +) -> Result<(String, JoinHandle<()>), Error> {
>>       let job_id = format!(
>>           "{}:{}:{}:{}:{}",
>>           sync_job.remote.as_deref().unwrap_or("-"),
>> @@ -614,7 +615,7 @@ pub fn do_sync_job(
>>           bail!("can't sync to same datastore");
>>       }
>> -    let upid_str = WorkerTask::spawn(
>> +    let (upid_str, handle) = WorkerTask::spawn_with_handle(
>>           &worker_type,
>>           Some(job_id.clone()),
>>           auth_id.to_string(),
>> @@ -728,5 +729,5 @@ pub fn do_sync_job(
>>           },
>>       )?;
>> -    Ok(upid_str)
>> +    Ok((upid_str, handle))
>>   }
> 
