[pbs-devel] [PATCH proxmox-backup v8 08/45] api: backup: conditionally upload chunks to s3 object store backend
Lukas Wagner
l.wagner at proxmox.com
Fri Jul 18 10:11:29 CEST 2025
Reviewed-by: Lukas Wagner <l.wagner at proxmox.com>
On 2025-07-15 14:52, Christian Ebner wrote:
> Upload fixed- and dynamically-sized chunks to either the filesystem or
> the S3 object store, depending on the configured backend.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
>
> src/api2/backup/upload_chunk.rs | 71 +++++++++++++++++++--------------
> 1 file changed, 42 insertions(+), 29 deletions(-)
>
> diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
> index 2c66c2855..3ad8c3c75 100644
> --- a/src/api2/backup/upload_chunk.rs
> +++ b/src/api2/backup/upload_chunk.rs
> @@ -16,7 +16,7 @@ use proxmox_sortable_macro::sortable;
>
> use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
> use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
> -use pbs_datastore::{DataBlob, DataStore};
> +use pbs_datastore::{DataBlob, DataStore, DatastoreBackend};
> use pbs_tools::json::{required_integer_param, required_string_param};
>
> use super::environment::*;
> @@ -154,22 +154,10 @@ fn upload_fixed_chunk(
> ) -> ApiResponseFuture {
> async move {
> let wid = required_integer_param(&param, "wid")? as usize;
> - let size = required_integer_param(&param, "size")? as u32;
> - let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
> -
> - let digest_str = required_string_param(&param, "digest")?;
> - let digest = <[u8; 32]>::from_hex(digest_str)?;
> -
> let env: &BackupEnvironment = rpcenv.as_ref();
>
> - let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> - BodyDataStream::new(req_body),
> - env.datastore.clone(),
> - digest,
> - size,
> - encoded_size,
> - )
> - .await?;
> + let (digest, size, compressed_size, is_duplicate) =
> + upload_to_backend(req_body, param, env).await?;
>
> env.register_fixed_chunk(wid, digest, size, compressed_size, is_duplicate)?;
> let digest_str = hex::encode(digest);
> @@ -229,22 +217,10 @@ fn upload_dynamic_chunk(
> ) -> ApiResponseFuture {
> async move {
> let wid = required_integer_param(&param, "wid")? as usize;
> - let size = required_integer_param(&param, "size")? as u32;
> - let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
> -
> - let digest_str = required_string_param(&param, "digest")?;
> - let digest = <[u8; 32]>::from_hex(digest_str)?;
> -
> let env: &BackupEnvironment = rpcenv.as_ref();
>
> - let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(
> - BodyDataStream::new(req_body),
> - env.datastore.clone(),
> - digest,
> - size,
> - encoded_size,
> - )
> - .await?;
> + let (digest, size, compressed_size, is_duplicate) =
> + upload_to_backend(req_body, param, env).await?;
>
> env.register_dynamic_chunk(wid, digest, size, compressed_size, is_duplicate)?;
> let digest_str = hex::encode(digest);
> @@ -256,6 +232,43 @@ fn upload_dynamic_chunk(
> .boxed()
> }
>
> +async fn upload_to_backend(
> + req_body: Incoming,
> + param: Value,
> + env: &BackupEnvironment,
> +) -> Result<([u8; 32], u32, u32, bool), Error> {
> + let size = required_integer_param(&param, "size")? as u32;
> + let encoded_size = required_integer_param(&param, "encoded-size")? as u32;
> + let digest_str = required_string_param(&param, "digest")?;
> + let digest = <[u8; 32]>::from_hex(digest_str)?;
> +
> + match &env.backend {
> + DatastoreBackend::Filesystem => {
> + UploadChunk::new(
> + BodyDataStream::new(req_body),
> + env.datastore.clone(),
> + digest,
> + size,
> + encoded_size,
> + )
> + .await
> + }
> + DatastoreBackend::S3(s3_client) => {
> + let data = req_body.collect().await?.to_bytes();
> + if encoded_size != data.len() as u32 {
> + bail!(
> + "got blob with unexpected length ({encoded_size} != {})",
> + data.len()
> + );
> + }
> +
> + let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
> + let is_duplicate = s3_client.upload_with_retry(object_key, data, false).await?;
> + Ok((digest, size, encoded_size, is_duplicate))
> + }
> + }
> +}
> +
> pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
> &ApiHandler::AsyncHttp(&upload_speedtest),
> &ObjectSchema::new("Test upload speed.", &[]),
--
- Lukas