[pbs-devel] [PATCH proxmox-backup v7 08/38] api: backup: conditionally upload chunks to s3 object store backend
Christian Ebner
c.ebner at proxmox.com
Thu Jul 10 19:06:58 CEST 2025
Upload fixed and dynamic sized chunks to either the filesystem or
the S3 object store, depending on the configured backend.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 6:
- no changes
src/api2/backup/upload_chunk.rs | 71 +++++++++++++++++++--------------
1 file changed, 42 insertions(+), 29 deletions(-)
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 2c66c2855..3ad8c3c75 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -16,7 +16,7 @@ use proxmox_sortable_macro::sortable;
use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
-use pbs_datastore::{DataBlob, DataStore};
+use pbs_datastore::{DataBlob, DataStore, DatastoreBackend};
use pbs_tools::json::{required_integer_param, required_string_param};
use super::environment::*;
@@ -154,22 +154,10 @@ fn upload_fixed_chunk(
) -> ApiResponseFuture {
async move {
let wid = required_integer_param(&param, "wid")? as usize;
- let size = required_integer_param(&param, "size")? as u32;
- let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
-
- let digest_str = required_string_param(&param, "digest")?;
- let digest = <[u8; 32]>::from_hex(digest_str)?;
-
let env: &BackupEnvironment = rpcenv.as_ref();
- let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
- BodyDataStream::new(req_body),
- env.datastore.clone(),
- digest,
- size,
- encoded_size,
- )
- .await?;
+ let (digest, size, compressed_size, is_duplicate) =
+ upload_to_backend(req_body, param, env).await?;
env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
let digest_str = hex::encode(digest);
@@ -229,22 +217,10 @@ fn upload_dynamic_chunk(
) -> ApiResponseFuture {
async move {
let wid = required_integer_param(&param, "wid")? as usize;
- let size = required_integer_param(&param, "size")? as u32;
- let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
-
- let digest_str = required_string_param(&param, "digest")?;
- let digest = <[u8; 32]>::from_hex(digest_str)?;
-
let env: &BackupEnvironment = rpcenv.as_ref();
- let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
- BodyDataStream::new(req_body),
- env.datastore.clone(),
- digest,
- size,
- encoded_size,
- )
- .await?;
+ let (digest, size, compressed_size, is_duplicate) =
+ upload_to_backend(req_body, param, env).await?;
env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
let digest_str = hex::encode(digest);
@@ -256,6 +232,43 @@ fn upload_dynamic_chunk(
.boxed()
}
+async fn upload_to_backend(
+ req_body: Incoming,
+ param: Value,
+ env: &BackupEnvironment,
+) -> Result<([u8; 32], u32, u32, bool), Error> {
+ let size = required_integer_param(¶m, "size")? as u32;
+ let encoded_size = required_integer_param(¶m, "encoded-size")? as u32;
+ let digest_str = required_string_param(¶m, "digest")?;
+ let digest = <[u8; 32]>::from_hex(digest_str)?;
+
+ match &env.backend {
+ DatastoreBackend::Filesystem => {
+ UploadChunk::new(
+ BodyDataStream::new(req_body),
+ env.datastore.clone(),
+ digest,
+ size,
+ encoded_size,
+ )
+ .await
+ }
+ DatastoreBackend::S3(s3_client) => {
+ let data = req_body.collect().await?.to_bytes();
+ if encoded_size != data.len() as u32 {
+ bail!(
+ "got blob with unexpected length ({encoded_size} != {})",
+ data.len()
+ );
+ }
+
+ let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
+ let is_duplicate = s3_client.upload_with_retry(object_key, data, false).await?;
+ Ok((digest, size, encoded_size, is_duplicate))
+ }
+ }
+}
+
pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
&ApiHandler::AsyncHttp(&upload_speedtest),
&ObjectSchema::new("Test upload speed.", &[]),
--
2.47.2
More information about the pbs-devel
mailing list