[pbs-devel] [RFC v2 proxmox-backup 40/42] api: backup: use local datastore cache on S3 backend chunk upload

Christian Ebner c.ebner at proxmox.com
Thu May 29 16:32:05 CEST 2025


Take advantage of the local datastore cache to avoid re-uploading
already known chunks. This not only helps to improve the backup/upload
speed, but also avoids additional costs by reducing the number of
requests and the amount of payload data transferred to the S3 object
store API.

If the cache is present, check whether it already contains the chunk
and skip the upload altogether if it does. Otherwise, load the chunk
into memory, upload it to the S3 object store API and insert it into
the local datastore cache.
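
In condensed form the flow on the S3 backend looks like this
(simplified from the hunk below; collect_body() stands in for the
inline try_fold and error handling is omitted):

    let data = collect_body(req_body).await?;          // buffer chunk in memory
    if env.datastore.cache_contains(&digest) {
        return Ok((digest, size, encoded_size, true)); // known chunk, skip upload
    }
    let upload = s3_client.put_object(digest.into(), data.clone().into());
    let insert = tokio::task::spawn_blocking(move || {
        let chunk = DataBlob::from_raw(data)?;
        datastore.cache_insert(&digest, &chunk)
    });
    let (upload_response, insert_result) = futures::join!(upload, insert);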

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 src/api2/backup/upload_chunk.rs | 46 ++++++++++++++++++++++++++++++---
 src/server/pull.rs              |  4 +++
 2 files changed, 46 insertions(+), 4 deletions(-)
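
Note on the body collection added in upload_to_backend(): this is the
usual try_fold pattern for draining a hyper::Body into a contiguous
buffer. A minimal self-contained sketch of the same pattern (assuming
a hyper 0.14 style Body that implements Stream, as used by the code
below; collect_body is a hypothetical helper name):

    use anyhow::Error;
    use futures::{future, TryStreamExt};

    // Drain a request body into a Vec<u8>, mirroring the try_fold in
    // upload_to_backend() below.
    async fn collect_body(body: hyper::Body) -> Result<Vec<u8>, Error> {
        body.map_err(Error::from)
            .try_fold(Vec::new(), |mut acc, chunk| {
                acc.extend_from_slice(&chunk);
                future::ok::<_, Error>(acc)
            })
            .await
    }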
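
The upload and the cache insert are deliberately run concurrently via
futures::join!, with the blocking insert moved off the async executor
through spawn_blocking. A self-contained sketch of that pairing (the
two Ok(()) stand-ins replace put_object() and cache_insert(); the
error matching follows the arms in the hunk below, where an upload
error takes precedence):

    use anyhow::{Context, Error};

    async fn upload_and_cache() -> Result<(), Error> {
        // stand-in for the async S3 PUT request
        let upload = async { Ok::<(), Error>(()) };
        // stand-in for the blocking local cache insert
        let insert = tokio::task::spawn_blocking(|| Ok::<(), Error>(()));

        match futures::join!(upload, insert) {
            (Ok(()), Ok(Ok(()))) => Ok(()),
            (Ok(()), Ok(Err(err))) => Err(err.context("cache insert failed")),
            (Ok(()), Err(err)) => Err(Error::from(err).context("cache insert task panicked")),
            (Err(err), _) => Err(err.context("upload failed")),
        }
    }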

diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 838eec1fa..7a80fd0eb 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -247,10 +247,48 @@ async fn upload_to_backend(
             UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await
         }
         DatastoreBackend::S3(s3_client) => {
-            let is_duplicate = match s3_client.put_object(digest.into(), req_body).await? {
-                PutObjectResponse::PreconditionFailed => true,
-                PutObjectResponse::NeedsRetry => bail!("concurrent operation, reupload required"),
-                PutObjectResponse::Success(_content) => false,
+            // Load the chunk data into memory since it has to be written twice, once
+            // to the S3 object store and once to the local cache store. Further, the
+            // body has to be consumed even if the insert can be skipped as already cached.
+            let data = req_body
+                .map_err(Error::from)
+                .try_fold(Vec::new(), |mut acc, chunk| {
+                    acc.extend_from_slice(&chunk);
+                    future::ok::<_, Error>(acc)
+                })
+                .await?;
+
+            if encoded_size != data.len() as u32 {
+                bail!(
+                    "got blob with unexpected length ({encoded_size} != {})",
+                    data.len()
+                );
+            }
+
+            if env.datastore.cache_contains(&digest) {
+                return Ok((digest, size, encoded_size, true));
+            }
+
+            let datastore = env.datastore.clone();
+            let upload_body = hyper::Body::from(data.clone());
+            let upload = s3_client.put_object(digest.into(), upload_body);
+            let cache_insert = tokio::task::spawn_blocking(move || {
+                let chunk = DataBlob::from_raw(data)?;
+                datastore.cache_insert(&digest, &chunk)
+            });
+            let is_duplicate = match futures::join!(upload, cache_insert) {
+                (Ok(upload_response), Ok(Ok(()))) => match upload_response {
+                    PutObjectResponse::PreconditionFailed => true,
+                    PutObjectResponse::NeedsRetry => {
+                        bail!("concurrent operation, reupload required")
+                    }
+                    PutObjectResponse::Success(_content) => false,
+                },
+                (Ok(_), Ok(Err(err))) => return Err(err.context("chunk cache insert failed")),
+                (Ok(_), Err(err)) => {
+                    return Err(Error::from(err).context("chunk cache insert task failed"))
+                }
+                (Err(err), _) => return Err(err.context("chunk upload failed")),
             };
             Ok((digest, size, encoded_size, is_duplicate))
         }
diff --git a/src/server/pull.rs b/src/server/pull.rs
index f36efd7c8..85d3154eb 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -173,6 +173,10 @@ async fn pull_index_chunks<I: IndexFile>(
                     target2.insert_chunk(&chunk, &digest)?;
                 }
                 DatastoreBackend::S3(s3_client) => {
+                    if target2.cache_contains(&digest) {
+                        return Ok(());
+                    }
+                    target2.cache_insert(&digest, &chunk)?;
                     let data = chunk.raw_data().to_vec();
                     let upload_body = hyper::Body::from(data);
                     proxmox_async::runtime::block_on(
-- 
2.39.5