[pbs-devel] [PATCH proxmox-backup v5 21/46] sync: pull: conditionally upload content to s3 backend

Christian Ebner <c.ebner@proxmox.com>
Thu Jul 3 15:18:12 CEST 2025


If the datastore is backed by an S3 object store, not only insert the
pulled contents into the local cache store, but also upload them to
the S3 backend.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
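The same upload shape recurs three times in the hunks below, for the
archive files, the manifest and the client log: derive the object key
from the snapshot-relative path, read the file contents into memory
and hand the bytes to the S3 client. A minimal sketch of that shape,
assuming the S3Client and BackupDir types used by this series (the
standalone helper itself is illustrative, not part of the patch):

    // Illustrative only: mirror a snapshot file to the S3 backend.
    async fn upload_snapshot_file(
        s3_client: &S3Client,
        snapshot: &pbs_datastore::BackupDir,
        file_name: &str,
        path: &std::path::Path,
    ) -> Result<(), anyhow::Error> {
        use anyhow::{format_err, Context};

        // The object key is the file's path relative to the datastore.
        let object_path = snapshot.relative_path().join(file_name);
        let object_key = object_path
            .as_os_str()
            .to_str()
            .ok_or_else(|| format_err!("invalid object path"))?;

        let contents = tokio::fs::read(path).await?;
        let data = hyper::body::Bytes::from(contents);
        // Final flag mirrors the patch's usage for metadata objects.
        let _is_duplicate = s3_client
            .upload_with_retry(object_key.into(), data, true)
            .await
            .context("failed to upload file to s3 backend")?;
        Ok(())
    }

tokio::fs::read stands in for the File + BufReader + read_to_end
combination used in the hunks; both read the whole file into memory
before the upload.
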
 src/server/pull.rs | 69 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 66 insertions(+), 3 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index b1724c142..ec9518a47 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -6,8 +6,9 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
-use anyhow::{bail, format_err, Error};
+use anyhow::{bail, format_err, Context, Error};
 use proxmox_human_byte::HumanByte;
+use tokio::io::AsyncReadExt;
 use tracing::info;
 
 use pbs_api_types::{
@@ -24,7 +25,7 @@ use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
 use pbs_tools::sha::sha256;
 
 use super::sync::{
@@ -167,7 +168,19 @@ async fn pull_index_chunks<I: IndexFile>(
         move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
             // println!("verify and write {}", hex::encode(&digest));
             chunk.verify_unencrypted(size as usize, &digest)?;
-            target2.insert_chunk(&chunk, &digest)?;
+            match target2.backend()? {
+                DatastoreBackend::Filesystem => {
+                    target2.insert_chunk(&chunk, &digest)?;
+                }
+                DatastoreBackend::S3(s3_client) => {
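+                    // Chunks are keyed by their digest on the backend; the
+                    // insert closure is synchronous, so drive the async
+                    // upload to completion via block_on.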
+                    let data = chunk.raw_data().to_vec();
+                    let upload_data = hyper::body::Bytes::from(data);
+                    let _is_duplicate = proxmox_async::runtime::block_on(
+                        s3_client.upload_with_retry(digest.into(), upload_data, false),
+                    )
+                    .context("failed to upload chunk to s3 backend")?;
+                }
+            }
             Ok(())
         },
     );
@@ -331,6 +344,22 @@ async fn pull_single_archive<'a>(
     if let Err(err) = std::fs::rename(&tmp_path, &path) {
         bail!("Atomic rename file {:?} failed - {}", path, err);
     }
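+    // Mirror the just renamed archive to the S3 backend, keyed by its
+    // snapshot-relative path.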
+    if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+        let archive_path = snapshot.relative_path().join(archive_name);
+        let object_key = archive_path
+            .as_os_str()
+            .to_str()
+            .ok_or_else(|| format_err!("invalid archive path"))?;
+
+        let archive = tokio::fs::File::open(&path).await?;
+        let mut reader = tokio::io::BufReader::new(archive);
+        let mut contents = Vec::new();
+        reader.read_to_end(&mut contents).await?;
+        let data = hyper::body::Bytes::from(contents);
+        let _is_duplicate = s3_client
+            .upload_with_retry(object_key.into(), data, true)
+            .await?;
+    }
     Ok(sync_stats)
 }
 
@@ -401,6 +430,7 @@ async fn pull_snapshot<'a>(
         }
     }
 
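+    // Keep a copy of the raw manifest bytes: the blob is consumed by the
+    // conversion below, and the copy gets uploaded to the S3 backend.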
+    let manifest_data = tmp_manifest_blob.raw_data().to_vec();
     let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
 
     if ignore_not_verified_or_encrypted(
@@ -467,9 +497,42 @@ async fn pull_snapshot<'a>(
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
         bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
     }
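+    // With the manifest atomically in place locally, mirror it to the
+    // S3 backend as well.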
+    if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+        let object_path = snapshot.relative_path().join(MANIFEST_BLOB_NAME.as_ref());
+        let object_key = object_path
+            .as_os_str()
+            .to_str()
+            .ok_or_else(|| format_err!("invalid manifest path"))?;
+
+        let data = hyper::body::Bytes::from(manifest_data);
+        let _is_duplicate = s3_client
+            .upload_with_retry(object_key.into(), data, true)
+            .await
+            .context("failed to upload manifest to s3 backend")?;
+    }
 
     if !client_log_name.exists() {
         reader.try_download_client_log(&client_log_name).await?;
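+        // If a client log was downloaded, also mirror it to the S3 backend.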
+        if client_log_name.exists() {
+            if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+                let object_path = snapshot.relative_path().join(CLIENT_LOG_BLOB_NAME.as_ref());
+                let object_key = object_path
+                    .as_os_str()
+                    .to_str()
+                    .ok_or_else(|| format_err!("invalid client log path"))?;
+
+                let log_file = tokio::fs::File::open(&client_log_name).await?;
+                let mut reader = tokio::io::BufReader::new(log_file);
+                let mut contents = Vec::new();
+                reader.read_to_end(&mut contents).await?;
+
+                let data = hyper::body::Bytes::from(contents);
+                let _is_duplicate = s3_client
+                    .upload_with_retry(object_key.into(), data, true)
+                    .await
+                    .context("failed to upload client log to s3 backend")?;
+            }
+        }
     };
     snapshot
         .cleanup_unreferenced_files(&manifest)
-- 
2.47.2