[pbs-devel] [PATCH proxmox-backup v2 3/7] Add pruning parameters to the pull command

Stefan Hanreich s.hanreich at proxmox.com
Wed Nov 30 16:00:58 CET 2022


Added the prune options to the pull command, which now optionally
executes a prune job after pulling. This can be used to automatically
prune the pulled groups. Group filters still apply to the pruning.

In order to use the new PruneJob struct, I had to change the WorkerTask
reference taken by pull_store and pull_ns into an Arc<WorkerTask>, since
this is what PruneJob expects for its logging.

Additionally, I refactored pull_ns by extracting the remove_vanished
functionality into its own function. This should make the code easier
to read.

Signed-off-by: Stefan Hanreich <s.hanreich at proxmox.com>
---
 src/api2/pull.rs   |   4 +-
 src/server/pull.rs | 169 ++++++++++++++++++++++++++++++---------------
 2 files changed, 115 insertions(+), 58 deletions(-)

diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index f3b31e05..719b7ca1 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -129,7 +129,7 @@ pub fn do_sync_job(
                     sync_job.remote_store,
                 );
 
-                pull_store(&worker, &client, pull_params).await?;
+                pull_store(worker.clone(), &client, pull_params).await?;
 
                 task_log!(worker, "sync job '{}' end", &job_id);
 
@@ -285,7 +285,7 @@ async fn pull(
                 remote_store,
             );
 
-            let pull_future = pull_store(&worker, &client, pull_params);
+            let pull_future = pull_store(worker.clone(), &client, pull_params);
             (select! {
                 success = pull_future.fuse() => success,
                 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 634a0b70..44068c3b 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -13,12 +13,12 @@ use pbs_config::CachedUserInfo;
 use serde_json::json;
 
 use proxmox_router::HttpError;
-use proxmox_sys::task_log;
+use proxmox_sys::{task_log, task_warn};
 
 use pbs_api_types::{
-    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, KeepOptions,
-    NamespaceListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
-    PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
+    print_store_and_ns, Authid, BackupGroup, BackupNamespace, GroupFilter, GroupListItem,
+    KeepOptions, NamespaceListItem, Operation, RateLimitConfig, Remote, SnapshotListItem,
+    MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
 };
 
 use pbs_client::{
@@ -31,6 +31,7 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
+use pbs_datastore::prune::PruneJob;
 use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
 use pbs_tools::sha::sha256;
 use proxmox_rest_server::WorkerTask;
@@ -901,7 +902,7 @@ fn check_and_remove_vanished_ns(
 /// - creation and removal of sub-NS checked here
 /// - access to sub-NS checked here
 pub(crate) async fn pull_store(
-    worker: &WorkerTask,
+    worker: Arc<WorkerTask>,
     client: &HttpClient,
     mut params: PullParameters,
 ) -> Result<(), Error> {
@@ -913,7 +914,7 @@ pub(crate) async fn pull_store(
     let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
         vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
     } else {
-        query_namespaces(worker, client, &mut params).await?
+        query_namespaces(&worker, client, &mut params).await?
     };
     errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
 
@@ -952,7 +953,15 @@ pub(crate) async fn pull_store(
             }
         }
 
-        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
+        match pull_ns(
+            worker.clone(),
+            client,
+            &params,
+            namespace.clone(),
+            target_ns,
+        )
+        .await
+        {
             Ok((ns_progress, ns_errors)) => {
                 errors |= ns_errors;
 
@@ -981,7 +990,7 @@ pub(crate) async fn pull_store(
     }
 
     if params.remove_vanished {
-        errors |= check_and_remove_vanished_ns(worker, &params, synced_ns)?;
+        errors |= check_and_remove_vanished_ns(&worker, &params, synced_ns)?;
     }
 
     if errors {
@@ -1004,7 +1013,7 @@ pub(crate) async fn pull_store(
 /// - remote namespaces are filtered by remote
 /// - owner check for vanished groups done here
 pub(crate) async fn pull_ns(
-    worker: &WorkerTask,
+    worker: Arc<WorkerTask>,
     client: &HttpClient,
     params: &PullParameters,
     source_ns: BackupNamespace,
@@ -1037,10 +1046,6 @@ pub(crate) async fn pull_ns(
         }
     });
 
-    let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
-        filters.iter().any(|filter| group.matches(filter))
-    };
-
     // Get groups with target NS set
     let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
 
@@ -1071,7 +1076,7 @@ pub(crate) async fn pull_ns(
 
     let mut progress = StoreProgress::new(list.len() as u64);
 
-    for (done, group) in list.into_iter().enumerate() {
+    for (done, group) in list.iter().enumerate() {
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
@@ -1079,14 +1084,14 @@ pub(crate) async fn pull_ns(
         let (owner, _lock_guard) =
             match params
                 .store
-                .create_locked_backup_group(&target_ns, &group, &params.owner)
+                .create_locked_backup_group(&target_ns, group, &params.owner)
             {
                 Ok(result) => result,
                 Err(err) => {
                     task_log!(
                         worker,
                         "sync group {} failed - group lock failed: {}",
-                        &group,
+                        group,
                         err
                     );
                     errors = true; // do not stop here, instead continue
@@ -1100,66 +1105,118 @@ pub(crate) async fn pull_ns(
             task_log!(
                 worker,
                 "sync group {} failed - owner check failed ({} != {})",
-                &group,
+                group,
                 params.owner,
                 owner
             );
             errors = true; // do not stop here, instead continue
         } else if let Err(err) = pull_group(
-            worker,
+            worker.clone().as_ref(),
             client,
             params,
-            &group,
+            group,
             source_ns.clone(),
             &mut progress,
         )
         .await
         {
-            task_log!(worker, "sync group {} failed - {}", &group, err,);
+            task_log!(worker, "sync group {} failed - {}", group, err,);
             errors = true; // do not stop here, instead continue
         }
     }
 
     if params.remove_vanished {
-        let result: Result<(), Error> = proxmox_lang::try_block!({
-            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
-                let local_group = local_group?;
-                let local_group = local_group.group();
-                if new_groups.contains(local_group) {
-                    continue;
-                }
-                let owner = params.store.get_owner(&target_ns, local_group)?;
-                if check_backup_owner(&owner, &params.owner).is_err() {
-                    continue;
-                }
-                if let Some(ref group_filter) = &params.group_filter {
-                    if !apply_filters(local_group, group_filter) {
-                        continue;
-                    }
-                }
-                task_log!(worker, "delete vanished group '{local_group}'",);
-                match params.store.remove_backup_group(&target_ns, local_group) {
-                    Ok(true) => {}
-                    Ok(false) => {
-                        task_log!(
-                            worker,
-                            "kept some protected snapshots of group '{}'",
-                            local_group
-                        );
-                    }
-                    Err(err) => {
-                        task_log!(worker, "{}", err);
-                        errors = true;
-                    }
-                }
-            }
-            Ok(())
-        });
-        if let Err(err) = result {
-            task_log!(worker, "error during cleanup: {}", err);
+        if let Err(err) = remove_vanished(worker.clone(), params, target_ns.clone(), &new_groups) {
+            task_warn!(worker, "error during cleanup: {}", err);
+            errors = true;
+        };
+    }
+
+    if params.keep_options.keeps_something() {
+        if let Err(err) = prune_namespace(worker.clone(), params, target_ns.clone(), list) {
+            task_warn!(worker, "error during pruning: {}", err);
             errors = true;
         };
     }
 
     Ok((progress, errors))
 }
+
+fn apply_filters(group: &BackupGroup, filters: &[GroupFilter]) -> bool {
+    filters.iter().any(|filter| group.matches(filter))
+}
+
+fn remove_vanished(
+    worker: Arc<WorkerTask>,
+    params: &PullParameters,
+    target_ns: BackupNamespace,
+    new_groups: &HashSet<BackupGroup>,
+) -> Result<(), Error> {
+    let list_groups = params.store.iter_backup_groups(target_ns.clone())?;
+
+    for local_group in list_groups {
+        let local_group = local_group?;
+        let local_group = local_group.group();
+
+        if new_groups.contains(local_group) {
+            continue;
+        }
+
+        let owner = params.store.get_owner(&target_ns, local_group)?;
+        if check_backup_owner(&owner, &params.owner).is_err() {
+            continue;
+        }
+
+        if let Some(ref group_filter) = &params.group_filter {
+            if !apply_filters(local_group, group_filter) {
+                continue;
+            }
+        }
+
+        task_log!(worker, "delete vanished group '{local_group}'");
+
+        if !params.store.remove_backup_group(&target_ns, local_group)? {
+            task_log!(
+                worker,
+                "kept some protected snapshots of group '{}'",
+                local_group
+            );
+        }
+    }
+
+    Ok(())
+}
+
+fn prune_namespace(
+    worker: Arc<WorkerTask>,
+    params: &PullParameters,
+    target_ns: BackupNamespace,
+    backup_groups: Vec<BackupGroup>,
+) -> Result<(), Error> {
+    task_log!(worker, "running prune job");
+
+    for local_group in backup_groups.into_iter() {
+        let owner = params.store.get_owner(&target_ns, &local_group)?;
+        if check_backup_owner(&owner, &params.owner).is_err() {
+            continue;
+        }
+
+        if let Some(ref group_filter) = &params.group_filter {
+            if !apply_filters(&local_group, group_filter) {
+                continue;
+            }
+        }
+
+        task_log!(worker, "pruning backup group {}", &local_group);
+
+        let backup_group = params
+            .store
+            .backup_group(target_ns.clone(), local_group.clone());
+
+        PruneJob::new(backup_group.list_backups()?, &params.keep_options)?
+            .logging(worker.clone())
+            .run();
+    }
+
+    Ok(())
+}
-- 
2.30.2





More information about the pbs-devel mailing list