[pbs-devel] [PATCH proxmox-backup v4 39/45] api: backup: use local datastore cache on s3 backend chunk upload
Christian Ebner
c.ebner at proxmox.com
Mon Jun 23 11:41:00 CEST 2025
Take advantage of the local datastore cache to avoid re-uploading
already known chunks. This not only helps to improve backup/upload
speed, but also avoids additional costs by reducing the number of
requests and the amount of payload data transferred to the S3 object
store API.

If the cache is present, check whether it already contains the chunk
and skip the upload altogether if it does. Otherwise, read the chunk
into memory, upload it to the S3 object store API and insert it into
the local datastore cache.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
src/api2/backup/upload_chunk.rs | 37 ++++++++++++++++++++++++++++-----
src/server/pull.rs | 4 ++++
2 files changed, 36 insertions(+), 5 deletions(-)
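For reference, a minimal, self-contained sketch of the pattern applied in
upload_to_backend() below: skip the S3 upload on a cache hit, otherwise run
the put_object() upload and the blocking cache insert concurrently via
futures::join! and tokio::task::spawn_blocking. The Cache and S3Client types
here are hypothetical stand-ins (not the proxmox-backup types) and the sketch
assumes the tokio, futures and anyhow crates:

// Minimal sketch of the caching pattern, not the actual proxmox-backup code:
// `Cache` and `S3Client` are simplified stand-ins for the local datastore
// cache and the S3 backend client used in the patch below.
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

use anyhow::Error;

#[derive(Default)]
struct Cache {
    chunks: Mutex<HashSet<[u8; 32]>>,
}

impl Cache {
    fn contains(&self, digest: &[u8; 32]) -> bool {
        self.chunks.lock().unwrap().contains(digest)
    }

    // Blocking insert, standing in for writing the chunk to the local store.
    fn insert(&self, digest: &[u8; 32], _chunk: &[u8]) -> Result<(), Error> {
        self.chunks.lock().unwrap().insert(*digest);
        Ok(())
    }
}

struct S3Client;

impl S3Client {
    // Stand-in for the real put_object() call; returns true if the object
    // already existed on the backend (duplicate chunk).
    async fn put_object(&self, _digest: [u8; 32], _data: Vec<u8>) -> Result<bool, Error> {
        Ok(false)
    }
}

async fn upload_chunk(
    cache: Arc<Cache>,
    s3: Arc<S3Client>,
    digest: [u8; 32],
    data: Vec<u8>,
) -> Result<bool, Error> {
    // Cache hit: the chunk is already known, skip the S3 request entirely.
    if cache.contains(&digest) {
        return Ok(true);
    }

    // Run the S3 upload and the blocking cache insert concurrently; the
    // insert goes onto the blocking thread pool.
    let upload = s3.put_object(digest, data.clone());
    let cache2 = Arc::clone(&cache);
    let insert = tokio::task::spawn_blocking(move || cache2.insert(&digest, &data));

    let is_duplicate = match futures::join!(upload, insert) {
        (Ok(duplicate), Ok(Ok(()))) => duplicate,
        (Ok(_), Ok(Err(err))) => return Err(err.context("cache insert failed")),
        (Ok(_), Err(err)) => return Err(Error::from(err).context("cache insert task failed")),
        (Err(err), _) => return Err(err.context("upload failed")),
    };
    Ok(is_duplicate)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let cache = Arc::new(Cache::default());
    let s3 = Arc::new(S3Client);
    let duplicate = upload_chunk(cache, s3, [0u8; 32], b"chunk data".to_vec()).await?;
    println!("duplicate: {duplicate}");
    Ok(())
}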
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 760a7736c..5a5ed386a 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -257,11 +257,38 @@ async fn upload_to_backend(
}
DatastoreBackend::S3(s3_client) => {
let data = req_body.collect().await?.to_bytes();
- let upload_body = Body::from(data);
- let is_duplicate = match s3_client.put_object(digest.into(), upload_body).await? {
- PutObjectResponse::PreconditionFailed => true,
- PutObjectResponse::NeedsRetry => bail!("concurrent operation, reupload required"),
- PutObjectResponse::Success(_content) => false,
+ let upload_body = Body::from(data.clone());
+
+ if encoded_size != data.len() as u32 {
+ bail!(
+ "got blob with unexpected length ({encoded_size} != {})",
+ data.len()
+ );
+ }
+
+ if env.datastore.cache_contains(&digest) {
+ return Ok((digest, size, encoded_size, true));
+ }
+
+ let datastore = env.datastore.clone();
+ let upload = s3_client.put_object(digest.into(), upload_body);
+ let cache_insert = tokio::task::spawn_blocking(move || {
+ let chunk = DataBlob::from_raw(data.to_vec())?;
+ datastore.cache_insert(&digest, &chunk)
+ });
+ let is_duplicate = match futures::join!(upload, cache_insert) {
+ (Ok(upload_response), Ok(Ok(()))) => match upload_response {
+ PutObjectResponse::PreconditionFailed => true,
+ PutObjectResponse::NeedsRetry => {
+ bail!("concurrent operation, reupload required")
+ }
+ PutObjectResponse::Success(_content) => false,
+ },
+ (Ok(_), Ok(Err(err))) => return Err(err.context("chunk cache insert failed")),
+ (Ok(_), Err(err)) => {
+ return Err(Error::from(err).context("chunk cache insert task failed"))
+ }
+ (Err(err), _) => return Err(err.context("chunk upload failed")),
};
Ok((digest, size, encoded_size, is_duplicate))
}
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 0996d9889..990389ca1 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -173,6 +173,10 @@ async fn pull_index_chunks<I: IndexFile>(
target2.insert_chunk(&chunk, &digest)?;
}
DatastoreBackend::S3(s3_client) => {
+ if target2.cache_contains(&digest) {
+ return Ok(());
+ }
+ target2.cache_insert(&digest, &chunk)?;
let data = chunk.raw_data().to_vec();
let upload_body = proxmox_http::Body::from(data);
proxmox_async::runtime::block_on(
--
2.47.2