[pbs-devel] [PATCH proxmox-backup 01/17] sync: pull: instantiate backend only once per sync job

Christian Ebner c.ebner at proxmox.com
Mon Nov 3 12:31:04 CET 2025


Currently the target datastore's backend is instantiated for each
chunk to be inserted, which on S3-backed datastores leads to the
S3 client being re-instantiated and a new connection being
established each time.

Optimize this by creating the backend only once per sync job and
sharing it across all chunk inserts.
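
The pattern is roughly the following (a minimal, self-contained sketch
with stand-in types such as Backend, Store and insert_chunk; this is
not the actual proxmox-backup API, just an illustration of
instantiating the backend once and cloning the shared handle into the
chunk-writer closure):

    use std::sync::Arc;

    #[derive(Clone)]
    enum Backend {
        Filesystem,
        S3(Arc<String>), // stand-in for a shared S3 client handle
    }

    struct Store;

    impl Store {
        // Before: called once per chunk, re-creating the client each time.
        fn backend(&self) -> Backend {
            Backend::S3(Arc::new("fresh connection".into()))
        }

        fn insert_chunk(&self, _data: &[u8]) {}
    }

    fn main() {
        let store = Arc::new(Store);

        // After: instantiate the backend once per sync job ...
        let backend = store.backend();

        // ... and clone the shared handle into the chunk-writer closure,
        // so every insert reuses the same client/connection.
        let store2 = Arc::clone(&store);
        let backend = backend.clone();
        let writer = move |chunk: &[u8]| match &backend {
            Backend::Filesystem => store2.insert_chunk(chunk),
            Backend::S3(_client) => { /* upload via the shared client */ }
        };

        let chunks: Vec<Vec<u8>> = vec![vec![0u8; 4]; 3];
        for chunk in &chunks {
            writer(chunk);
        }
    }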

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 src/server/pull.rs | 30 +++++++++++++++++++++---------
 1 file changed, 21 insertions(+), 9 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 817b57ac5..de8b140bc 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -38,6 +38,8 @@ use crate::tools::parallel_handler::ParallelHandler;
 pub(crate) struct PullTarget {
     store: Arc<DataStore>,
     ns: BackupNamespace,
+    // Contains the active S3Client in case of S3 backend
+    backend: DatastoreBackend,
 }
 
 /// Parameters for a pull operation.
@@ -114,10 +116,9 @@ impl PullParameters {
                 ns: remote_ns,
             })
         };
-        let target = PullTarget {
-            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
-            ns,
-        };
+        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
+        let backend = store.backend()?;
+        let target = PullTarget { store, ns, backend };
 
         let group_filter = group_filter.unwrap_or_default();
 
@@ -141,6 +142,7 @@ async fn pull_index_chunks<I: IndexFile>(
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    backend: &DatastoreBackend,
 ) -> Result<SyncStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
@@ -162,13 +164,14 @@ async fn pull_index_chunks<I: IndexFile>(
     );
 
     let target2 = target.clone();
+    let backend = backend.clone();
     let verify_pool = ParallelHandler::new(
         "sync chunk writer",
         4,
         move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
             // println!("verify and write {}", hex::encode(&digest));
             chunk.verify_unencrypted(size as usize, &digest)?;
-            match target2.backend()? {
+            match &backend {
                 DatastoreBackend::Filesystem => {
                     target2.insert_chunk(&chunk, &digest)?;
                 }
@@ -283,6 +286,7 @@ async fn pull_single_archive<'a>(
     snapshot: &'a pbs_datastore::BackupDir,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    backend: &DatastoreBackend,
 ) -> Result<SyncStats, Error> {
     let archive_name = &archive_info.filename;
     let mut path = snapshot.full_path();
@@ -317,6 +321,7 @@ async fn pull_single_archive<'a>(
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
+                    backend,
                 )
                 .await?;
                 sync_stats.add(stats);
@@ -339,6 +344,7 @@ async fn pull_single_archive<'a>(
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
+                    backend,
                 )
                 .await?;
                 sync_stats.add(stats);
@@ -495,15 +501,21 @@ async fn pull_snapshot<'a>(
             }
         }
 
-        let stats =
-            pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
+        let stats = pull_single_archive(
+            reader.clone(),
+            snapshot,
+            item,
+            downloaded_chunks.clone(),
+            &params.target.backend,
+        )
+        .await?;
         sync_stats.add(stats);
     }
 
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
         bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
     }
-    if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+    if let DatastoreBackend::S3(s3_client) = &params.target.backend {
         let object_key = pbs_datastore::s3::object_key_from_path(
             &snapshot.relative_path(),
             MANIFEST_BLOB_NAME.as_ref(),
@@ -520,7 +532,7 @@ async fn pull_snapshot<'a>(
     if !client_log_name.exists() {
         reader.try_download_client_log(&client_log_name).await?;
         if client_log_name.exists() {
-            if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+            if let DatastoreBackend::S3(s3_client) = &params.target.backend {
                 let object_key = pbs_datastore::s3::object_key_from_path(
                     &snapshot.relative_path(),
                     CLIENT_LOG_BLOB_NAME.as_ref(),
-- 
2.47.3
