[pbs-devel] [PATCH v2 proxmox-backup 17/31] api: sync: move sync job invocation to common module
Fabian Grünbichler
f.gruenbichler at proxmox.com
Wed Aug 7 12:51:01 CEST 2024
Quoting Christian Ebner (2024-08-01 09:43:49)
> Moves and refactored the sync_job_do function into a common sync
> module so that it can be reused for both sync directions, pull and
> push.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 1:
> - adapt to new sync direction config type
> - refactor log outputs to reduce code and to fix meaning for push
> direction
>
> src/api2/admin/sync.rs | 15 +++-
> src/api2/mod.rs | 1 +
> src/api2/pull.rs | 108 --------------------------
> src/api2/sync.rs | 131 ++++++++++++++++++++++++++++++++
the do_sync_job is not really API code now, and could move to
src/server/sync.rs? or at least the part that is executed as worker task
could, with the task spawning left in src/api2/pull.rs and push.rs
that's how the other jobs are handled as well, except for the tape backup jobs
;)
> src/bin/proxmox-backup-proxy.rs | 15 +++-
> 5 files changed, 155 insertions(+), 115 deletions(-)
> create mode 100644 src/api2/sync.rs
>
> diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
> index 4e2ba0be8..5bab1e396 100644
> --- a/src/api2/admin/sync.rs
> +++ b/src/api2/admin/sync.rs
> @@ -10,14 +10,16 @@ use proxmox_router::{
> use proxmox_schema::api;
> use proxmox_sortable_macro::sortable;
>
> -use pbs_api_types::{Authid, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA};
> +use pbs_api_types::{
> + Authid, SyncDirection, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA,
> +};
> use pbs_config::sync;
> use pbs_config::CachedUserInfo;
>
> use crate::{
> api2::{
> config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
> - pull::do_sync_job,
> + sync::do_sync_job,
> },
> server::jobstate::{compute_schedule_status, Job, JobState},
> };
> @@ -116,7 +118,14 @@ pub fn run_sync_job(
>
> let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>
> - let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
> + let upid_str = do_sync_job(
> + job,
> + sync_job,
> + &auth_id,
> + None,
> + SyncDirection::Pull,
> + to_stdout,
> + )?;
>
> Ok(upid_str)
> }
> diff --git a/src/api2/mod.rs b/src/api2/mod.rs
> index 03596326b..44e3776a4 100644
> --- a/src/api2/mod.rs
> +++ b/src/api2/mod.rs
> @@ -15,6 +15,7 @@ pub mod pull;
> pub mod push;
> pub mod reader;
> pub mod status;
> +pub mod sync;
> pub mod tape;
> pub mod types;
> pub mod version;
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index e733c9839..d039dab59 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -13,10 +13,8 @@ use pbs_api_types::{
> TRANSFER_LAST_SCHEMA,
> };
> use pbs_config::CachedUserInfo;
> -use proxmox_human_byte::HumanByte;
> use proxmox_rest_server::WorkerTask;
>
> -use crate::server::jobstate::Job;
> use crate::server::pull::{pull_store, PullParameters};
>
> pub fn check_pull_privs(
> @@ -93,112 +91,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
> }
> }
>
> -pub fn do_sync_job(
> - mut job: Job,
> - sync_job: SyncJobConfig,
> - auth_id: &Authid,
> - schedule: Option<String>,
> - to_stdout: bool,
> -) -> Result<String, Error> {
> - let job_id = format!(
> - "{}:{}:{}:{}:{}",
> - sync_job.remote.as_deref().unwrap_or("-"),
> - sync_job.remote_store,
> - sync_job.store,
> - sync_job.ns.clone().unwrap_or_default(),
> - job.jobname()
> - );
> - let worker_type = job.jobtype().to_string();
> -
> - if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
> - bail!("can't sync to same datastore");
> - }
> -
> - let upid_str = WorkerTask::spawn(
> - &worker_type,
> - Some(job_id.clone()),
> - auth_id.to_string(),
> - to_stdout,
> - move |worker| async move {
> - job.start(&worker.upid().to_string())?;
> -
> - let worker2 = worker.clone();
> - let sync_job2 = sync_job.clone();
> -
> - let worker_future = async move {
> - let pull_params = PullParameters::try_from(&sync_job)?;
> -
> - info!("Starting datastore sync job '{job_id}'");
> - if let Some(event_str) = schedule {
> - info!("task triggered by schedule '{event_str}'");
> - }
> -
> - info!(
> - "sync datastore '{}' from '{}{}'",
> - sync_job.store,
> - sync_job
> - .remote
> - .as_deref()
> - .map_or(String::new(), |remote| format!("{remote}/")),
> - sync_job.remote_store,
> - );
> -
> - let pull_stats = pull_store(pull_params).await?;
> -
> - if pull_stats.bytes != 0 {
> - let amount = HumanByte::from(pull_stats.bytes);
> - let rate = HumanByte::new_binary(
> - pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
> - );
> - info!(
> - "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
> - pull_stats.chunk_count,
> - );
> - } else {
> - info!("Summary: sync job found no new data to pull");
> - }
> -
> - if let Some(removed) = pull_stats.removed {
> - info!(
> - "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
> - removed.snapshots, removed.groups, removed.namespaces,
> - );
> - }
> -
> - info!("sync job '{}' end", &job_id);
> -
> - Ok(())
> - };
> -
> - let mut abort_future = worker2
> - .abort_future()
> - .map(|_| Err(format_err!("sync aborted")));
> -
> - let result = select! {
> - worker = worker_future.fuse() => worker,
> - abort = abort_future => abort,
> - };
> -
> - let status = worker2.create_state(&result);
> -
> - match job.finish(status) {
> - Ok(_) => {}
> - Err(err) => {
> - eprintln!("could not finish job state: {}", err);
> - }
> - }
> -
> - if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
> - eprintln!("send sync notification failed: {err}");
> - }
> -
> - result
> - },
> - )?;
> -
> - Ok(upid_str)
> -}
> -
> #[api(
> input: {
> properties: {
> diff --git a/src/api2/sync.rs b/src/api2/sync.rs
> new file mode 100644
> index 000000000..8de413556
> --- /dev/null
> +++ b/src/api2/sync.rs
> @@ -0,0 +1,131 @@
> +//! Sync datastore from remote server
> +use anyhow::{bail, format_err, Error};
> +use futures::{future::FutureExt, select};
> +use tracing::info;
> +
> +use pbs_api_types::{Authid, SyncDirection, SyncJobConfig};
> +use proxmox_human_byte::HumanByte;
> +use proxmox_rest_server::WorkerTask;
> +
> +use crate::server::jobstate::Job;
> +use crate::server::pull::{pull_store, PullParameters};
> +use crate::server::push::{push_store, PushParameters};
> +
> +pub fn do_sync_job(
> + mut job: Job,
> + sync_job: SyncJobConfig,
> + auth_id: &Authid,
> + schedule: Option<String>,
> + sync_direction: SyncDirection,
> + to_stdout: bool,
> +) -> Result<String, Error> {
> + let job_id = format!(
> + "{}:{}:{}:{}:{}",
> + sync_job.remote.as_deref().unwrap_or("-"),
> + sync_job.remote_store,
> + sync_job.store,
> + sync_job.ns.clone().unwrap_or_default(),
> + job.jobname(),
> + );
> + let worker_type = job.jobtype().to_string();
> +
> + if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
> + bail!("can't sync to same datastore");
> + }
> +
> + let upid_str = WorkerTask::spawn(
> + &worker_type,
> + Some(job_id.clone()),
> + auth_id.to_string(),
> + to_stdout,
> + move |worker| async move {
> + job.start(&worker.upid().to_string())?;
> +
> + let worker2 = worker.clone();
> + let sync_job2 = sync_job.clone();
> +
> + let worker_future = async move {
> + info!("Starting datastore sync job '{job_id}'");
> + if let Some(event_str) = schedule {
> + info!("task triggered by schedule '{event_str}'");
> + }
> + let sync_stats = match sync_direction {
> + SyncDirection::Pull => {
> + info!(
> + "sync datastore '{}' from '{}{}'",
> + sync_job.store,
> + sync_job
> + .remote
> + .as_deref()
> + .map_or(String::new(), |remote| format!("{remote}/")),
> + sync_job.remote_store,
> + );
> + let pull_params = PullParameters::try_from(&sync_job)?;
> + pull_store(pull_params).await?
> + }
> + SyncDirection::Push => {
> + info!(
> + "sync datastore '{}' to '{}{}'",
> + sync_job.store,
> + sync_job
> + .remote
> + .as_deref()
> + .map_or(String::new(), |remote| format!("{remote}/")),
> + sync_job.remote_store,
> + );
> + let push_params = PushParameters::try_from(&sync_job)?;
> + push_store(push_params).await?
> + }
> + };
> +
> + if sync_stats.bytes != 0 {
> + let amount = HumanByte::from(sync_stats.bytes);
> + let rate = HumanByte::new_binary(
> + sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64(),
> + );
> + info!(
> + "Summary: sync job {sync_direction}ed {amount} in {} chunks (average rate: {rate}/s)",
> + sync_stats.chunk_count,
> + );
> + } else {
> + info!("Summary: sync job found no new data to {sync_direction}");
> + }
> +
> + if let Some(removed) = sync_stats.removed {
> + info!(
> + "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
> + removed.snapshots, removed.groups, removed.namespaces,
> + );
> + }
> +
> + info!("sync job '{job_id}' end");
> +
> + Ok(())
> + };
> +
> + let mut abort_future = worker2
> + .abort_future()
> + .map(|_| Err(format_err!("sync aborted")));
> +
> + let result = select! {
> + worker = worker_future.fuse() => worker,
> + abort = abort_future => abort,
> + };
> +
> + let status = worker2.create_state(&result);
> +
> + match job.finish(status) {
> + Ok(_) => {}
> + Err(err) => eprintln!("could not finish job state: {err}"),
> + }
> +
> + if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
> + eprintln!("send sync notification failed: {err}");
> + }
> +
> + result
> + },
> + )?;
> +
> + Ok(upid_str)
> +}
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 041f3aff9..278adfa43 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -46,8 +46,8 @@ use pbs_buildcfg::configdir;
> use proxmox_time::CalendarEvent;
>
> use pbs_api_types::{
> - Authid, DataStoreConfig, Operation, PruneJobConfig, SyncJobConfig, TapeBackupJobConfig,
> - VerificationJobConfig,
> + Authid, DataStoreConfig, Operation, PruneJobConfig, SyncDirection, SyncJobConfig,
> + TapeBackupJobConfig, VerificationJobConfig,
> };
>
> use proxmox_backup::auth_helpers::*;
> @@ -57,7 +57,7 @@ use proxmox_backup::tools::{
> PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
> };
>
> -use proxmox_backup::api2::pull::do_sync_job;
> +use proxmox_backup::api2::sync::do_sync_job;
> use proxmox_backup::api2::tape::backup::do_tape_backup_job;
> use proxmox_backup::server::do_prune_job;
> use proxmox_backup::server::do_verification_job;
> @@ -630,7 +630,14 @@ async fn schedule_datastore_sync_jobs() {
> };
>
> let auth_id = Authid::root_auth_id().clone();
> - if let Err(err) = do_sync_job(job, job_config, &auth_id, Some(event_str), false) {
> + if let Err(err) = do_sync_job(
> + job,
> + job_config,
> + &auth_id,
> + Some(event_str),
> + SyncDirection::Pull,
> + false,
> + ) {
> eprintln!("unable to start datastore sync job {job_id} - {err}");
> }
> };
> --
> 2.39.2
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
More information about the pbs-devel
mailing list