[pbs-devel] [PATCH v3 proxmox-backup 05/11] pull/sync: extract passed along vars into struct

Fabian Grünbichler f.gruenbichler at proxmox.com
Thu Oct 28 15:00:52 CEST 2021


this is basically the sync job config without the ID and with some
fields already converted into their resolved form, plus a convenient
helper to generate the http client from it.

Suggested-by: Dominik Csapak <d.csapak at proxmox.com>
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
 src/api2/config/remote.rs |  4 +-
 src/api2/pull.rs          | 61 +++++++++++++-------------
 src/server/pull.rs        | 91 ++++++++++++++++++++++++++-------------
 3 files changed, 92 insertions(+), 64 deletions(-)

diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 29e638d7..4dffe6bb 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -277,7 +277,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
 }
 
 /// Helper to get client for remote.cfg entry
-pub async fn remote_client(remote: Remote) -> Result<HttpClient, Error> {
+pub async fn remote_client(remote: &Remote) -> Result<HttpClient, Error> {
     let options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone());
 
     let client = HttpClient::new(
@@ -322,7 +322,7 @@ pub async fn scan_remote_datastores(name: String) -> Result<Vec<DataStoreListIte
                   api_err)
     };
 
-    let client = remote_client(remote)
+    let client = remote_client(&remote)
         .await
         .map_err(map_remote_err)?;
     let api_res = client
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 4280d922..5ae916ed 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -1,5 +1,5 @@
 //! Sync datastore from remote server
-use std::sync::{Arc};
+use std::convert::TryFrom;
 
 use anyhow::{format_err, Error};
 use futures::{select, future::FutureExt};
@@ -7,18 +7,18 @@ use futures::{select, future::FutureExt};
 use proxmox_schema::api;
 use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
 
-use pbs_client::{HttpClient, BackupRepository};
 use pbs_api_types::{
-    Remote, Authid, SyncJobConfig,
+    Authid, SyncJobConfig,
     DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
 };
 use pbs_tools::task_log;
 use proxmox_rest_server::WorkerTask;
 use pbs_config::CachedUserInfo;
-use pbs_datastore::DataStore;
 
-use crate::server::{jobstate::Job, pull::pull_store};
+use crate::server::pull::{PullParameters, pull_store};
+use crate::server::jobstate::Job;
+
 
 pub fn check_pull_privs(
     auth_id: &Authid,
@@ -40,27 +40,18 @@ pub fn check_pull_privs(
     Ok(())
 }
 
-pub async fn get_pull_parameters(
-    store: &str,
-    remote: &str,
-    remote_store: &str,
-) -> Result<(HttpClient, BackupRepository, Arc<DataStore>), Error> {
-
-    let tgt_store = DataStore::lookup_datastore(store)?;
-
-    let (remote_config, _digest) = pbs_config::remote::config()?;
-    let remote: Remote = remote_config.lookup("remote", remote)?;
-
-    let src_repo = BackupRepository::new(
-        Some(remote.config.auth_id.clone()),
-        Some(remote.config.host.clone()),
-        remote.config.port,
-        remote_store.to_string(),
-    );
-
-    let client = crate::api2::config::remote::remote_client(remote).await?;
-
-    Ok((client, src_repo, tgt_store))
+impl TryFrom<&SyncJobConfig> for PullParameters {
+    type Error = Error;
+
+    fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
+        PullParameters::new(
+            &sync_job.store,
+            &sync_job.remote,
+            &sync_job.remote_store,
+            sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(),
+            sync_job.remove_vanished,
+        )
+    }
 }
 
 pub fn do_sync_job(
@@ -94,9 +85,8 @@ pub fn do_sync_job(
 
             let worker_future = async move {
 
-                let delete = sync_job.remove_vanished.unwrap_or(true);
-                let sync_owner = sync_job.owner.unwrap_or_else(|| Authid::root_auth_id().clone());
-                let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?;
+                let pull_params = PullParameters::try_from(&sync_job)?;
+                let client = pull_params.client().await?;
 
                 task_log!(worker, "Starting datastore sync job '{}'", job_id);
                 if let Some(event_str) = schedule {
@@ -110,7 +100,7 @@ pub fn do_sync_job(
                     sync_job.remote_store,
                 );
 
-                pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, sync_owner).await?;
+                pull_store(&worker, &client, &pull_params).await?;
 
                 task_log!(worker, "sync job '{}' end", &job_id);
 
@@ -187,14 +177,21 @@ async fn pull (
 
     check_pull_privs(&auth_id, &store, &remote, &remote_store, delete)?;
 
-    let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?;
+    let pull_params = PullParameters::new(
+        &store,
+        &remote,
+        &remote_store,
+        auth_id.clone(),
+        remove_vanished,
+    )?;
+    let client = pull_params.client().await?;
 
     // fixme: set to_stdout to false?
     let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
 
         task_log!(worker, "sync datastore '{}' start", store);
 
-        let pull_future = pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, auth_id);
+        let pull_future = pull_store(&worker, &client, &pull_params);
         let future = select!{
             success = pull_future.fuse() => success,
             abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5c3f9a18..2c454e2d 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -13,7 +13,7 @@ use http::StatusCode;
 
 use proxmox_router::HttpError;
 
-use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
+use pbs_api_types::{Authid, GroupListItem, Remote, SnapshotListItem};
 use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -33,6 +33,44 @@ use crate::tools::ParallelHandler;
 // fixme: delete vanished groups
 // Todo: correctly lock backup groups
 
+pub struct PullParameters {
+    remote: Remote,
+    source: BackupRepository,
+    store: Arc<DataStore>,
+    owner: Authid,
+    remove_vanished: bool,
+}
+
+impl PullParameters {
+    pub fn new(
+        store: &str,
+        remote: &str,
+        remote_store: &str,
+        owner: Authid,
+        remove_vanished: Option<bool>,
+    ) -> Result<Self, Error> {
+        let store = DataStore::lookup_datastore(store)?;
+
+        let (remote_config, _digest) = pbs_config::remote::config()?;
+        let remote: Remote = remote_config.lookup("remote", remote)?;
+
+        let remove_vanished = remove_vanished.unwrap_or(true);
+
+        let source = BackupRepository::new(
+            Some(remote.config.auth_id.clone()),
+            Some(remote.config.host.clone()),
+            remote.config.port,
+            remote_store.to_string(),
+        );
+
+        Ok(Self { remote, source, store, owner, remove_vanished })
+    }
+
+    pub async fn client(&self) -> Result<HttpClient, Error> {
+        crate::api2::config::remote::remote_client(&self.remote).await
+    }
+}
+
 async fn pull_index_chunks<I: IndexFile>(
     worker: &WorkerTask,
     chunk_reader: RemoteChunkReader,
@@ -503,13 +541,11 @@ impl std::fmt::Display for SkipInfo {
 pub async fn pull_group(
     worker: &WorkerTask,
     client: &HttpClient,
-    src_repo: &BackupRepository,
-    tgt_store: Arc<DataStore>,
+    params: &PullParameters,
     group: &BackupGroup,
-    delete: bool,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-    let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
+    let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store());
 
     let args = json!({
         "backup-type": group.backup_type(),
@@ -525,7 +561,7 @@ pub async fn pull_group(
 
     let fingerprint = client.fingerprint();
 
-    let last_sync = tgt_store.last_successful_backup(group)?;
+    let last_sync = params.store.last_successful_backup(group)?;
 
     let mut remote_snapshots = std::collections::HashSet::new();
 
@@ -566,16 +602,16 @@ pub async fn pull_group(
         let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());
 
         let new_client = HttpClient::new(
-            src_repo.host(),
-            src_repo.port(),
-            src_repo.auth_id(),
+            params.source.host(),
+            params.source.port(),
+            params.source.auth_id(),
             options,
         )?;
 
         let reader = BackupReader::start(
             new_client,
             None,
-            src_repo.store(),
+            params.source.store(),
             snapshot.group().backup_type(),
             snapshot.group().backup_id(),
             backup_time,
@@ -586,7 +622,7 @@ pub async fn pull_group(
         let result = pull_snapshot_from(
             worker,
             reader,
-            tgt_store.clone(),
+            params.store.clone(),
             &snapshot,
             downloaded_chunks.clone(),
         )
@@ -598,14 +634,14 @@ pub async fn pull_group(
         result?; // stop on error
     }
 
-    if delete {
-        let local_list = group.list_backups(&tgt_store.base_path())?;
+    if params.remove_vanished {
+        let local_list = group.list_backups(&params.store.base_path())?;
         for info in local_list {
             let backup_time = info.backup_dir.backup_time();
             if remote_snapshots.contains(&backup_time) {
                 continue;
             }
-            if info.backup_dir.is_protected(tgt_store.base_path()) {
+            if info.backup_dir.is_protected(params.store.base_path()) {
                 task_log!(
                     worker,
                     "don't delete vanished snapshot {:?} (protected)",
@@ -614,7 +650,7 @@ pub async fn pull_group(
                 continue;
             }
             task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
-            tgt_store.remove_backup_dir(&info.backup_dir, false)?;
+            params.store.remove_backup_dir(&info.backup_dir, false)?;
         }
     }
 
@@ -628,15 +664,12 @@ pub async fn pull_group(
 pub async fn pull_store(
     worker: &WorkerTask,
     client: &HttpClient,
-    src_repo: &BackupRepository,
-    tgt_store: Arc<DataStore>,
-    delete: bool,
-    auth_id: Authid,
+    params: &PullParameters,
 ) -> Result<(), Error> {
     // explicit create shared lock to prevent GC on newly created chunks
-    let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
+    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
 
-    let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
+    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
 
     let mut result = client
         .get(&path, None)
@@ -675,7 +708,7 @@ pub async fn pull_store(
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
 
-        let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
+        let (owner, _lock_guard) = match params.store.create_locked_backup_group(&group, &params.owner) {
             Ok(result) => result,
             Err(err) => {
                 task_log!(
@@ -689,21 +722,19 @@ pub async fn pull_store(
         };
 
         // permission check
-        if auth_id != owner {
+        if params.owner != owner {
             // only the owner is allowed to create additional snapshots
             task_log!(
                 worker,
                 "sync group {} failed - owner check failed ({} != {})",
-                &group, auth_id, owner
+                &group, params.owner, owner
             );
             errors = true; // do not stop here, instead continue
         } else if let Err(err) = pull_group(
             worker,
             client,
-            src_repo,
-            tgt_store.clone(),
+            params,
             &group,
-            delete,
             &mut progress,
         )
         .await
@@ -717,9 +748,9 @@ pub async fn pull_store(
         }
     }
 
-    if delete {
+    if params.remove_vanished {
         let result: Result<(), Error> = proxmox_lang::try_block!({
-            let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
+            let local_groups = BackupInfo::list_backup_groups(&params.store.base_path())?;
             for local_group in local_groups {
                 if new_groups.contains(&local_group) {
                     continue;
@@ -730,7 +761,7 @@ pub async fn pull_store(
                     local_group.backup_type(),
                     local_group.backup_id()
                 );
-                match tgt_store.remove_backup_group(&local_group) {
+                match params.store.remove_backup_group(&local_group) {
                     Ok(true) => {},
                     Ok(false) => {
                         task_log!(worker, "kept some protected snapshots of group '{}'", local_group);
-- 
2.30.2






More information about the pbs-devel mailing list