[pbs-devel] [PATCH proxmox-backup v8 13/45] sync: pull: conditionally upload content to s3 backend

Lukas Wagner l.wagner at proxmox.com
Fri Jul 18 10:35:23 CEST 2025



On  2025-07-15 14:53, Christian Ebner wrote:
> If the datastore is backed by an S3 object store, not only insert the
> pulled contents to the local cache store, but also upload it to the
> S3 backend.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
> 
>  src/server/pull.rs | 66 +++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 63 insertions(+), 3 deletions(-)
> 
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index b1724c142..fe87359ab 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -6,8 +6,9 @@ use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::SystemTime;
>  
> -use anyhow::{bail, format_err, Error};
> +use anyhow::{bail, format_err, Context, Error};
>  use proxmox_human_byte::HumanByte;
> +use tokio::io::AsyncReadExt;
>  use tracing::info;
>  
>  use pbs_api_types::{
> @@ -24,7 +25,7 @@ use pbs_datastore::fixed_index::FixedIndexReader;
>  use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{BackupManifest, FileInfo};
>  use pbs_datastore::read_chunk::AsyncReadChunk;
> -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> +use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
>  use pbs_tools::sha::sha256;
>  
>  use super::sync::{
> @@ -167,7 +168,20 @@ async fn pull_index_chunks<I: IndexFile>(
>          move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>              // println!("verify and write {}", hex::encode(&digest));
>              chunk.verify_unencrypted(size as usize, &digest)?;
> -            target2.insert_chunk(&chunk, &digest)?;
> +            match target2.backend()? {
> +                DatastoreBackend::Filesystem => {
> +                    target2.insert_chunk(&chunk, &digest)?;
> +                }
> +                DatastoreBackend::S3(s3_client) => {
> +                    let data = chunk.raw_data().to_vec();
> +                    let upload_data = hyper::body::Bytes::from(data);
> +                    let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
> +                    let _is_duplicate = proxmox_async::runtime::block_on(
> +                        s3_client.upload_with_retry(object_key, upload_data, false),
> +                    )
> +                    .context("failed to upload chunk to s3 backend")?;
> +                }
> +            }
>              Ok(())
>          },
>      );
> @@ -331,6 +345,18 @@ async fn pull_single_archive<'a>(
>      if let Err(err) = std::fs::rename(&tmp_path, &path) {
>          bail!("Atomic rename file {:?} failed - {}", path, err);
>      }
> +    if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
> +        let object_key =
> +            pbs_datastore::s3::object_key_from_path(&snapshot.relative_path(), archive_name)
> +                .context("invalid archive object key")?;
> +
> +        let archive = tokio::fs::File::open(&path).await?;
> +        let mut reader = tokio::io::BufReader::new(archive);
> +        let mut contents = Vec::new();
> +        reader.read_to_end(&mut contents).await?;

You can use `tokio::fs::read` here instead of opening the file and calling `read_to_end` manually.

> +        let data = hyper::body::Bytes::from(contents);
> +        let _is_duplicate = s3_client.upload_with_retry(object_key, data, true).await?;

I might do a review of the already merged s3 client code later, but I really don't like the
`replace: bool` parameter for this function very much. I think I'd prefer having
two separate functions for replace vs. not replace (which might delegate to a common
fn internally, where a bool param is fine IMO), or alternatively, using an enum
instead. Personally, I'm gravitating more towards the separate functions.

What do you think?

> +    }
>      Ok(sync_stats)
>  }
>  
> @@ -401,6 +427,7 @@ async fn pull_snapshot<'a>(
>          }
>      }
>  
> +    let manifest_data = tmp_manifest_blob.raw_data().to_vec();
>      let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
>  
>      if ignore_not_verified_or_encrypted(
> @@ -467,9 +494,42 @@ async fn pull_snapshot<'a>(
>      if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
>          bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
>      }
> +    if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
> +        let object_key = pbs_datastore::s3::object_key_from_path(
> +            &snapshot.relative_path(),
> +            MANIFEST_BLOB_NAME.as_ref(),
> +        )
> +        .context("invalid manifest object key")?;
> +
> +        let data = hyper::body::Bytes::from(manifest_data);
> +        let _is_duplicate = s3_client
> +            .upload_with_retry(object_key, data, true)
> +            .await
> +            .context("failed to upload manifest to s3 backend")?;
> +    }
>  
>      if !client_log_name.exists() {
>          reader.try_download_client_log(&client_log_name).await?;
> +        if client_log_name.exists() {
> +            if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
> +                let object_key = pbs_datastore::s3::object_key_from_path(
> +                    &snapshot.relative_path(),
> +                    CLIENT_LOG_BLOB_NAME.as_ref(),
> +                )
> +                .context("invalid archive object key")?;
> +
> +                let log_file = tokio::fs::File::open(&client_log_name).await?;
> +                let mut reader = tokio::io::BufReader::new(log_file);
> +                let mut contents = Vec::new();
> +                reader.read_to_end(&mut contents).await?;

You can use `tokio::fs::read(...)` here instead of the manual `BufReader` + `read_to_end`.

> +
> +                let data = hyper::body::Bytes::from(contents);
> +                let _is_duplicate = s3_client
> +                    .upload_with_retry(object_key, data, true)
> +                    .await
> +                    .context("failed to upload client log to s3 backend")?;
> +            }
> +        }
>      };
>      snapshot
>          .cleanup_unreferenced_files(&manifest)

-- 
- Lukas





More information about the pbs-devel mailing list