[pbs-devel] [PATCH v2 proxmox-backup 2/6] api: config/sync: add optional `parallel-groups` property

Christian Ebner c.ebner at proxmox.com
Mon Jan 20 11:51:00 CET 2025


Allow configuring from 1 up to 8 concurrent futures to perform
multiple group syncs in parallel.

The property is exposed via the sync job config and passed to
the pull/push parameters for the sync job to setup and execute the
futures accordingly.

Implements the schema definitions and adds the new property to
the `SyncJobConfig`, `PullParameters` and `PushParameters`.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- renamed from `group-sync-tasks` to `parallel-groups` as this reflects
  the functionality better
- cover also the previously not present push direction for sync jobs

 pbs-api-types/src/jobs.rs | 14 ++++++++++++++
 src/api2/config/sync.rs   | 10 ++++++++++
 src/api2/pull.rs          |  9 ++++++++-
 src/api2/push.rs          |  9 ++++++++-
 src/server/pull.rs        |  4 ++++
 src/server/push.rs        |  4 ++++
 src/server/sync.rs        |  1 +
 7 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 04631d920..600ec8ed4 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -64,6 +64,14 @@ pub const REMOVE_VANISHED_BACKUPS_SCHEMA: Schema = BooleanSchema::new(
 .default(false)
 .schema();
 
+const SYNC_PARALLEL_GROUPS_MAX: usize = 8;
+pub const SYNC_PARALLEL_GROUPS_SCHEMA: Schema =
+    IntegerSchema::new("Maximum number of groups to synchronize in parallel for a sync job")
+        .minimum(1)
+        .maximum(SYNC_PARALLEL_GROUPS_MAX as isize)
+        .default(1)
+        .schema();
+
 #[api(
     properties: {
         "next-run": {
@@ -585,6 +593,10 @@ pub const RESYNC_CORRUPT_SCHEMA: Schema =
             type: SyncDirection,
             optional: true,
         },
+        "parallel-groups": {
+            schema: SYNC_PARALLEL_GROUPS_SCHEMA,
+            optional: true,
+        },
     }
 )]
 #[derive(Serialize, Deserialize, Clone, Updater, PartialEq)]
@@ -622,6 +634,8 @@ pub struct SyncJobConfig {
     pub resync_corrupt: Option<bool>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub sync_direction: Option<SyncDirection>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub parallel_groups: Option<usize>,
 }
 
 impl SyncJobConfig {
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index a8ea93465..517bcd868 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -337,6 +337,8 @@ pub enum DeletableProperty {
     TransferLast,
     /// Delete the sync_direction property,
     SyncDirection,
+    /// Delete the parallel_groups property,
+    ParallelGroups,
 }
 
 #[api(
@@ -451,6 +453,9 @@ pub fn update_sync_job(
                 DeletableProperty::SyncDirection => {
                     data.sync_direction = None;
                 }
+                DeletableProperty::ParallelGroups => {
+                    data.parallel_groups = None;
+                }
             }
         }
     }
@@ -495,6 +500,10 @@ pub fn update_sync_job(
         data.sync_direction = Some(sync_direction);
     }
 
+    if let Some(parallel_groups) = update.parallel_groups {
+        data.parallel_groups = Some(parallel_groups);
+    }
+
     if update.limit.rate_in.is_some() {
         data.limit.rate_in = update.limit.rate_in;
     }
@@ -666,6 +675,7 @@ acl:1:/remote/remote1/remotestore1:write at pbs:RemoteSyncOperator
         limit: pbs_api_types::RateLimitConfig::default(), // no limit
         transfer_last: None,
         sync_direction: None, // use default
+        parallel_groups: None,
     };
 
     // should work without ACLs
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index d8ed1a734..36f66d9b0 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -10,7 +10,7 @@ use pbs_api_types::{
     Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
     GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
     PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
-    RESYNC_CORRUPT_SCHEMA, TRANSFER_LAST_SCHEMA,
+    RESYNC_CORRUPT_SCHEMA, SYNC_PARALLEL_GROUPS_SCHEMA, TRANSFER_LAST_SCHEMA,
 };
 use pbs_config::CachedUserInfo;
 use proxmox_rest_server::WorkerTask;
@@ -88,6 +88,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
             sync_job.limit.clone(),
             sync_job.transfer_last,
             sync_job.resync_corrupt,
+            sync_job.parallel_groups,
         )
     }
 }
@@ -137,6 +138,10 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
                 schema: RESYNC_CORRUPT_SCHEMA,
                 optional: true,
             },
+            "parallel-groups": {
+                schema: SYNC_PARALLEL_GROUPS_SCHEMA,
+                optional: true,
+            },
         },
     },
     access: {
@@ -162,6 +167,7 @@ async fn pull(
     limit: RateLimitConfig,
     transfer_last: Option<usize>,
     resync_corrupt: Option<bool>,
+    parallel_groups: Option<usize>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -200,6 +206,7 @@ async fn pull(
         limit,
         transfer_last,
         resync_corrupt,
+        parallel_groups,
     )?;
 
     // fixme: set to_stdout to false?
diff --git a/src/api2/push.rs b/src/api2/push.rs
index bf846bb37..4bb91ed61 100644
--- a/src/api2/push.rs
+++ b/src/api2/push.rs
@@ -5,7 +5,8 @@ use pbs_api_types::{
     Authid, BackupNamespace, GroupFilter, RateLimitConfig, DATASTORE_SCHEMA,
     GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
     PRIV_DATASTORE_READ, PRIV_REMOTE_DATASTORE_BACKUP, PRIV_REMOTE_DATASTORE_PRUNE,
-    REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, TRANSFER_LAST_SCHEMA,
+    REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, SYNC_PARALLEL_GROUPS_SCHEMA,
+    TRANSFER_LAST_SCHEMA,
 };
 use proxmox_rest_server::WorkerTask;
 use proxmox_router::{Permission, Router, RpcEnvironment};
@@ -99,6 +100,10 @@ fn check_push_privs(
                 schema: TRANSFER_LAST_SCHEMA,
                 optional: true,
             },
+            "parallel-groups": {
+                schema: SYNC_PARALLEL_GROUPS_SCHEMA,
+                optional: true,
+            },
         },
     },
     access: {
@@ -122,6 +127,7 @@ async fn push(
     group_filter: Option<Vec<GroupFilter>>,
     limit: RateLimitConfig,
     transfer_last: Option<usize>,
+    parallel_groups: Option<usize>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -151,6 +157,7 @@ async fn push(
         group_filter,
         limit,
         transfer_last,
+        parallel_groups,
     )
     .await?;
 
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 516abfe5d..0986bc5c8 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -57,6 +57,8 @@ pub(crate) struct PullParameters {
     transfer_last: Option<usize>,
     /// Whether to re-sync corrupted snapshots
     resync_corrupt: bool,
+    /// Maximum number of parallel groups to pull during sync job
+    parallel_groups: Option<usize>,
 }
 
 impl PullParameters {
@@ -75,6 +77,7 @@ impl PullParameters {
         limit: RateLimitConfig,
         transfer_last: Option<usize>,
         resync_corrupt: Option<bool>,
+        parallel_groups: Option<usize>,
     ) -> Result<Self, Error> {
         if let Some(max_depth) = max_depth {
             ns.check_max_depth(max_depth)?;
@@ -121,6 +124,7 @@ impl PullParameters {
             group_filter,
             transfer_last,
             resync_corrupt,
+            parallel_groups,
         })
     }
 }
diff --git a/src/server/push.rs b/src/server/push.rs
index 8911b80fc..c61f0fe73 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -75,6 +75,8 @@ pub(crate) struct PushParameters {
     group_filter: Vec<GroupFilter>,
     /// How many snapshots should be transferred at most (taking the newest N snapshots)
     transfer_last: Option<usize>,
+    /// Maximum number of parallel groups to push during sync job
+    parallel_groups: Option<usize>,
 }
 
 impl PushParameters {
@@ -92,6 +94,7 @@ impl PushParameters {
         group_filter: Option<Vec<GroupFilter>>,
         limit: RateLimitConfig,
         transfer_last: Option<usize>,
+        parallel_groups: Option<usize>,
     ) -> Result<Self, Error> {
         if let Some(max_depth) = max_depth {
             ns.check_max_depth(max_depth)?;
@@ -150,6 +153,7 @@ impl PushParameters {
             max_depth,
             group_filter,
             transfer_last,
+            parallel_groups,
         })
     }
 
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 0bd7a7a85..8e5d39182 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -673,6 +673,7 @@ pub fn do_sync_job(
                             sync_job.group_filter.clone(),
                             sync_job.limit.clone(),
                             sync_job.transfer_last,
+                            sync_job.parallel_groups,
                         )
                         .await?;
                         push_store(push_params).await?
-- 
2.39.5





More information about the pbs-devel mailing list