[pbs-devel] [PATCH proxmox-backup v4 17/45] api: backup: conditionally upload chunks to s3 object store backend

Christian Ebner c.ebner at proxmox.com
Mon Jun 23 11:40:38 CEST 2025


Upload fixed and dynamic sized chunks to either the filesystem or
the S3 object store, depending on the configured backend.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 src/api2/backup/upload_chunk.rs | 70 +++++++++++++++++++--------------
 1 file changed, 41 insertions(+), 29 deletions(-)

diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 2c66c2855..760a7736c 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -10,13 +10,15 @@ use hyper::body::Incoming;
 use hyper::http::request::Parts;
 use serde_json::{json, Value};
 
+use proxmox_http::Body;
 use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
 use proxmox_schema::*;
 use proxmox_sortable_macro::sortable;
 
 use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
 use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
-use pbs_datastore::{DataBlob, DataStore};
+use pbs_datastore::{DataBlob, DataStore, DatastoreBackend};
+use pbs_s3_client::PutObjectResponse;
 use pbs_tools::json::{required_integer_param, required_string_param};
 
 use super::environment::*;
@@ -154,22 +156,10 @@ fn upload_fixed_chunk(
 ) -> ApiResponseFuture {
     async move {
         let wid = required_integer_param(&param, "wid")? as usize;
-        let size = required_integer_param(&param, "size")? as u32;
-        let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
-
-        let digest_str = required_string_param(&param, "digest")?;
-        let digest = <[u8; 32]>::from_hex(digest_str)?;
-
         let env: &BackupEnvironment = rpcenv.as_ref();
 
-        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
-            BodyDataStream::new(req_body),
-            env.datastore.clone(),
-            digest,
-            size,
-            encoded_size,
-        )
-        .await?;
+        let (digest, size, compressed_size, is_duplicate) =
+            upload_to_backend(req_body, param, env).await?;
 
         env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
         let digest_str = hex::encode(digest);
@@ -229,22 +219,10 @@ fn upload_dynamic_chunk(
 ) -> ApiResponseFuture {
     async move {
         let wid = required_integer_param(&param, "wid")? as usize;
-        let size = required_integer_param(&param, "size")? as u32;
-        let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
-
-        let digest_str = required_string_param(&param, "digest")?;
-        let digest = <[u8; 32]>::from_hex(digest_str)?;
-
         let env: &BackupEnvironment = rpcenv.as_ref();
 
-        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
-            BodyDataStream::new(req_body),
-            env.datastore.clone(),
-            digest,
-            size,
-            encoded_size,
-        )
-        .await?;
+        let (digest, size, compressed_size, is_duplicate) =
+            upload_to_backend(req_body, param, env).await?;
 
         env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
         let digest_str = hex::encode(digest);
@@ -256,6 +234,40 @@ fn upload_dynamic_chunk(
     .boxed()
 }
 
+async fn upload_to_backend(
+    req_body: Incoming,
+    param: Value,
+    env: &BackupEnvironment,
+) -> Result<([u8; 32], u32, u32, bool), Error> {
+    let size = required_integer_param(&param, "size")? as u32;
+    let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
+    let digest_str = required_string_param(&param, "digest")?;
+    let digest = <[u8; 32]>::from_hex(digest_str)?;
+
+    match &env.backend {
+        DatastoreBackend::Filesystem => {
+            UploadChunk::new(
+                BodyDataStream::new(req_body),
+                env.datastore.clone(),
+                digest,
+                size,
+                encoded_size,
+            )
+            .await
+        }
+        DatastoreBackend::S3(s3_client) => {
+            let data = req_body.collect().await?.to_bytes();
+            let upload_body = Body::from(data);
+            let is_duplicate = match s3_client.put_object(digest.into(), upload_body).await? {
+                PutObjectResponse::PreconditionFailed => true,
+                PutObjectResponse::NeedsRetry => bail!("concurrent operation, reupload required"),
+                PutObjectResponse::Success(_content) => false,
+            };
+            Ok((digest, size, encoded_size, is_duplicate))
+        }
+    }
+}
+
 pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
     &ApiHandler::AsyncHttp(&upload_speedtest),
     &ObjectSchema::new("Test upload speed.", &[]),
-- 
2.47.2





More information about the pbs-devel mailing list