[pbs-devel] [PATCH proxmox-backup v8 13/45] sync: pull: conditionally upload content to s3 backend
Lukas Wagner
l.wagner at proxmox.com
Fri Jul 18 10:35:23 CEST 2025
On 2025-07-15 14:53, Christian Ebner wrote:
> If the datastore is backed by an S3 object store, not only insert the
> pulled contents into the local cache store, but also upload them to
> the S3 backend.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
>
> src/server/pull.rs | 66 +++++++++++++++++++++++++++++++++++++++++++---
> 1 file changed, 63 insertions(+), 3 deletions(-)
>
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index b1724c142..fe87359ab 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -6,8 +6,9 @@ use std::sync::atomic::{AtomicUsize, Ordering};
> use std::sync::{Arc, Mutex};
> use std::time::SystemTime;
>
> -use anyhow::{bail, format_err, Error};
> +use anyhow::{bail, format_err, Context, Error};
> use proxmox_human_byte::HumanByte;
> +use tokio::io::AsyncReadExt;
> use tracing::info;
>
> use pbs_api_types::{
> @@ -24,7 +25,7 @@ use pbs_datastore::fixed_index::FixedIndexReader;
> use pbs_datastore::index::IndexFile;
> use pbs_datastore::manifest::{BackupManifest, FileInfo};
> use pbs_datastore::read_chunk::AsyncReadChunk;
> -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> +use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
> use pbs_tools::sha::sha256;
>
> use super::sync::{
> @@ -167,7 +168,20 @@ async fn pull_index_chunks<I: IndexFile>(
> move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
> // println!("verify and write {}", hex::encode(&digest));
> chunk.verify_unencrypted(size as usize, &digest)?;
> - target2.insert_chunk(&chunk, &digest)?;
> + match target2.backend()? {
> + DatastoreBackend::Filesystem => {
> + target2.insert_chunk(&chunk, &digest)?;
> + }
> + DatastoreBackend::S3(s3_client) => {
> + let data = chunk.raw_data().to_vec();
> + let upload_data = hyper::body::Bytes::from(data);
> + let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
> + let _is_duplicate = proxmox_async::runtime::block_on(
> + s3_client.upload_with_retry(object_key, upload_data, false),
> + )
> + .context("failed to upload chunk to s3 backend")?;
> + }
> + }
> Ok(())
> },
> );
> @@ -331,6 +345,18 @@ async fn pull_single_archive<'a>(
> if let Err(err) = std::fs::rename(&tmp_path, &path) {
> bail!("Atomic rename file {:?} failed - {}", path, err);
> }
> + if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
> + let object_key =
> + pbs_datastore::s3::object_key_from_path(&snapshot.relative_path(), archive_name)
> + .context("invalid archive object key")?;
> +
> + let archive = tokio::fs::File::open(&path).await?;
> + let mut reader = tokio::io::BufReader::new(archive);
> + let mut contents = Vec::new();
> + reader.read_to_end(&mut contents).await?;
You can use tokio::fs::read here instead of open + BufReader + read_to_end.
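Untested, but something like this should be a drop-in replacement for the three
lines above (it can reuse the anyhow::Context import the patch already adds):

    let contents = tokio::fs::read(&path)
        .await
        .context("failed to read archive file")?;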
> + let data = hyper::body::Bytes::from(contents);
> + let _is_duplicate = s3_client.upload_with_retry(object_key, data, true).await?;
I might do a review of the already merged s3 client code later, but I really don't like the
`replace: bool` parameter for this function very much. I think I'd prefer having
two separate functions for replace vs. not replace (which might delegate to a common
fn internally, where a bool param is fine IMO), or alternatively, using an enum
instead. Personally, I'm gravitating more towards the separate functions.
What do you think?
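Rough sketch of the separate-function variant, where upload_with_retry_impl would
be the current function, made private (type names from memory, adjust as needed):

    /// Upload the object, skipping the upload if the key already exists.
    pub async fn upload_no_replace_with_retry(
        &self,
        object_key: S3ObjectKey,
        object_data: Bytes,
    ) -> Result<bool, Error> {
        self.upload_with_retry_impl(object_key, object_data, false).await
    }

    /// Upload the object, replacing any existing object under the same key.
    pub async fn upload_replace_with_retry(
        &self,
        object_key: S3ObjectKey,
        object_data: Bytes,
    ) -> Result<bool, Error> {
        self.upload_with_retry_impl(object_key, object_data, true).await
    }

Call sites then read as s3_client.upload_replace_with_retry(object_key, data).await?
instead of carrying a bare `true`/`false` that one has to look up in the signature.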
> + }
> Ok(sync_stats)
> }
>
> @@ -401,6 +427,7 @@ async fn pull_snapshot<'a>(
> }
> }
>
> + let manifest_data = tmp_manifest_blob.raw_data().to_vec();
> let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
>
> if ignore_not_verified_or_encrypted(
> @@ -467,9 +494,42 @@ async fn pull_snapshot<'a>(
> if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
> bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
> }
> + if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
> + let object_key = pbs_datastore::s3::object_key_from_path(
> + &snapshot.relative_path(),
> + MANIFEST_BLOB_NAME.as_ref(),
> + )
> + .context("invalid manifest object key")?;
> +
> + let data = hyper::body::Bytes::from(manifest_data);
> + let _is_duplicate = s3_client
> + .upload_with_retry(object_key, data, true)
> + .await
> + .context("failed to upload manifest to s3 backend")?;
> + }
>
> if !client_log_name.exists() {
> reader.try_download_client_log(&client_log_name).await?;
> + if client_log_name.exists() {
> + if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
> + let object_key = pbs_datastore::s3::object_key_from_path(
> + &snapshot.relative_path(),
> + CLIENT_LOG_BLOB_NAME.as_ref(),
> + )
> + .context("invalid archive object key")?;
> +
> + let log_file = tokio::fs::File::open(&client_log_name).await?;
> + let mut reader = tokio::io::BufReader::new(log_file);
> + let mut contents = Vec::new();
> + reader.read_to_end(&mut contents).await?;
Same as above: you can use tokio::fs::read(...) here as well.
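Since this read-file-then-upload dance now appears twice (here and for the archive
above), it might also be worth factoring it into a small helper. Rough sketch,
untested and the names are just suggestions:

    async fn upload_file_to_s3(
        s3_client: &S3Client,
        object_key: S3ObjectKey,
        path: &std::path::Path,
    ) -> Result<(), Error> {
        // read the whole file and hand it to the s3 client as one body
        let contents = tokio::fs::read(path)
            .await
            .with_context(|| format!("failed to read {path:?}"))?;
        let data = hyper::body::Bytes::from(contents);
        let _is_duplicate = s3_client
            .upload_with_retry(object_key, data, true)
            .await
            .context("failed to upload file to s3 backend")?;
        Ok(())
    }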
> +
> + let data = hyper::body::Bytes::from(contents);
> + let _is_duplicate = s3_client
> + .upload_with_retry(object_key, data, true)
> + .await
> + .context("failed to upload client log to s3 backend")?;
> + }
> + }
> };
> snapshot
> .cleanup_unreferenced_files(&manifest)
--
- Lukas