[pbs-devel] [RFC proxmox-backup 21/24] api: sync: move sync job invocation to common module

Christian Ebner c.ebner at proxmox.com
Mon Jul 15 12:15:59 CEST 2024


Move and refactor the do_sync_job function into a common sync
module so that it can be reused for both sync directions, pull and
push.
The sync direction for the job is determined based on the sync job
configuration, with pull being the fallback default if not
explicitly set.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 src/api2/admin/sync.rs          |   2 +-
 src/api2/mod.rs                 |   1 +
 src/api2/pull.rs                | 108 ------------------------
 src/api2/sync.rs                | 142 ++++++++++++++++++++++++++++++++
 src/bin/proxmox-backup-proxy.rs |   2 +-
 5 files changed, 145 insertions(+), 110 deletions(-)
 create mode 100644 src/api2/sync.rs

diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index 4e2ba0be8..8def14c72 100644
--- a/src/api2/admin/sync.rs
+++ b/src/api2/admin/sync.rs
@@ -17,7 +17,7 @@ use pbs_config::CachedUserInfo;
 use crate::{
     api2::{
         config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
-        pull::do_sync_job,
+        sync::do_sync_job,
     },
     server::jobstate::{compute_schedule_status, Job, JobState},
 };
diff --git a/src/api2/mod.rs b/src/api2/mod.rs
index 03596326b..44e3776a4 100644
--- a/src/api2/mod.rs
+++ b/src/api2/mod.rs
@@ -15,6 +15,7 @@ pub mod pull;
 pub mod push;
 pub mod reader;
 pub mod status;
+pub mod sync;
 pub mod tape;
 pub mod types;
 pub mod version;
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index e733c9839..d039dab59 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -13,10 +13,8 @@ use pbs_api_types::{
     TRANSFER_LAST_SCHEMA,
 };
 use pbs_config::CachedUserInfo;
-use proxmox_human_byte::HumanByte;
 use proxmox_rest_server::WorkerTask;
 
-use crate::server::jobstate::Job;
 use crate::server::pull::{pull_store, PullParameters};
 
 pub fn check_pull_privs(
@@ -93,112 +91,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
     }
 }
 
-pub fn do_sync_job(
-    mut job: Job,
-    sync_job: SyncJobConfig,
-    auth_id: &Authid,
-    schedule: Option<String>,
-    to_stdout: bool,
-) -> Result<String, Error> {
-    let job_id = format!(
-        "{}:{}:{}:{}:{}",
-        sync_job.remote.as_deref().unwrap_or("-"),
-        sync_job.remote_store,
-        sync_job.store,
-        sync_job.ns.clone().unwrap_or_default(),
-        job.jobname()
-    );
-    let worker_type = job.jobtype().to_string();
-
-    if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
-        bail!("can't sync to same datastore");
-    }
-
-    let upid_str = WorkerTask::spawn(
-        &worker_type,
-        Some(job_id.clone()),
-        auth_id.to_string(),
-        to_stdout,
-        move |worker| async move {
-            job.start(&worker.upid().to_string())?;
-
-            let worker2 = worker.clone();
-            let sync_job2 = sync_job.clone();
-
-            let worker_future = async move {
-                let pull_params = PullParameters::try_from(&sync_job)?;
-
-                info!("Starting datastore sync job '{job_id}'");
-                if let Some(event_str) = schedule {
-                    info!("task triggered by schedule '{event_str}'");
-                }
-
-                info!(
-                    "sync datastore '{}' from '{}{}'",
-                    sync_job.store,
-                    sync_job
-                        .remote
-                        .as_deref()
-                        .map_or(String::new(), |remote| format!("{remote}/")),
-                    sync_job.remote_store,
-                );
-
-                let pull_stats = pull_store(pull_params).await?;
-
-                if pull_stats.bytes != 0 {
-                    let amount = HumanByte::from(pull_stats.bytes);
-                    let rate = HumanByte::new_binary(
-                        pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
-                    );
-                    info!(
-                        "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
-                        pull_stats.chunk_count,
-                    );
-                } else {
-                    info!("Summary: sync job found no new data to pull");
-                }
-
-                if let Some(removed) = pull_stats.removed {
-                    info!(
-                        "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
-                        removed.snapshots, removed.groups, removed.namespaces,
-                    );
-                }
-
-                info!("sync job '{}' end", &job_id);
-
-                Ok(())
-            };
-
-            let mut abort_future = worker2
-                .abort_future()
-                .map(|_| Err(format_err!("sync aborted")));
-
-            let result = select! {
-                worker = worker_future.fuse() => worker,
-                abort = abort_future => abort,
-            };
-
-            let status = worker2.create_state(&result);
-
-            match job.finish(status) {
-                Ok(_) => {}
-                Err(err) => {
-                    eprintln!("could not finish job state: {}", err);
-                }
-            }
-
-            if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
-                eprintln!("send sync notification failed: {err}");
-            }
-
-            result
-        },
-    )?;
-
-    Ok(upid_str)
-}
-
 #[api(
     input: {
         properties: {
diff --git a/src/api2/sync.rs b/src/api2/sync.rs
new file mode 100644
index 000000000..3972c6ead
--- /dev/null
+++ b/src/api2/sync.rs
@@ -0,0 +1,142 @@
+//! Sync job invocation shared by both sync directions, pull and push
+use anyhow::{bail, format_err, Error};
+use futures::{future::FutureExt, select};
+use tracing::info;
+
+use pbs_api_types::{Authid, SyncDirection, SyncJobConfig};
+use proxmox_human_byte::HumanByte;
+use proxmox_rest_server::WorkerTask;
+
+use crate::server::jobstate::Job;
+use crate::server::pull::{pull_store, PullParameters};
+use crate::server::push::{push_store, PushParameters};
+
+/// Spawn a worker task running the given sync job in its configured direction.
+///
+/// The direction is read from `sync_job.sync_direction`, with pull as fallback if
+/// unset. The log output is worded according to that direction.
+///
+/// Returns the UPID string handed back by `WorkerTask::spawn`. Fails early if no
+/// remote is set and source and target datastore are identical.
+pub fn do_sync_job(
+    mut job: Job,
+    sync_job: SyncJobConfig,
+    auth_id: &Authid,
+    schedule: Option<String>,
+    to_stdout: bool,
+) -> Result<String, Error> {
+    let job_id = format!(
+        "{}:{}:{}:{}:{}",
+        sync_job.remote.as_deref().unwrap_or("-"),
+        sync_job.remote_store,
+        sync_job.store,
+        sync_job.ns.clone().unwrap_or_default(),
+        job.jobname(),
+    );
+    let worker_type = job.jobtype().to_string();
+
+    // Without a remote both stores are local and must therefore differ.
+    if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
+        bail!("can't sync to same datastore");
+    }
+
+    let upid_str = WorkerTask::spawn(
+        &worker_type,
+        Some(job_id.clone()),
+        auth_id.to_string(),
+        to_stdout,
+        move |worker| async move {
+            job.start(&worker.upid().to_string())?;
+
+            let worker2 = worker.clone();
+            let sync_job2 = sync_job.clone();
+
+            let worker_future = async move {
+                let direction = sync_job
+                    .sync_direction
+                    .as_ref()
+                    .unwrap_or(&SyncDirection::Pull);
+                // Direction dependent wording used by the log messages below.
+                let (preposition, action, done) = match direction {
+                    SyncDirection::Pull => ("from", "pull", "pulled"),
+                    SyncDirection::Push => ("to", "push", "pushed"),
+                };
+                info!("Starting datastore sync job '{job_id}'");
+                if let Some(event_str) = schedule {
+                    info!("task triggered by schedule '{event_str}'");
+                }
+                info!(
+                    "sync datastore '{}' {preposition} '{}{}'",
+                    sync_job.store,
+                    sync_job
+                        .remote
+                        .as_deref()
+                        .map_or(String::new(), |remote| format!("{remote}/")),
+                    sync_job.remote_store,
+                );
+
+                let sync_stats = match direction {
+                    SyncDirection::Pull => {
+                        let pull_params = PullParameters::try_from(&sync_job)?;
+                        pull_store(pull_params).await?
+                    }
+                    SyncDirection::Push => {
+                        let push_params = PushParameters::try_from(&sync_job)?;
+                        push_store(push_params).await?
+                    }
+                };
+
+                if sync_stats.bytes != 0 {
+                    let amount = HumanByte::from(sync_stats.bytes);
+                    let rate = HumanByte::new_binary(
+                        sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64(),
+                    );
+                    info!(
+                        "Summary: sync job {done} {amount} in {} chunks (average rate: {rate}/s)",
+                        sync_stats.chunk_count,
+                    );
+                } else {
+                    info!("Summary: sync job found no new data to {action}");
+                }
+
+                if let Some(removed) = sync_stats.removed {
+                    info!(
+                        "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
+                        removed.snapshots, removed.groups, removed.namespaces,
+                    );
+                }
+
+                info!("sync job '{job_id}' end");
+
+                Ok(())
+            };
+
+            // Turn an abort request for the worker into an error result.
+            let mut abort_future = worker2
+                .abort_future()
+                .map(|_| Err(format_err!("sync aborted")));
+
+            let result = select! {
+                worker = worker_future.fuse() => worker,
+                abort = abort_future => abort,
+            };
+
+            let status = worker2.create_state(&result);
+
+            // Problems persisting the job state or sending the notification are only
+            // reported on stderr, the task result itself is returned unchanged.
+            match job.finish(status) {
+                Ok(_) => {}
+                Err(err) => eprintln!("could not finish job state: {err}"),
+            }
+
+            if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
+                eprintln!("send sync notification failed: {err}");
+            }
+
+            result
+        },
+    )?;
+
+    Ok(upid_str)
+}
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index e0bd47b45..2dd0c9d00 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -58,7 +58,7 @@ use proxmox_backup::tools::{
     PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
 };
 
-use proxmox_backup::api2::pull::do_sync_job;
+use proxmox_backup::api2::sync::do_sync_job;
 use proxmox_backup::api2::tape::backup::do_tape_backup_job;
 use proxmox_backup::server::do_prune_job;
 use proxmox_backup::server::do_verification_job;
-- 
2.39.2





More information about the pbs-devel mailing list