[pbs-devel] [PATCH proxmox-backup v8 13/45] sync: pull: conditionally upload content to s3 backend

Christian Ebner c.ebner at proxmox.com
Fri Jul 18 11:43:26 CEST 2025


On 7/18/25 10:35 AM, Lukas Wagner wrote:
> 
> 
> On  2025-07-15 14:53, Christian Ebner wrote:
>> If the datastore is backed by an S3 object store, not only insert the
>> pulled contents to the local cache store, but also upload it to the
>> S3 backend.
>>
>> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>> ---
>> changes since version 7:
>> - no changes
>>
>>   src/server/pull.rs | 66 +++++++++++++++++++++++++++++++++++++++++++---
>>   1 file changed, 63 insertions(+), 3 deletions(-)
>>
>> diff --git a/src/server/pull.rs b/src/server/pull.rs
>> index b1724c142..fe87359ab 100644
>> --- a/src/server/pull.rs
>> +++ b/src/server/pull.rs
>> @@ -6,8 +6,9 @@ use std::sync::atomic::{AtomicUsize, Ordering};
>>   use std::sync::{Arc, Mutex};
>>   use std::time::SystemTime;
>>   
>> -use anyhow::{bail, format_err, Error};
>> +use anyhow::{bail, format_err, Context, Error};
>>   use proxmox_human_byte::HumanByte;
>> +use tokio::io::AsyncReadExt;
>>   use tracing::info;
>>   
>>   use pbs_api_types::{
>> @@ -24,7 +25,7 @@ use pbs_datastore::fixed_index::FixedIndexReader;
>>   use pbs_datastore::index::IndexFile;
>>   use pbs_datastore::manifest::{BackupManifest, FileInfo};
>>   use pbs_datastore::read_chunk::AsyncReadChunk;
>> -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
>> +use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
>>   use pbs_tools::sha::sha256;
>>   
>>   use super::sync::{
>> @@ -167,7 +168,20 @@ async fn pull_index_chunks<I: IndexFile>(
>>           move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>>               // println!("verify and write {}", hex::encode(&digest));
>>               chunk.verify_unencrypted(size as usize, &digest)?;
>> -            target2.insert_chunk(&chunk, &digest)?;
>> +            match target2.backend()? {
>> +                DatastoreBackend::Filesystem => {
>> +                    target2.insert_chunk(&chunk, &digest)?;
>> +                }
>> +                DatastoreBackend::S3(s3_client) => {
>> +                    let data = chunk.raw_data().to_vec();
>> +                    let upload_data = hyper::body::Bytes::from(data);
>> +                    let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
>> +                    let _is_duplicate = proxmox_async::runtime::block_on(
>> +                        s3_client.upload_with_retry(object_key, upload_data, false),
>> +                    )
>> +                    .context("failed to upload chunk to s3 backend")?;
>> +                }
>> +            }
>>               Ok(())
>>           },
>>       );
>> @@ -331,6 +345,18 @@ async fn pull_single_archive<'a>(
>>       if let Err(err) = std::fs::rename(&tmp_path, &path) {
>>           bail!("Atomic rename file {:?} failed - {}", path, err);
>>       }
>> +    if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
>> +        let object_key =
>> +            pbs_datastore::s3::object_key_from_path(&snapshot.relative_path(), archive_name)
>> +                .context("invalid archive object key")?;
>> +
>> +        let archive = tokio::fs::File::open(&path).await?;
>> +        let mut reader = tokio::io::BufReader::new(archive);
>> +        let mut contents = Vec::new();
>> +        reader.read_to_end(&mut contents).await?;
> 
> You can use tokio::fs::read here

Same as for the sync code: switching to tokio::fs::read makes reading the 
whole file contents much more concise, thanks!
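
For reference, the archive call site would then shrink to roughly the 
following (just a sketch, the error context and the upload itself stay as 
in the patch):

    // read the whole archive file into memory in one call instead of
    // File::open + BufReader + read_to_end
    let contents = tokio::fs::read(&path).await?;
    let data = hyper::body::Bytes::from(contents);
    let _is_duplicate = s3_client.upload_with_retry(object_key, data, true).await?;

With both call sites switched over, the `use tokio::io::AsyncReadExt;` 
added by this patch should no longer be needed either.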

> 
>> +        let data = hyper::body::Bytes::from(contents);
>> +        let _is_duplicate = s3_client.upload_with_retry(object_key, data, true).await?;
> 
> I might do a review of the already merged s3 client code later, but I really don't like the
> `replace: bool` parameter for this function very much. I think I'd prefer having
> two separate functions for replace vs. not replace (which might delegate to a common
> fn internally, where a bool param is fine IMO), or alternatively, use an enum
> instead. Personally I'm gravitating more towards the separate functions.
> 
> What do you think?

Yeah, splitting this up into a pub fn `upload_with_retry` and a pub fn 
`upload_replace_with_retry`, keeping the common code in an internal 
helper method on the client, could indeed be more ergonomic.

Will opt for that in this case.
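
Roughly along these lines (only a sketch, type names and the return type 
are just approximated from how the call sites in this patch use the 
client, the actual signatures would follow the existing code):

    impl S3Client {
        /// Upload without replacing an existing object (used for chunks).
        pub async fn upload_with_retry(
            &self,
            object_key: S3ObjectKey,
            data: Bytes,
        ) -> Result<bool, Error> {
            self.upload_with_retry_impl(object_key, data, false).await
        }

        /// Upload, replacing a possibly existing object (used for the
        /// manifest and the client log).
        pub async fn upload_replace_with_retry(
            &self,
            object_key: S3ObjectKey,
            data: Bytes,
        ) -> Result<bool, Error> {
            self.upload_with_retry_impl(object_key, data, true).await
        }

        // The private helper keeps the bool parameter, the retry loop
        // itself stays unchanged.
        async fn upload_with_retry_impl(
            &self,
            object_key: S3ObjectKey,
            data: Bytes,
            replace: bool,
        ) -> Result<bool, Error> {
            // ... current upload_with_retry body ...
        }
    }

The callers in pull.rs then simply pick the variant instead of passing a 
bare bool.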

> 
>> +    }
>>       Ok(sync_stats)
>>   }
>>   
>> @@ -401,6 +427,7 @@ async fn pull_snapshot<'a>(
>>           }
>>       }
>>   
>> +    let manifest_data = tmp_manifest_blob.raw_data().to_vec();
>>       let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
>>   
>>       if ignore_not_verified_or_encrypted(
>> @@ -467,9 +494,42 @@ async fn pull_snapshot<'a>(
>>       if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
>>           bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
>>       }
>> +    if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
>> +        let object_key = pbs_datastore::s3::object_key_from_path(
>> +            &snapshot.relative_path(),
>> +            MANIFEST_BLOB_NAME.as_ref(),
>> +        )
>> +        .context("invalid manifest object key")?;
>> +
>> +        let data = hyper::body::Bytes::from(manifest_data);
>> +        let _is_duplicate = s3_client
>> +            .upload_with_retry(object_key, data, true)
>> +            .await
>> +            .context("failed to upload manifest to s3 backend")?;
>> +    }
>>   
>>       if !client_log_name.exists() {
>>           reader.try_download_client_log(&client_log_name).await?;
>> +        if client_log_name.exists() {
>> +            if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
>> +                let object_key = pbs_datastore::s3::object_key_from_path(
>> +                    &snapshot.relative_path(),
>> +                    CLIENT_LOG_BLOB_NAME.as_ref(),
>> +                )
>> +                .context("invalid archive object key")?;
>> +
>> +                let log_file = tokio::fs::File::open(&client_log_name).await?;
>> +                let mut reader = tokio::io::BufReader::new(log_file);
>> +                let mut contents = Vec::new();
>> +                reader.read_to_end(&mut contents).await?;
> 
> You can use tokio::fs::read(...) here

Same as above :) Will switch this call site to tokio::fs::read as well.

> 
>> +
>> +                let data = hyper::body::Bytes::from(contents);
>> +                let _is_duplicate = s3_client
>> +                    .upload_with_retry(object_key, data, true)
>> +                    .await
>> +                    .context("failed to upload client log to s3 backend")?;
>> +            }
>> +        }
>>       };
>>       snapshot
>>           .cleanup_unreferenced_files(&manifest)
> 