[pbs-devel] [RFC proxmox-backup 35/39] api: backup: use local datastore cache on S3 backend chunk upload
Christian Ebner
c.ebner at proxmox.com
Mon May 19 13:46:36 CEST 2025
Take advantage of the local datastore cache to avoid re-uploading
already known chunks. This not only helps to improve backup/upload
speed, but also avoids additional costs by reducing the number of
requests and the amount of payload data transferred to the S3 object
store API.

If the cache is present, check whether it already contains the chunk
and skip the upload altogether if it does. Otherwise, load the chunk
into memory, upload it to the S3 object store API and insert it into
the local datastore cache.
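For illustration, below is a minimal, self-contained sketch of the
concurrency pattern the hunk relies on: the async S3 upload and the
blocking cache insert are driven in parallel via futures::join!, with
the blocking part offloaded through tokio's spawn_blocking, and the
chunk only counts as stored if both sides succeed. The helpers
upload_to_s3() and insert_into_cache() are stand-ins for illustration
only, not the actual proxmox-backup API (which uses
s3_client.put_object() and datastore.cache_insert() as shown in the
diff):

// Sketch only: stand-in helpers, not the proxmox-backup implementation.
use anyhow::Error;

async fn upload_to_s3(_data: &[u8]) -> Result<bool, Error> {
    // stand-in for s3_client.put_object(); returns whether the object
    // already existed on the backend
    Ok(false)
}

fn insert_into_cache(_data: Vec<u8>) -> Result<(), Error> {
    // stand-in for datastore.cache_insert(); may block on disk I/O,
    // hence the spawn_blocking below
    Ok(())
}

async fn store_chunk(data: Vec<u8>) -> Result<bool, Error> {
    // create the upload future and the blocking cache-insert task,
    // then drive both to completion concurrently
    let upload = upload_to_s3(&data);
    let cache_insert = tokio::task::spawn_blocking({
        let data = data.clone();
        move || insert_into_cache(data)
    });
    match futures::join!(upload, cache_insert) {
        (Ok(is_duplicate), Ok(Ok(()))) => Ok(is_duplicate),
        (Ok(_), Ok(Err(err))) => Err(err.context("cache insert failed")),
        (Ok(_), Err(err)) => Err(Error::from(err).context("cache insert task failed")),
        (Err(err), _) => Err(err.context("upload failed")),
    }
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let is_duplicate = store_chunk(b"chunk data".to_vec()).await?;
    println!("duplicate: {is_duplicate}");
    Ok(())
}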
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
src/api2/backup/upload_chunk.rs | 47 ++++++++++++++++++++++++++++++---
1 file changed, 43 insertions(+), 4 deletions(-)
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 59f9ca558..1d82936e6 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -248,10 +248,49 @@ async fn upload_to_backend(
             UploadChunk::new(req_body, datastore, digest, size, encoded_size).await
         }
         DatastoreBackend::S3(s3_client) => {
-            let is_duplicate = match s3_client.put_object(digest.into(), req_body).await? {
-                PutObjectResponse::PreconditionFailed => true,
-                PutObjectResponse::NeedsRetry => bail!("concurrent operation, reupload required"),
-                PutObjectResponse::Success(_content) => false,
+            if datastore.cache_contains(&digest) {
+                return Ok((digest, size, encoded_size, true));
+            }
+            // TODO: Avoid this altogether? put_object already loads the whole
+            // chunk into memory and also does hashing and crc32 checksum
+            // calculation for the S3 request.
+            //
+            // Load chunk data into memory, need to write it twice,
+            // to S3 object store and local cache store.
+            let data = req_body
+                .map_err(Error::from)
+                .try_fold(Vec::new(), |mut acc, chunk| {
+                    acc.extend_from_slice(&chunk);
+                    future::ok::<_, Error>(acc)
+                })
+                .await?;
+
+            if encoded_size != data.len() as u32 {
+                bail!(
+                    "got blob with unexpected length ({encoded_size} != {})",
+                    data.len()
+                );
+            }
+
+            let upload_body = hyper::Body::from(data.clone());
+            let upload = s3_client.put_object(digest.into(), upload_body);
+            let cache_insert = tokio::task::spawn_blocking(move || {
+                let chunk = DataBlob::from_raw(data)?;
+                datastore.cache_insert(&digest, &chunk)
+            });
+            let is_duplicate = match futures::join!(upload, cache_insert) {
+                (Ok(upload_response), Ok(Ok(()))) => match upload_response {
+                    PutObjectResponse::PreconditionFailed => true,
+                    PutObjectResponse::NeedsRetry => {
+                        bail!("concurrent operation, reupload required")
+                    }
+                    PutObjectResponse::Success(_content) => false,
+                },
+                (Ok(_), Ok(Err(err))) => return Err(err.context("chunk cache insert failed")),
+                (Ok(_), Err(err)) => {
+                    return Err(Error::from(err).context("chunk cache insert task failed"))
+                }
+                (Err(err), _) => return Err(err.context("chunk upload failed")),
             };
             Ok((digest, size, encoded_size, is_duplicate))
         }
--
2.39.5