[pbs-devel] [PATCH proxmox-backup v8 13/45] sync: pull: conditionally upload content to s3 backend
Christian Ebner
c.ebner at proxmox.com
Fri Jul 18 11:43:26 CEST 2025
On 7/18/25 10:35 AM, Lukas Wagner wrote:
>
>
> On 2025-07-15 14:53, Christian Ebner wrote:
>> If the datastore is backed by an S3 object store, not only insert the
>> pulled contents into the local cache store, but also upload them to the
>> S3 backend.
>>
>> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>> ---
>> changes since version 7:
>> - no changes
>>
>> src/server/pull.rs | 66 +++++++++++++++++++++++++++++++++++++++++++---
>> 1 file changed, 63 insertions(+), 3 deletions(-)
>>
>> diff --git a/src/server/pull.rs b/src/server/pull.rs
>> index b1724c142..fe87359ab 100644
>> --- a/src/server/pull.rs
>> +++ b/src/server/pull.rs
>> @@ -6,8 +6,9 @@ use std::sync::atomic::{AtomicUsize, Ordering};
>> use std::sync::{Arc, Mutex};
>> use std::time::SystemTime;
>>
>> -use anyhow::{bail, format_err, Error};
>> +use anyhow::{bail, format_err, Context, Error};
>> use proxmox_human_byte::HumanByte;
>> +use tokio::io::AsyncReadExt;
>> use tracing::info;
>>
>> use pbs_api_types::{
>> @@ -24,7 +25,7 @@ use pbs_datastore::fixed_index::FixedIndexReader;
>> use pbs_datastore::index::IndexFile;
>> use pbs_datastore::manifest::{BackupManifest, FileInfo};
>> use pbs_datastore::read_chunk::AsyncReadChunk;
>> -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
>> +use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
>> use pbs_tools::sha::sha256;
>>
>> use super::sync::{
>> @@ -167,7 +168,20 @@ async fn pull_index_chunks<I: IndexFile>(
>> move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>> // println!("verify and write {}", hex::encode(&digest));
>> chunk.verify_unencrypted(size as usize, &digest)?;
>> - target2.insert_chunk(&chunk, &digest)?;
>> + match target2.backend()? {
>> + DatastoreBackend::Filesystem => {
>> + target2.insert_chunk(&chunk, &digest)?;
>> + }
>> + DatastoreBackend::S3(s3_client) => {
>> + let data = chunk.raw_data().to_vec();
>> + let upload_data = hyper::body::Bytes::from(data);
>> + let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
>> + let _is_duplicate = proxmox_async::runtime::block_on(
>> + s3_client.upload_with_retry(object_key, upload_data, false),
>> + )
>> + .context("failed to upload chunk to s3 backend")?;
>> + }
>> + }
>> Ok(())
>> },
>> );
>> @@ -331,6 +345,18 @@ async fn pull_single_archive<'a>(
>> if let Err(err) = std::fs::rename(&tmp_path, &path) {
>> bail!("Atomic rename file {:?} failed - {}", path, err);
>> }
>> + if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
>> + let object_key =
>> + pbs_datastore::s3::object_key_from_path(&snapshot.relative_path(), archive_name)
>> + .context("invalid archive object key")?;
>> +
>> + let archive = tokio::fs::File::open(&path).await?;
>> + let mut reader = tokio::io::BufReader::new(archive);
>> + let mut contents = Vec::new();
>> + reader.read_to_end(&mut contents).await?;
>
> You can use tokio::fs::read here
Same as for the sync code: this makes reading the whole file contents
much more concise, thanks!
>
>> + let data = hyper::body::Bytes::from(contents);
>> + let _is_duplicate = s3_client.upload_with_retry(object_key, data, true).await?;
>
> I might do a review of the already merged s3 client code later, but I really don't like the
> `replace: bool` parameter for this function very much. I think I'd prefer having
> two separate functions for replace vs. not replace (which might delegate to a common
> fn internally; there a bool param is fine IMO), or alternatively, using an enum
> instead. Personally, I'm gravitating more towards the separate functions.
>
> What do you think?
Yeah, splitting this up into two pub fns, `upload_with_retry` and
`upload_replace_with_retry`, and keeping the common code in an internal
helper method on the client would indeed be more ergonomic.
I will opt for that in this case.
>
>> + }
>> Ok(sync_stats)
>> }
>>
>> @@ -401,6 +427,7 @@ async fn pull_snapshot<'a>(
>> }
>> }
>>
>> + let manifest_data = tmp_manifest_blob.raw_data().to_vec();
>> let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
>>
>> if ignore_not_verified_or_encrypted(
>> @@ -467,9 +494,42 @@ async fn pull_snapshot<'a>(
>> if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
>> bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
>> }
>> + if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
>> + let object_key = pbs_datastore::s3::object_key_from_path(
>> + &snapshot.relative_path(),
>> + MANIFEST_BLOB_NAME.as_ref(),
>> + )
>> + .context("invalid manifest object key")?;
>> +
>> + let data = hyper::body::Bytes::from(manifest_data);
>> + let _is_duplicate = s3_client
>> + .upload_with_retry(object_key, data, true)
>> + .await
>> + .context("failed to upload manifest to s3 backend")?;
>> + }
>>
>> if !client_log_name.exists() {
>> reader.try_download_client_log(&client_log_name).await?;
>> + if client_log_name.exists() {
>> + if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
>> + let object_key = pbs_datastore::s3::object_key_from_path(
>> + &snapshot.relative_path(),
>> + CLIENT_LOG_BLOB_NAME.as_ref(),
>> + )
>> + .context("invalid archive object key")?;
>> +
>> + let log_file = tokio::fs::File::open(&client_log_name).await?;
>> + let mut reader = tokio::io::BufReader::new(log_file);
>> + let mut contents = Vec::new();
>> + reader.read_to_end(&mut contents).await?;
>
> You can use tokio::fs::read(...) here
same as above :)
>
>> +
>> + let data = hyper::body::Bytes::from(contents);
>> + let _is_duplicate = s3_client
>> + .upload_with_retry(object_key, data, true)
>> + .await
>> + .context("failed to upload client log to s3 backend")?;
>> + }
>> + }
>> };
>> snapshot
>> .cleanup_unreferenced_files(&manifest)
>
More information about the pbs-devel
mailing list