[pbs-devel] [PATCH proxmox-backup 01/17] sync: pull: instantiate backend only once per sync job

Fabian Grünbichler f.gruenbichler at proxmox.com
Mon Nov 3 15:51:26 CET 2025


Reviewed-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>

On November 3, 2025 12:31 pm, Christian Ebner wrote:
> Currently the target datastores' backend is instantiated for each
> chunk to be inserted, which on s3 backed datastores leads to the
> s3-client being re-instantiated and a new connection being
> established.
> 
> Optimize this by only creating the backend once and sharing it for
> all the chunk inserts to be performed.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
>  src/server/pull.rs | 30 +++++++++++++++++++++---------
>  1 file changed, 21 insertions(+), 9 deletions(-)
> 
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 817b57ac5..de8b140bc 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -38,6 +38,8 @@ use crate::tools::parallel_handler::ParallelHandler;
>  pub(crate) struct PullTarget {
>      store: Arc<DataStore>,
>      ns: BackupNamespace,
> +    // Contains the active S3Client in case of S3 backend
> +    backend: DatastoreBackend,
>  }
>  
>  /// Parameters for a pull operation.
> @@ -114,10 +116,9 @@ impl PullParameters {
>                  ns: remote_ns,
>              })
>          };
> -        let target = PullTarget {
> -            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
> -            ns,
> -        };
> +        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> +        let backend = store.backend()?;
> +        let target = PullTarget { store, ns, backend };
>  
>          let group_filter = group_filter.unwrap_or_default();
>  
> @@ -141,6 +142,7 @@ async fn pull_index_chunks<I: IndexFile>(
>      target: Arc<DataStore>,
>      index: I,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +    backend: &DatastoreBackend,
>  ) -> Result<SyncStats, Error> {
>      use futures::stream::{self, StreamExt, TryStreamExt};
>  
> @@ -162,13 +164,14 @@ async fn pull_index_chunks<I: IndexFile>(
>      );
>  
>      let target2 = target.clone();
> +    let backend = backend.clone();
>      let verify_pool = ParallelHandler::new(
>          "sync chunk writer",
>          4,
>          move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>              // println!("verify and write {}", hex::encode(&digest));
>              chunk.verify_unencrypted(size as usize, &digest)?;
> -            match target2.backend()? {
> +            match &backend {
>                  DatastoreBackend::Filesystem => {
>                      target2.insert_chunk(&chunk, &digest)?;
>                  }
> @@ -283,6 +286,7 @@ async fn pull_single_archive<'a>(
>      snapshot: &'a pbs_datastore::BackupDir,
>      archive_info: &'a FileInfo,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +    backend: &DatastoreBackend,
>  ) -> Result<SyncStats, Error> {
>      let archive_name = &archive_info.filename;
>      let mut path = snapshot.full_path();
> @@ -317,6 +321,7 @@ async fn pull_single_archive<'a>(
>                      snapshot.datastore().clone(),
>                      index,
>                      downloaded_chunks,
> +                    backend,
>                  )
>                  .await?;
>                  sync_stats.add(stats);
> @@ -339,6 +344,7 @@ async fn pull_single_archive<'a>(
>                      snapshot.datastore().clone(),
>                      index,
>                      downloaded_chunks,
> +                    backend,
>                  )
>                  .await?;
>                  sync_stats.add(stats);
> @@ -495,15 +501,21 @@ async fn pull_snapshot<'a>(
>              }
>          }
>  
> -        let stats =
> -            pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
> +        let stats = pull_single_archive(
> +            reader.clone(),
> +            snapshot,
> +            item,
> +            downloaded_chunks.clone(),
> +            &params.target.backend,

nit: this is used 3 times here, and could be pulled into a binding as
well..

> +        )
> +        .await?;
>          sync_stats.add(stats);
>      }
>  
>      if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
>          bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
>      }
> -    if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
> +    if let DatastoreBackend::S3(s3_client) = &params.target.backend {
>          let object_key = pbs_datastore::s3::object_key_from_path(
>              &snapshot.relative_path(),
>              MANIFEST_BLOB_NAME.as_ref(),
> @@ -520,7 +532,7 @@ async fn pull_snapshot<'a>(
>      if !client_log_name.exists() {
>          reader.try_download_client_log(&client_log_name).await?;
>          if client_log_name.exists() {
> -            if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
> +            if let DatastoreBackend::S3(s3_client) = &params.target.backend {
>                  let object_key = pbs_datastore::s3::object_key_from_path(
>                      &snapshot.relative_path(),
>                      CLIENT_LOG_BLOB_NAME.as_ref(),
> -- 
> 2.47.3
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




More information about the pbs-devel mailing list