[pbs-devel] [PATCH proxmox-backup v8 08/45] api: backup: conditionally upload chunks to s3 object store backend

Lukas Wagner l.wagner at proxmox.com
Fri Jul 18 10:11:29 CEST 2025


Reviewed-by: Lukas Wagner <l.wagner at proxmox.com>


On  2025-07-15 14:52, Christian Ebner wrote:
> Upload fixed- and dynamic-sized chunks to either the filesystem or
> the S3 object store, depending on the configured backend.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
> 
>  src/api2/backup/upload_chunk.rs | 71 +++++++++++++++++++--------------
>  1 file changed, 42 insertions(+), 29 deletions(-)
> 
> diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
> index 2c66c2855..3ad8c3c75 100644
> --- a/src/api2/backup/upload_chunk.rs
> +++ b/src/api2/backup/upload_chunk.rs
> @@ -16,7 +16,7 @@ use proxmox_sortable_macro::sortable;
>  
>  use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
>  use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
> -use pbs_datastore::{DataBlob, DataStore};
> +use pbs_datastore::{DataBlob, DataStore, DatastoreBackend};
>  use pbs_tools::json::{required_integer_param, required_string_param};
>  
>  use super::environment::*;
> @@ -154,22 +154,10 @@ fn upload_fixed_chunk(
>  ) -> ApiResponseFuture {
>      async move {
>          let wid = required_integer_param(&param, "wid")? as usize;
> -        let size = required_integer_param(&param, "size")? as u32;
> -        let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
> -
> -        let digest_str = required_string_param(&param, "digest")?;
> -        let digest = <[u8; 32]>::from_hex(digest_str)?;
> -
>          let env: &BackupEnvironment = rpcenv.as_ref();
>  
> -        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> -            BodyDataStream::new(req_body),
> -            env.datastore.clone(),
> -            digest,
> -            size,
> -            encoded_size,
> -        )
> -        .await?;
> +        let (digest, size, compressed_size, is_duplicate) =
> +            upload_to_backend(req_body, param, env).await?;
>  
>          env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
>          let digest_str = hex::encode(digest);
> @@ -229,22 +217,10 @@ fn upload_dynamic_chunk(
>  ) -> ApiResponseFuture {
>      async move {
>          let wid = required_integer_param(&param, "wid")? as usize;
> -        let size = required_integer_param(&param, "size")? as u32;
> -        let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
> -
> -        let digest_str = required_string_param(&param, "digest")?;
> -        let digest = <[u8; 32]>::from_hex(digest_str)?;
> -
>          let env: &BackupEnvironment = rpcenv.as_ref();
>  
> -        let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> -            BodyDataStream::new(req_body),
> -            env.datastore.clone(),
> -            digest,
> -            size,
> -            encoded_size,
> -        )
> -        .await?;
> +        let (digest, size, compressed_size, is_duplicate) =
> +            upload_to_backend(req_body, param, env).await?;
>  
>          env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
>          let digest_str = hex::encode(digest);
> @@ -256,6 +232,43 @@ fn upload_dynamic_chunk(
>      .boxed()
>  }
>  
> +async fn upload_to_backend(
> +    req_body: Incoming,
> +    param: Value,
> +    env: &BackupEnvironment,
> +) -> Result<([u8; 32], u32, u32, bool), Error> {
> +    let size = required_integer_param(&param, "size")? as u32;
> +    let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
> +    let digest_str = required_string_param(&param, "digest")?;
> +    let digest = <[u8; 32]>::from_hex(digest_str)?;
> +
> +    match &env.backend {
> +        DatastoreBackend::Filesystem => {
> +            UploadChunk::new(
> +                BodyDataStream::new(req_body),
> +                env.datastore.clone(),
> +                digest,
> +                size,
> +                encoded_size,
> +            )
> +            .await
> +        }
> +        DatastoreBackend::S3(s3_client) => {
> +            let data = req_body.collect().await?.to_bytes();
> +            if encoded_size != data.len() as u32 {
> +                bail!(
> +                    "got blob with unexpected length ({encoded_size} != {})",
> +                    data.len()
> +                );
> +            }
> +
> +            let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
> +            let is_duplicate = s3_client.upload_with_retry(object_key, data, false).await?;
> +            Ok((digest, size, encoded_size, is_duplicate))
> +        }
> +    }
> +}
> +
>  pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
>      &ApiHandler::AsyncHttp(&upload_speedtest),
>      &ObjectSchema::new("Test upload speed.", &[]),

-- 
- Lukas





More information about the pbs-devel mailing list