[pbs-devel] applied-series: [PATCH v7 proxmox-backup 00/31] fix #3044: push datastore to remote target

Fabian Grünbichler f.gruenbichler at proxmox.com
Thu Nov 21 13:08:09 CET 2024


with some follow-ups, notably:

- dropped the named features, checking just the api version instead (see
  the sketch below this list)
- simplified the namespace filtering
- added a new datastore_api_path helper
- added a check whether the source namespace anchor exists
- simplified the snapshot selection for remove_vanished
- reduced the (expensive) queries to the remote snapshot lists
- some code simplifications and style cleanups
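
For reference, a minimal standalone sketch of the kind of version gate that
replaces the feature list. The `at_least` helper is hypothetical; the check
actually applied is spelled out in the diff below and, unlike a plain
lexicographic comparison, requires release >= 11 even for 3.x minor versions
above 2:

```rust
// Standalone sketch with simplified types; the real `ApiVersion` lives
// in pbs-api-types/src/version.rs.
struct ApiVersion {
    major: u64,
    minor: u64,
    release: u64,
}

impl ApiVersion {
    /// Hypothetical helper: lexicographic "at least major.minor-release".
    fn at_least(&self, major: u64, minor: u64, release: u64) -> bool {
        (self.major, self.minor, self.release) >= (major, minor, release)
    }
}

fn main() {
    let v = ApiVersion { major: 3, minor: 2, release: 11 };
    assert!(v.at_least(3, 2, 11)); // would support prune delete stats
    assert!(!v.at_least(4, 0, 0));
}
```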

the error handling/contexts could still benefit from some attention
(mostly making clear where an error from the remote side is bubbled up,
since the ACL paths are confusing if that context is not included), but
that can be done as a follow-up.
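
A minimal sketch of what such a context wrapper could look like as a
follow-up (hypothetical; it reuses the anyhow `format_err!` pattern the
push code already uses):

```rust
use anyhow::{format_err, Error};
use pbs_client::HttpClient;
use serde_json::Value;

// Hypothetical sketch: wrap remote API calls so errors clearly state
// that they originate on the remote side, and from which endpoint.
async fn remote_get(client: &HttpClient, api_path: &str) -> Result<Value, Error> {
    client
        .get(api_path, None)
        .await
        .map_err(|err| format_err!("remote API call '{api_path}' failed - {err}"))
}
```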

full diff of changes on top of the series as it is on-list:

diff --git a/Cargo.toml b/Cargo.toml
index 2fa1f04bf..6c9bf878c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,5 +1,5 @@
 [workspace.package]
-version = "3.2.10"
+version = "3.2.11"
 authors = [
     "Dietmar Maurer <dietmar at proxmox.com>",
     "Dominik Csapak <d.csapak at proxmox.com>",
diff --git a/debian/changelog b/debian/changelog
index d200b063c..b69ac905d 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,11 @@
+rust-proxmox-backup (3.2.11-1) bookworm; urgency=medium
+
+  * fix #3044: server: implement push support for sync operations
+
+  * push sync related refactors
+
+ -- Proxmox Support Team <support at proxmox.com>  Thu, 21 Nov 2024 12:03:50 +0100
+
 rust-proxmox-backup (3.2.10-1) bookworm; urgency=medium
 
   * api: disk list: do not fail but just log error on gathering smart data
diff --git a/pbs-api-types/src/version.rs b/pbs-api-types/src/version.rs
index 7a4c6cb74..80f87e372 100644
--- a/pbs-api-types/src/version.rs
+++ b/pbs-api-types/src/version.rs
@@ -20,14 +20,6 @@ use proxmox_schema::api;
             description: "Version repository id",
             type: String,
         },
-        "features": {
-            description: "List of supported features",
-            type: Array,
-            items: {
-                type: String,
-                description: "Feature id",
-            },
-        },
     }
 )]
 #[derive(serde::Deserialize, serde::Serialize)]
@@ -35,8 +27,6 @@ pub struct ApiVersionInfo {
     pub version: String,
     pub release: String,
     pub repoid: String,
-    #[serde(default, skip_serializing_if = "Vec::is_empty")]
-    pub features: Vec<String>,
 }
 
 pub type ApiVersionMajor = u64;
@@ -48,7 +38,6 @@ pub struct ApiVersion {
     pub minor: ApiVersionMinor,
     pub release: ApiVersionRelease,
     pub repoid: String,
-    pub features: Vec<String>,
 }
 
 impl TryFrom<ApiVersionInfo> for ApiVersion {
@@ -76,13 +65,6 @@ impl TryFrom<ApiVersionInfo> for ApiVersion {
             minor,
             release,
             repoid: value.repoid.clone(),
-            features: value.features.clone(),
         })
     }
 }
-
-impl ApiVersion {
-    pub fn supports_feature(&self, feature: &str) -> bool {
-        self.features.iter().any(|f| f == feature)
-    }
-}
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index 97d599e30..78eb73205 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -76,24 +76,31 @@ pub fn check_sync_job_modify_access(
     match sync_direction {
         SyncDirection::Pull => {
             let ns_anchor_privs = user_info.lookup_privs(auth_id, &job.acl_path());
-            if ns_anchor_privs & PRIV_DATASTORE_BACKUP == 0
-                || ns_anchor_privs & PRIV_DATASTORE_AUDIT == 0
-            {
+
+            // job visibility check
+            if ns_anchor_privs & PRIV_DATASTORE_AUDIT == 0 {
+                return false;
+            }
+
+            // creating backups on target check
+            if ns_anchor_privs & PRIV_DATASTORE_BACKUP == 0 {
                 return false;
             }
 
             if let Some(true) = job.remove_vanished {
+                // pruning backups on target check
                 if ns_anchor_privs & PRIV_DATASTORE_PRUNE == 0 {
                     return false;
                 }
             }
 
-            // same permission as changing ownership after syncing
+            // same permission as changing ownership after syncing on the target side
             if !is_correct_owner(auth_id, job) && ns_anchor_privs & PRIV_DATASTORE_MODIFY == 0 {
                 return false;
             }
 
             if let Some(remote) = &job.remote {
+                // remote read access check
                 let remote_privs =
                     user_info.lookup_privs(auth_id, &["remote", remote, &job.remote_store]);
                 return remote_privs & PRIV_REMOTE_READ != 0;
@@ -127,13 +134,13 @@ pub fn check_sync_job_modify_access(
                 return true;
             }
 
-            // check user is not the owner of the sync job, but has datastore modify permissions,
-            // which implies permissions to change group ownership
+            // check datastore modify permission if user is not the owner of the sync job
+            // this implies permissions to change group ownership
             if !is_correct_owner(auth_id, job) && source_privs & PRIV_DATASTORE_MODIFY == 0 {
                 return false;
             }
 
-            // user has Datastore.Modify, check also for Datastore.Backup to allow modify access
+            // no read on full datastore, so check backup access for owned backups
             source_privs & PRIV_DATASTORE_BACKUP != 0
         }
     }
diff --git a/src/api2/version.rs b/src/api2/version.rs
index da2cb74b4..4d104f2d6 100644
--- a/src/api2/version.rs
+++ b/src/api2/version.rs
@@ -8,8 +8,6 @@ use proxmox_schema::api;
 
 use pbs_api_types::ApiVersionInfo;
 
-const FEATURES: &[&str] = &["prune-delete-stats"];
-
 #[api(
     returns: {
         type: ApiVersionInfo,
@@ -28,7 +26,6 @@ fn version(
         version: pbs_buildcfg::PROXMOX_PKG_VERSION.to_string(),
         release: pbs_buildcfg::PROXMOX_PKG_RELEASE.to_string(),
         repoid: pbs_buildcfg::PROXMOX_PKG_REPOID.to_string(),
-        features: FEATURES.iter().map(|feature| feature.to_string()).collect(),
     })
 }
 
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index f91d5bf29..d887dc1d5 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -647,8 +647,10 @@ async fn run() -> Result<(), Error> {
             CliCommand::new(&API_METHOD_PUSH_DATASTORE)
                 .arg_param(&["store", "remote", "remote-store"])
                 .completion_cb("store", pbs_config::datastore::complete_datastore_name)
+                .completion_cb("ns", complete_sync_local_datastore_namespace)
                 .completion_cb("remote", pbs_config::remote::complete_remote_name)
-                .completion_cb("remote-store", complete_remote_datastore_name),
+                .completion_cb("remote-store", complete_remote_datastore_name)
+                .completion_cb("remote-ns", complete_remote_datastore_namespace),
         )
         .insert(
             "verify",
diff --git a/src/server/pull.rs b/src/server/pull.rs
index e00187764..08b55956c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -739,15 +739,9 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
     let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
         vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
     } else {
-        let user_info = CachedUserInfo::new()?;
         params
             .source
-            .list_namespaces(
-                &mut params.max_depth,
-                &params.owner,
-                &user_info,
-                Box::new(|_| true),
-            )
+            .list_namespaces(&mut params.max_depth, Box::new(|_| true))
             .await?
     };
 
diff --git a/src/server/push.rs b/src/server/push.rs
index f6dd02fc9..4a222627b 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -50,6 +50,13 @@ impl PushTarget {
     fn remote_user(&self) -> Authid {
         self.remote.config.auth_id.clone()
     }
+
+    fn datastore_api_path(&self, endpoint: &str) -> String {
+        format!(
+            "api2/json/admin/datastore/{store}/{endpoint}",
+            store = self.repo.store()
+        )
+    }
 }
 
 /// Parameters for a push operation
@@ -92,11 +99,16 @@ impl PushParameters {
             remote_ns.check_max_depth(max_depth)?;
         };
         let remove_vanished = remove_vanished.unwrap_or(false);
+        let store = DataStore::lookup_datastore(store, Some(Operation::Read))?;
+
+        if !store.namespace_exists(&ns) {
+            bail!(
+                "Source namespace '{ns}' doesn't exist in datastore '{store}'!",
+                store = store.name()
+            );
+        }
 
-        let source = Arc::new(LocalSource {
-            store: DataStore::lookup_datastore(store, Some(Operation::Read))?,
-            ns,
-        });
+        let source = Arc::new(LocalSource { store, ns });
 
         let (remote_config, _digest) = pbs_config::remote::config()?;
         let remote: Remote = remote_config.lookup("remote", remote_id)?;
@@ -114,11 +126,15 @@ impl PushParameters {
         let data = result["data"].take();
         let version_info: ApiVersionInfo = serde_json::from_value(data)?;
         let api_version = ApiVersion::try_from(version_info)?;
-        let supports_prune_delete_stats = api_version.supports_feature("prune-delete-stats");
+
+        // push assumes namespace support on the remote side, fail early if missing
         if api_version.major < 2 || (api_version.major == 2 && api_version.minor < 2) {
             bail!("unsupported remote api version, minimum v2.2 required");
         }
 
+        let supports_prune_delete_stats = api_version.major > 3
+            || (api_version.major == 3 && api_version.minor >= 2 && api_version.release >= 11);
+
         let target = PushTarget {
             remote,
             repo,
@@ -163,10 +179,7 @@ fn check_ns_remote_datastore_privs(
 
 // Fetch the list of namespaces found on target
 async fn fetch_target_namespaces(params: &PushParameters) -> Result<Vec<BackupNamespace>, Error> {
-    let api_path = format!(
-        "api2/json/admin/datastore/{store}/namespace",
-        store = params.target.repo.store(),
-    );
+    let api_path = params.target.datastore_api_path("namespace");
     let mut result = params.target.client.get(&api_path, None).await?;
     let namespaces: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
     let mut namespaces: Vec<BackupNamespace> = namespaces
@@ -188,12 +201,9 @@ async fn remove_target_namespace(
     }
 
     check_ns_remote_datastore_privs(params, target_namespace, PRIV_REMOTE_DATASTORE_MODIFY)
-        .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
+        .map_err(|err| format_err!("Pruning remote datastore namespaces not allowed - {err}"))?;
 
-    let api_path = format!(
-        "api2/json/admin/datastore/{store}/namespace",
-        store = params.target.repo.store(),
-    );
+    let api_path = params.target.datastore_api_path("namespace");
 
     let mut args = serde_json::json!({
         "ns": target_namespace.name(),
@@ -222,25 +232,20 @@ async fn fetch_target_groups(
     params: &PushParameters,
     target_namespace: &BackupNamespace,
 ) -> Result<(Vec<BackupGroup>, HashSet<BackupGroup>), Error> {
-    let api_path = format!(
-        "api2/json/admin/datastore/{store}/groups",
-        store = params.target.repo.store(),
-    );
+    let api_path = params.target.datastore_api_path("groups");
     let args = Some(serde_json::json!({ "ns": target_namespace.name() }));
 
     let mut result = params.target.client.get(&api_path, args).await?;
     let groups: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
 
-    let (mut owned, not_owned) = groups.iter().fold(
+    let (mut owned, not_owned) = groups.into_iter().fold(
         (Vec::new(), HashSet::new()),
         |(mut owned, mut not_owned), group| {
-            if let Some(ref owner) = group.owner {
-                if params.target.remote_user() == *owner {
-                    owned.push(group.backup.clone());
-                    return (owned, not_owned);
-                }
+            if Some(params.target.remote_user()) == group.owner {
+                owned.push(group.backup);
+            } else {
+                not_owned.insert(group.backup);
             }
-            not_owned.insert(group.backup.clone());
             (owned, not_owned)
         },
     );
@@ -259,22 +264,15 @@ async fn remove_target_group(
     check_ns_remote_datastore_privs(params, target_namespace, PRIV_REMOTE_DATASTORE_PRUNE)
         .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
 
-    let api_path = format!(
-        "api2/json/admin/datastore/{store}/groups",
-        store = params.target.repo.store(),
-    );
+    let api_path = params.target.datastore_api_path("groups");
 
-    let mut args = serde_json::json!({
-        "backup-id": backup_group.id,
-        "backup-type": backup_group.ty,
-    });
+    let mut args = serde_json::json!(backup_group);
+    args["ns"] = serde_json::to_value(target_namespace.name())?;
 
     if params.target.supports_prune_delete_stats {
         args["error-on-protected"] = serde_json::to_value(false)?;
     }
 
-    args["ns"] = serde_json::to_value(target_namespace.name())?;
-
     let mut result = params.target.client.delete(&api_path, Some(args)).await?;
 
     if params.target.supports_prune_delete_stats {
@@ -289,10 +287,10 @@ async fn remove_target_group(
 // Check if the namespace is already present on the target, create it otherwise
 async fn check_or_create_target_namespace(
     params: &PushParameters,
-    target_namespaces: &[BackupNamespace],
+    existing_target_namespaces: &mut Vec<BackupNamespace>,
     target_namespace: &BackupNamespace,
 ) -> Result<(), Error> {
-    if !target_namespace.is_root() && !target_namespaces.contains(target_namespace) {
+    if !target_namespace.is_root() && !existing_target_namespaces.contains(target_namespace) {
         // Namespace not present on target, create namespace.
         // Sub-namespaces have to be created by creating parent components first.
 
@@ -303,25 +301,22 @@ async fn check_or_create_target_namespace(
         for component in target_namespace.components() {
             let current = BackupNamespace::from_parent_ns(&parent, component.to_string())?;
             // Skip over pre-existing parent namespaces on target
-            if target_namespaces.contains(&current) {
+            if existing_target_namespaces.contains(&current) {
                 parent = current;
                 continue;
             }
-            let api_path = format!(
-                "api2/json/admin/datastore/{store}/namespace",
-                store = params.target.repo.store(),
-            );
+            let api_path = params.target.datastore_api_path("namespace");
             let mut args = serde_json::json!({ "name": component.to_string() });
             if !parent.is_root() {
                 args["parent"] = serde_json::to_value(parent.clone())?;
             }
-            let target_store_and_ns = print_store_and_ns(params.target.repo.store(), &current);
             match params.target.client.post(&api_path, Some(args)).await {
-                Ok(_) => info!("Created new namespace on target: {target_store_and_ns}"),
-                Err(err) => bail!(
-                    "Sync into {target_store_and_ns} failed - namespace creation failed: {err}"
-                ),
+                Ok(_) => info!("Created new namespace on target: {current}"),
+                Err(err) => {
+                    bail!("Remote creation of namespace {current} failed, remote returned: {err}")
+                }
             }
+            existing_target_namespaces.push(current.clone());
             parent = current;
         }
     }
@@ -334,38 +329,40 @@ pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats,
     let mut errors = false;
 
     let user_info = CachedUserInfo::new()?;
-    // Generate list of source namespaces to push to target, limited by max-depth and pre-filtered
+    let store = params.source.get_store().to_owned();
+    let auth_id = params.local_user.clone();
+    // Generate list of source namespaces to push to target, limited by max-depth and filtered
     // by local user access privs.
-    let mut namespaces = params
+    let ns_access_filter = Box::new(move |namespace: &BackupNamespace| {
+        let acl_path = namespace.acl_path(&store);
+        let privs = PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP;
+        user_info
+            .check_privs(&auth_id, &acl_path, privs, true)
+            .is_ok()
+    });
+    let mut source_namespaces = params
         .source
-        .list_namespaces(
-            &mut params.max_depth,
-            &params.local_user,
-            &user_info,
-            Box::new(|(namespace, store, auth_id, user_info)| {
-                let acl_path = namespace.acl_path(&store);
-                let privs = PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP;
-                user_info
-                    .check_privs(auth_id, &acl_path, privs, true)
-                    .is_ok()
-            }),
-        )
+        .list_namespaces(&mut params.max_depth, ns_access_filter)
         .await?;
 
-    check_namespace_depth_limit(&params.source.get_ns(), &params.target.ns, &namespaces)?;
+    check_namespace_depth_limit(
+        &params.source.get_ns(),
+        &params.target.ns,
+        &source_namespaces,
+    )?;
 
-    namespaces.sort_unstable_by_key(|a| a.name_len());
+    source_namespaces.sort_unstable_by_key(|a| a.name_len());
 
     // Fetch all accessible namespaces already present on the target
-    let target_namespaces = fetch_target_namespaces(&params).await?;
+    let mut existing_target_namespaces = fetch_target_namespaces(&params).await?;
     // Remember synced namespaces, removing non-synced ones when remove vanished flag is set
-    let mut synced_namespaces = HashSet::with_capacity(namespaces.len());
+    let mut synced_namespaces = HashSet::with_capacity(source_namespaces.len());
 
     let (mut groups, mut snapshots) = (0, 0);
     let mut stats = SyncStats::default();
-    for namespace in &namespaces {
-        let source_store_and_ns = print_store_and_ns(params.source.store.name(), namespace);
-        let target_namespace = params.map_to_target(&namespace)?;
+    for source_namespace in &source_namespaces {
+        let source_store_and_ns = print_store_and_ns(params.source.store.name(), source_namespace);
+        let target_namespace = params.map_to_target(source_namespace)?;
         let target_store_and_ns = print_store_and_ns(params.target.repo.store(), &target_namespace);
 
         info!("----");
@@ -373,15 +370,20 @@ pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats,
 
         synced_namespaces.insert(target_namespace.clone());
 
-        if let Err(err) =
-            check_or_create_target_namespace(&params, &target_namespaces, &target_namespace).await
+        if let Err(err) = check_or_create_target_namespace(
+            &params,
+            &mut existing_target_namespaces,
+            &target_namespace,
+        )
+        .await
         {
-            info!("Cannot sync {source_store_and_ns} into {target_store_and_ns} - {err}");
+            warn!("Encountered error: {err}");
+            warn!("Failed to sync {source_store_and_ns} into {target_store_and_ns}!");
             errors = true;
             continue;
         }
 
-        match push_namespace(namespace, &params).await {
+        match push_namespace(source_namespace, &params).await {
             Ok((sync_progress, sync_stats, sync_errors)) => {
                 errors |= sync_errors;
                 stats.add(sync_stats);
@@ -390,10 +392,10 @@ pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats,
                     groups += sync_progress.done_groups;
                     snapshots += sync_progress.done_snapshots;
 
-                    let ns = if namespace.is_root() {
+                    let ns = if source_namespace.is_root() {
                         "root namespace".into()
                     } else {
-                        format!("namespace {namespace}")
+                        format!("namespace {source_namespace}")
                     };
                     info!(
                         "Finished syncing {ns}, current progress: {groups} groups, {snapshots} snapshots"
@@ -402,7 +404,7 @@ pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats,
             }
             Err(err) => {
                 errors = true;
-                info!("Encountered errors while syncing namespace {namespace} - {err}");
+                info!("Encountered errors while syncing namespace {source_namespace} - {err}");
             }
         }
     }
@@ -411,23 +413,20 @@ pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats,
         // Attention: Filter out all namespaces which are not sub-namespaces of the sync target
         // namespace, or not included in the sync because of the depth limit.
         // Without this pre-filtering, all namespaces unrelated to the sync would be removed!
-        let mut target_sub_namespaces = Vec::new();
-        for namespace in &namespaces {
-            let target_namespace = params.map_to_target(&namespace)?;
-            let mut sub_namespaces = target_namespaces
-                .iter()
-                .filter(|namespace| {
-                    if let Some(depth) = target_namespace.contains(namespace) {
-                        if let Some(max_depth) = params.max_depth {
-                            return depth <= max_depth;
-                        }
-                        return true;
-                    }
-                    false
-                })
-                .collect();
-            target_sub_namespaces.append(&mut sub_namespaces);
-        }
+        let max_depth = params
+            .max_depth
+            .unwrap_or_else(|| pbs_api_types::MAX_NAMESPACE_DEPTH);
+        let mut target_sub_namespaces: Vec<BackupNamespace> = existing_target_namespaces
+            .into_iter()
+            .filter(|target_namespace| {
+                params
+                    .target
+                    .ns
+                    .contains(&target_namespace)
+                    .map(|sub_depth| sub_depth <= max_depth)
+                    .unwrap_or(false)
+            })
+            .collect();
 
         // Sort by namespace length and revert for sub-namespaces to be removed before parents
         target_sub_namespaces.sort_unstable_by_key(|a| a.name_len());
@@ -577,10 +576,7 @@ async fn fetch_target_snapshots(
     target_namespace: &BackupNamespace,
     group: &BackupGroup,
 ) -> Result<Vec<SnapshotListItem>, Error> {
-    let api_path = format!(
-        "api2/json/admin/datastore/{store}/snapshots",
-        store = params.target.repo.store(),
-    );
+    let api_path = params.target.datastore_api_path("snapshots");
     let mut args = serde_json::to_value(group)?;
     if !target_namespace.is_root() {
         args["ns"] = serde_json::to_value(target_namespace)?;
@@ -591,16 +587,6 @@ async fn fetch_target_snapshots(
     Ok(snapshots)
 }
 
-async fn fetch_previous_backup_time(
-    params: &PushParameters,
-    target_namespace: &BackupNamespace,
-    group: &BackupGroup,
-) -> Result<Option<i64>, Error> {
-    let mut snapshots = fetch_target_snapshots(params, target_namespace, group).await?;
-    snapshots.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
-    Ok(snapshots.last().map(|snapshot| snapshot.backup.time))
-}
-
 async fn forget_target_snapshot(
     params: &PushParameters,
     target_namespace: &BackupNamespace,
@@ -609,10 +595,7 @@ async fn forget_target_snapshot(
     check_ns_remote_datastore_privs(params, target_namespace, PRIV_REMOTE_DATASTORE_PRUNE)
         .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
 
-    let api_path = format!(
-        "api2/json/admin/datastore/{store}/snapshots",
-        store = params.target.repo.store(),
-    );
+    let api_path = params.target.datastore_api_path("snapshots");
     let mut args = serde_json::to_value(snapshot)?;
     if !target_namespace.is_root() {
         args["ns"] = serde_json::to_value(target_namespace)?;
@@ -650,8 +633,12 @@ pub(crate) async fn push_group(
         .unwrap_or_default();
 
     let target_namespace = params.map_to_target(namespace)?;
-    let last_snapshot_time = fetch_previous_backup_time(params, &target_namespace, group)
-        .await?
+    let mut target_snapshots = fetch_target_snapshots(params, &target_namespace, group).await?;
+    target_snapshots.sort_unstable_by_key(|a| a.backup.time);
+
+    let last_snapshot_time = target_snapshots
+        .last()
+        .map(|snapshot| snapshot.backup.time)
         .unwrap_or(i64::MIN);
 
     let mut source_snapshots = HashSet::new();
@@ -684,20 +671,9 @@ pub(crate) async fn push_group(
 
     progress.group_snapshots = snapshots.len() as u64;
 
-    let target_snapshots = fetch_target_snapshots(params, &target_namespace, group).await?;
-    let target_snapshots: Vec<BackupDir> = target_snapshots
-        .into_iter()
-        .map(|snapshot| snapshot.backup)
-        .collect();
-
     let mut stats = SyncStats::default();
     let mut fetch_previous_manifest = !target_snapshots.is_empty();
     for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
-        if target_snapshots.contains(&source_snapshot) {
-            progress.done_snapshots = pos as u64 + 1;
-            info!("percentage done: {progress}");
-            continue;
-        }
         let result =
             push_snapshot(params, namespace, &source_snapshot, fetch_previous_manifest).await;
         fetch_previous_manifest = true;
@@ -711,7 +687,6 @@ pub(crate) async fn push_group(
     }
 
     if params.remove_vanished {
-        let target_snapshots = fetch_target_snapshots(params, &target_namespace, group).await?;
         for snapshot in target_snapshots {
             if source_snapshots.contains(&snapshot.backup.time) {
                 continue;
@@ -814,7 +789,7 @@ pub(crate) async fn push_snapshot(
     };
 
     // Avoid double upload penalty by remembering already seen chunks
-    let known_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 1024)));
+    let known_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64 * 1024)));
 
     for entry in source_manifest.files() {
         let mut path = backup_dir.full_path();
@@ -882,7 +857,7 @@ pub(crate) async fn push_snapshot(
                 }
             }
         } else {
-            warn!("{path:?} does not exist, skipped.");
+            bail!("{path:?} does not exist, aborting upload.");
         }
     }
 
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 4ce0777bf..a0157ab2d 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -21,7 +21,6 @@ use pbs_api_types::{
     SyncDirection, SyncJobConfig, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
 };
 use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
-use pbs_config::CachedUserInfo;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::manifest::CLIENT_LOG_BLOB_NAME;
 use pbs_datastore::read_chunk::AsyncReadChunk;
@@ -215,8 +214,7 @@ impl SyncSourceReader for LocalSourceReader {
     }
 }
 
-pub type NamespaceFilter =
-    Box<dyn FnMut((&BackupNamespace, &str, &Authid, &CachedUserInfo)) -> bool + Send>;
+pub type NamespaceFilter = Box<dyn Fn(&BackupNamespace) -> bool + Send>;
 
 #[async_trait::async_trait]
 /// `SyncSource` is a trait that provides an interface for synchronizing data/information from a
@@ -228,8 +226,6 @@ pub(crate) trait SyncSource: Send + Sync {
     async fn list_namespaces(
         &self,
         max_depth: &mut Option<usize>,
-        auth_id: &Authid,
-        user_info: &CachedUserInfo,
         filter_callback: NamespaceFilter,
     ) -> Result<Vec<BackupNamespace>, Error>;
 
@@ -273,9 +269,7 @@ impl SyncSource for RemoteSource {
     async fn list_namespaces(
         &self,
         max_depth: &mut Option<usize>,
-        auth_id: &Authid,
-        user_info: &CachedUserInfo,
-        mut filter_callback: NamespaceFilter,
+        filter_callback: NamespaceFilter,
     ) -> Result<Vec<BackupNamespace>, Error> {
         if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
             return Ok(vec![self.ns.clone()]);
@@ -323,10 +317,7 @@ impl SyncSource for RemoteSource {
                 .map(|list_item| list_item.ns)
                 .collect();
 
-        let list = list
-            .into_iter()
-            .filter(|namespace| filter_callback((namespace, self.get_store(), auth_id, user_info)))
-            .collect();
+        let list = list.into_iter().filter(filter_callback).collect();
 
         Ok(list)
     }
@@ -421,9 +412,7 @@ impl SyncSource for LocalSource {
     async fn list_namespaces(
         &self,
         max_depth: &mut Option<usize>,
-        auth_id: &Authid,
-        user_info: &CachedUserInfo,
-        mut filter_callback: NamespaceFilter,
+        filter_callback: NamespaceFilter,
     ) -> Result<Vec<BackupNamespace>, Error> {
         let list: Result<Vec<BackupNamespace>, Error> = ListNamespacesRecursive::new_max_depth(
             self.store.clone(),
@@ -432,10 +421,7 @@ impl SyncSource for LocalSource {
         )?
         .collect();
 
-        let list = list?
-            .into_iter()
-            .filter(|namespace| filter_callback((namespace, self.get_store(), auth_id, user_info)))
-            .collect();
+        let list = list?.into_iter().filter(filter_callback).collect();
 
         Ok(list)
     }

On November 11, 2024 4:43 pm, Christian Ebner wrote:
> This patch series implements the functionality to extend the current
> sync jobs in pull direction by an additional push direction, allowing
> the contents of a local source datastore to be pushed to a remote target.
> 
> The series implements this by using the REST API of the remote target
> for fetching, creating and/or deleting namespaces, groups and backups.
> It reuses the client's backup writer functionality to create snapshots
> by writing a manifest on the remote target and syncing the fixed index,
> dynamic index or blobs contained in the source manifest to the remote,
> also preserving encryption information.
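> 
> A minimal sketch of the remote interaction pattern this boils down to
> (simplified; endpoint and types as used by the series, the real code
> lives in src/server/push.rs):
> 
> ```rust
> use anyhow::Error;
> use pbs_api_types::NamespaceListItem;
> use pbs_client::HttpClient;
> 
> // Simplified sketch: query the namespaces present on the remote target
> // via its REST API, as the push code does before creating missing ones.
> async fn list_remote_namespaces(
>     client: &HttpClient,
>     store: &str,
> ) -> Result<Vec<NamespaceListItem>, Error> {
>     let path = format!("api2/json/admin/datastore/{store}/namespace");
>     let mut result = client.get(&path, None).await?;
>     Ok(serde_json::from_value(result["data"].take())?)
> }
> ```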
> 
> Thanks to Fabian for further feedback to the previous patch series
> version.
> 
> Changes since version 6 of the patch series:
> - Fix permission check for sync job modify access, correctly checking
>   local datastore access if the job is not owned by the sync user.
> - Pre-filter source namespaces, so namespaces which the sync user has no
>   access to cannot be leaked.
> - Avoid possibly removing unrelated target namespaces during remove
>   vanished by only removing sub-namespaces of the remote target namespace.
> - Fix issues with local/target namespace mapping, make clear which are
>   which by adapting variable names accordingly.
> - Adapt roles related to remote datastore access to mimic roles for
>   local datastore access.
> - Unconditionally pass the namespace parameter and check early, failing
>   if the remote does not support namespaces.
> - Fetch the previous snapshot's index to initialize known chunks correctly.
> - Adapt the snapshot filter to exclude snapshots older than the last
>   snapshot already present on the target (see the sketch after this list).
> - Fix incorrect owner header label in sync job grid for push direction.
> - Use `BackupGroup`'s `cmp::Ord` implementation for sorting, for both
>   pull and push
> - Update some comments and docs.
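> 
> A sketch of the adapted snapshot selection mentioned above (simplified;
> `last_snapshot_time` is the time of the newest snapshot already present
> on the target, or i64::MIN if there is none):
> 
> ```rust
> use pbs_api_types::BackupDir;
> 
> // Simplified sketch: only snapshots newer than the newest one already
> // present on the target group are selected for the push.
> fn select_snapshots(source: Vec<BackupDir>, last_snapshot_time: i64) -> Vec<BackupDir> {
>     source
>         .into_iter()
>         .filter(|dir| dir.time > last_snapshot_time)
>         .collect()
> }
> ```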
> 
> Changes since version 5 of the patch series:
> - Split roles and permissions into separate remote datastore prune and
>   remote datastore modify roles.
> - Fetch target groups filtered by ownership, so as not to try to push or
>   remove unowned groups.
> - Add documentation, highlight the caveats of conflicting push jobs when using
>   shared remotes.
> - Also check for the optional `PRIV_DATASTORE_BACKUP`, as opposed to only
>   `PRIV_DATASTORE_READ`, on the source datastore namespace, so that the user
>   can read the contents from there as well.
> - Drop `sync-direction` parameter from API endpoints where not needed, determine
>   it from the corresponding jobs configuration instead.
> - Adapt layout of the split job view in the WebUI to use more general,
>   less component-specific values
> - Introduce `remote_acl_path` helpers for `BackupNamespace` and `SyncJobConfig`.
> - Refactor upload counters to bundle and update counters by chunk variant.
> - Rework `version` endpoint and supported api feature check to be based on
>   `supported_features` rather than a hardcoded version, allowing for more
>   flexibility.
> - `PushParameters` now unconditionally store the remote version used for
>   the compatibility check.
> - Renamed `ignore-protected` to the less misinterpretable
>   `error-on-protected` and inverted the boolean logic (see the sketch
>   after this list).
> - Squashed and reordered patches; the delete stats changes are no longer
>   follow-up patches, as they are now fully backwards compatible.
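> 
> A sketch of the resulting group deletion call (simplified from
> remove_target_group in src/server/push.rs):
> 
> ```rust
> use anyhow::Error;
> use pbs_api_types::{BackupGroup, BackupGroupDeleteStats};
> use pbs_client::HttpClient;
> 
> // Simplified sketch: delete a group on the target without erroring out
> // on protected snapshots, and read back the delete statistics.
> async fn delete_target_group(
>     client: &HttpClient,
>     api_path: &str,
>     ns: &str,
>     group: &BackupGroup,
> ) -> Result<BackupGroupDeleteStats, Error> {
>     let mut args = serde_json::json!(group);
>     args["ns"] = serde_json::to_value(ns)?;
>     args["error-on-protected"] = serde_json::to_value(false)?;
>     let mut result = client.delete(api_path, Some(args)).await?;
>     Ok(serde_json::from_value(result["data"].take())?)
> }
> ```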
> 
> Changes since version 4 of the patch series:
> - Rebased onto current master
> 
> Most notable changes since version 3 of the patch series include:
> - Rework access control permission checks to resemble the pull-based
>   logic more closely.
>   In order to perform a full sync in push direction, including
>   permissions for pruning contents with remove vanished, an acl.cfg
>   looks like the one below:
>   ```
>   acl:1:/datastore/source-store:syncoperator at pbs:DatastoreReader
>   acl:1:/remote:syncoperator at pbs:RemoteAudit
>   acl:1:/remote/remote-target/target-store:syncoperator at pbs:RemoteDatastorePrune,RemoteSyncPushOperator
>   ```
> - Modify access to sync jobs now requires `DatastoreAudit` for both
>   pull and push sync jobs
> - Fix previously incorrect privs required for removing target
>   namespaces
> - Fix a performance bottleneck by not reading known chunks from the
>   source, sending `MergedChunkInfo` instead of `ChunkInfo` over to the
>   upload stream
> - Factor upload statistic counters and structs out into their own
>   module and provide methods for easy conversion
> - Implement `map_to_target` helper for easier/more readable
>   source-to-target namespace mapping (see the sketch after this list)
> - Optimize namespace creation on target, only creating namespace
>   components that do not already exist.
> - Avoid temp file for manifest and upload source manifest directly
> - Not failing on deletion for protected snapshots is now opt-in
> - Refactor api endpoint `version` in order to be able to fetch the api
>   version of the target
> - Reworked `SyncDirection` api type, use `api` macro to reduce code
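> 
> A sketch of the `map_to_target` helper mentioned above (simplified;
> it re-anchors a namespace below the remote target namespace using
> `BackupNamespace::map_prefix`, as the pull code already does):
> 
> ```rust
> use anyhow::Error;
> use pbs_api_types::BackupNamespace;
> 
> // Simplified sketch: strip the source namespace prefix from `ns` and
> // re-anchor the remaining components below the target namespace.
> fn map_to_target(
>     source_ns: &BackupNamespace, // sync anchor on the source side
>     target_ns: &BackupNamespace, // sync anchor on the target side
>     ns: &BackupNamespace,        // namespace to map, below source_ns
> ) -> Result<BackupNamespace, Error> {
>     ns.map_prefix(source_ns, target_ns)
> }
> ```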
> 
> Most notable changes since version 2 of the patch series include:
> - Add checks and extend roles and privs to allow for restricting a local
>   user's access to remote datastore operations. In order to perform a
>   full sync in push direction, including permissions for namespace
>   creation and deleting contents with remove vanished, an acl.cfg looks
>   like the one below:
>   ```
>   acl:1:/datastore/datastore:syncoperator at pbs:DatastoreAudit
>   acl:1:/remote:syncoperator at pbs:RemoteSyncOperator
>   acl:1:/remote/local/pushme:syncoperator at pbs:RemoteDatastoreModify,RemoteDatastorePrune,RemoteSyncPushOperator
>   ```
>   Based on further feedback, privs might get grouped further, or an
>   additional role containing most of these could be created.
> - Drop the patch introducing the `no-timestamp-check` flag for the backup
>   client; as pointed out by Fabian, this is not needed, since only backups
>   newer than the last one currently available will be pushed.
> - Fix reading snapshots from the source by using the correct namespace.
> - Rename PullParameters' `owner` to the more fitting `local_user`.
> - Fix typos in remote sync push operator comment.
> - Fix comments not matching the functionality for the cli implementations.
> 
> Link to issue on bugtracker:
> https://bugzilla.proxmox.com/show_bug.cgi?id=3044
> 
> Christian Ebner (31):
>   sync: pull: optimize backup group sorting
>   sync: extend sync source's list namespaces method by filter callback
>   client: backup writer: refactor backup and upload stats counters
>   client: backup writer: factor out merged chunk stream upload
>   client: backup writer: allow push uploading index and chunks
>   config: acl: refactor acl path component check for datastore
>   config: acl: allow namespace components for remote datastores
>   api types: add remote acl path method for `BackupNamespace`
>   api types: implement remote acl path method for sync job
>   api types: define remote permissions and roles for push sync
>   datastore: move `BackupGroupDeleteStats` to api types
>   api types: implement api type for `BackupGroupDeleteStats`
>   datastore: increment deleted group counter when removing group
>   api/api-types: refactor api endpoint version, add api types
>   fix #3044: server: implement push support for sync operations
>   api types/config: add `sync-push` config type for push sync jobs
>   api: push: implement endpoint for sync in push direction
>   api: sync: move sync job invocation to server sync module
>   api: config: Require PRIV_DATASTORE_AUDIT to modify sync job
>   api: config: factor out sync job owner check
>   api: sync jobs: expose optional `sync-direction` parameter
>   api: admin: avoid duplicate name for list sync jobs api method
>   bin: manager: add datastore push cli command
>   ui: group filter: allow to set namespace for local datastore
>   ui: sync edit: source group filters based on sync direction
>   ui: add view with separate grids for pull and push sync jobs
>   ui: sync job: adapt edit window to be used for pull and push
>   ui: sync view: set proxy on view instead of model
>   api: datastore/namespace: return backup groups delete stats on remove
>   api: version: add 'prune-delete-stats' as supported feature
>   docs: add section for sync jobs in push direction
> 
>  docs/managing-remotes.rst              |  40 +
>  pbs-api-types/src/acl.rs               |  38 +
>  pbs-api-types/src/datastore.rs         |  76 +-
>  pbs-api-types/src/jobs.rs              |  46 ++
>  pbs-api-types/src/lib.rs               |   3 +
>  pbs-api-types/src/version.rs           |  88 +++
>  pbs-client/src/backup_stats.rs         | 119 +++
>  pbs-client/src/backup_writer.rs        | 318 +++++---
>  pbs-client/src/inject_reused_chunks.rs |  14 +-
>  pbs-client/src/lib.rs                  |   4 +
>  pbs-config/src/acl.rs                  |  11 +-
>  pbs-config/src/sync.rs                 |  16 +-
>  pbs-datastore/src/backup_info.rs       |  34 +-
>  pbs-datastore/src/datastore.rs         |  27 +-
>  src/api2/admin/datastore.rs            |  29 +-
>  src/api2/admin/namespace.rs            |  31 +-
>  src/api2/admin/sync.rs                 |  43 +-
>  src/api2/config/datastore.rs           |  15 +-
>  src/api2/config/notifications/mod.rs   |  21 +-
>  src/api2/config/sync.rs                | 296 ++++++--
>  src/api2/mod.rs                        |   2 +
>  src/api2/pull.rs                       | 108 ---
>  src/api2/push.rs                       | 175 +++++
>  src/api2/version.rs                    |  42 +-
>  src/bin/proxmox-backup-manager.rs      | 216 ++++--
>  src/bin/proxmox-backup-proxy.rs        |  24 +-
>  src/server/mod.rs                      |   2 +
>  src/server/pull.rs                     |  33 +-
>  src/server/push.rs                     | 994 +++++++++++++++++++++++++
>  src/server/sync.rs                     | 179 ++++-
>  www/Makefile                           |   1 +
>  www/config/SyncPullPushView.js         |  61 ++
>  www/config/SyncView.js                 |  29 +-
>  www/datastore/DataStoreList.js         |   2 +-
>  www/datastore/Panel.js                 |   2 +-
>  www/form/GroupFilter.js                |  21 +-
>  www/window/SyncJobEdit.js              |  49 +-
>  37 files changed, 2694 insertions(+), 515 deletions(-)
>  create mode 100644 pbs-api-types/src/version.rs
>  create mode 100644 pbs-client/src/backup_stats.rs
>  create mode 100644 src/api2/push.rs
>  create mode 100644 src/server/push.rs
>  create mode 100644 www/config/SyncPullPushView.js
> 
> -- 
> 2.39.5