[pbs-devel] [PATCH v2 proxmox-backup 2/6] api: config/sync: add optional `parallel-groups` property
Christian Ebner
c.ebner at proxmox.com
Mon Jan 20 11:51:00 CET 2025
Allow configuring from 1 up to 8 concurrent futures to perform
multiple group syncs in parallel.
The property is exposed via the sync job config and passed to
the pull/push parameters for the sync job to set up and execute the
futures accordingly.
Implements the schema definitions and adds the new property to
the `SyncJobConfig`, `PullParameters` and `PushParameters`.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- renamed from `group-sync-tasks` to `parallel-groups` as this reflects
the functionality better
- also cover the push direction for sync jobs, which was not present previously
pbs-api-types/src/jobs.rs | 14 ++++++++++++++
src/api2/config/sync.rs | 10 ++++++++++
src/api2/pull.rs | 9 ++++++++-
src/api2/push.rs | 9 ++++++++-
src/server/pull.rs | 4 ++++
src/server/push.rs | 4 ++++
src/server/sync.rs | 1 +
7 files changed, 49 insertions(+), 2 deletions(-)
diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 04631d920..600ec8ed4 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -64,6 +64,14 @@ pub const REMOVE_VANISHED_BACKUPS_SCHEMA: Schema = BooleanSchema::new(
.default(false)
.schema();
+const SYNC_PARALLEL_GROUPS_MAX: usize = 8;
+pub const SYNC_PARALLEL_GROUPS_SCHEMA: Schema =
+ IntegerSchema::new("Maximum number of groups to synchronize in parallel for a sync job")
+ .minimum(1)
+ .maximum(SYNC_PARALLEL_GROUPS_MAX as isize)
+ .default(1)
+ .schema();
+
#[api(
properties: {
"next-run": {
@@ -585,6 +593,10 @@ pub const RESYNC_CORRUPT_SCHEMA: Schema =
type: SyncDirection,
optional: true,
},
+ "parallel-groups": {
+ schema: SYNC_PARALLEL_GROUPS_SCHEMA,
+ optional: true,
+ },
}
)]
#[derive(Serialize, Deserialize, Clone, Updater, PartialEq)]
@@ -622,6 +634,8 @@ pub struct SyncJobConfig {
pub resync_corrupt: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub sync_direction: Option<SyncDirection>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub parallel_groups: Option<usize>,
}
impl SyncJobConfig {
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index a8ea93465..517bcd868 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -337,6 +337,8 @@ pub enum DeletableProperty {
TransferLast,
/// Delete the sync_direction property,
SyncDirection,
+ /// Delete the parallel_groups property,
+ ParallelGroups,
}
#[api(
@@ -451,6 +453,9 @@ pub fn update_sync_job(
DeletableProperty::SyncDirection => {
data.sync_direction = None;
}
+ DeletableProperty::ParallelGroups => {
+ data.parallel_groups = None;
+ }
}
}
}
@@ -495,6 +500,10 @@ pub fn update_sync_job(
data.sync_direction = Some(sync_direction);
}
+ if let Some(parallel_groups) = update.parallel_groups {
+ data.parallel_groups = Some(parallel_groups);
+ }
+
if update.limit.rate_in.is_some() {
data.limit.rate_in = update.limit.rate_in;
}
@@ -666,6 +675,7 @@ acl:1:/remote/remote1/remotestore1:write at pbs:RemoteSyncOperator
limit: pbs_api_types::RateLimitConfig::default(), // no limit
transfer_last: None,
sync_direction: None, // use default
+ parallel_groups: None,
};
// should work without ACLs
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index d8ed1a734..36f66d9b0 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -10,7 +10,7 @@ use pbs_api_types::{
Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
- RESYNC_CORRUPT_SCHEMA, TRANSFER_LAST_SCHEMA,
+ RESYNC_CORRUPT_SCHEMA, SYNC_PARALLEL_GROUPS_SCHEMA, TRANSFER_LAST_SCHEMA,
};
use pbs_config::CachedUserInfo;
use proxmox_rest_server::WorkerTask;
@@ -88,6 +88,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
sync_job.limit.clone(),
sync_job.transfer_last,
sync_job.resync_corrupt,
+ sync_job.parallel_groups,
)
}
}
@@ -137,6 +138,10 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
schema: RESYNC_CORRUPT_SCHEMA,
optional: true,
},
+ "parallel-groups": {
+ schema: SYNC_PARALLEL_GROUPS_SCHEMA,
+ optional: true,
+ },
},
},
access: {
@@ -162,6 +167,7 @@ async fn pull(
limit: RateLimitConfig,
transfer_last: Option<usize>,
resync_corrupt: Option<bool>,
+ parallel_groups: Option<usize>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -200,6 +206,7 @@ async fn pull(
limit,
transfer_last,
resync_corrupt,
+ parallel_groups,
)?;
// fixme: set to_stdout to false?
diff --git a/src/api2/push.rs b/src/api2/push.rs
index bf846bb37..4bb91ed61 100644
--- a/src/api2/push.rs
+++ b/src/api2/push.rs
@@ -5,7 +5,8 @@ use pbs_api_types::{
Authid, BackupNamespace, GroupFilter, RateLimitConfig, DATASTORE_SCHEMA,
GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_READ, PRIV_REMOTE_DATASTORE_BACKUP, PRIV_REMOTE_DATASTORE_PRUNE,
- REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, TRANSFER_LAST_SCHEMA,
+ REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, SYNC_PARALLEL_GROUPS_SCHEMA,
+ TRANSFER_LAST_SCHEMA,
};
use proxmox_rest_server::WorkerTask;
use proxmox_router::{Permission, Router, RpcEnvironment};
@@ -99,6 +100,10 @@ fn check_push_privs(
schema: TRANSFER_LAST_SCHEMA,
optional: true,
},
+ "parallel-groups": {
+ schema: SYNC_PARALLEL_GROUPS_SCHEMA,
+ optional: true,
+ },
},
},
access: {
@@ -122,6 +127,7 @@ async fn push(
group_filter: Option<Vec<GroupFilter>>,
limit: RateLimitConfig,
transfer_last: Option<usize>,
+ parallel_groups: Option<usize>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -151,6 +157,7 @@ async fn push(
group_filter,
limit,
transfer_last,
+ parallel_groups,
)
.await?;
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 516abfe5d..0986bc5c8 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -57,6 +57,8 @@ pub(crate) struct PullParameters {
transfer_last: Option<usize>,
/// Whether to re-sync corrupted snapshots
resync_corrupt: bool,
+ /// Maximum number of parallel groups to pull during sync job
+ parallel_groups: Option<usize>,
}
impl PullParameters {
@@ -75,6 +77,7 @@ impl PullParameters {
limit: RateLimitConfig,
transfer_last: Option<usize>,
resync_corrupt: Option<bool>,
+ parallel_groups: Option<usize>,
) -> Result<Self, Error> {
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
@@ -121,6 +124,7 @@ impl PullParameters {
group_filter,
transfer_last,
resync_corrupt,
+ parallel_groups,
})
}
}
diff --git a/src/server/push.rs b/src/server/push.rs
index 8911b80fc..c61f0fe73 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -75,6 +75,8 @@ pub(crate) struct PushParameters {
group_filter: Vec<GroupFilter>,
/// How many snapshots should be transferred at most (taking the newest N snapshots)
transfer_last: Option<usize>,
+ /// Maximum number of parallel groups to push during sync job
+ parallel_groups: Option<usize>,
}
impl PushParameters {
@@ -92,6 +94,7 @@ impl PushParameters {
group_filter: Option<Vec<GroupFilter>>,
limit: RateLimitConfig,
transfer_last: Option<usize>,
+ parallel_groups: Option<usize>,
) -> Result<Self, Error> {
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
@@ -150,6 +153,7 @@ impl PushParameters {
max_depth,
group_filter,
transfer_last,
+ parallel_groups,
})
}
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 0bd7a7a85..8e5d39182 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -673,6 +673,7 @@ pub fn do_sync_job(
sync_job.group_filter.clone(),
sync_job.limit.clone(),
sync_job.transfer_last,
+ sync_job.parallel_groups,
)
.await?;
push_store(push_params).await?
--
2.39.5
More information about the pbs-devel
mailing list