[pbs-devel] [PATCH proxmox-backup v9 13/46] sync: pull: conditionally upload content to s3 backend
Christian Ebner
c.ebner at proxmox.com
Sat Jul 19 14:50:02 CEST 2025
If the datastore is backed by an S3 object store, not only insert the
pulled contents into the local cache store, but also upload them to
the S3 backend.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 8:
- use refactored upload_replace_with_retry and
upload_no_replace_with_retry
- read file contents via tokio::fs::read
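
For reviewers, a minimal sketch of the chunk upload pattern the hunks
below follow. The helper names and signatures
(upload_no_replace_with_retry, upload_replace_with_retry, the
pbs_datastore::s3 key helpers) are assumed from their call sites in this
patch and not re-checked against the tree; the real chunk path runs
inside a synchronous closure and therefore uses
proxmox_async::runtime::block_on rather than .await.

use anyhow::{Context, Error};
use hyper::body::Bytes;
use pbs_datastore::{DataBlob, DataStore, DatastoreBackend};

// Sketch only: insert a pulled chunk into the local store, or upload it
// to the S3 backend when the datastore is S3 backed.
async fn insert_or_upload_chunk(
    store: &DataStore,
    chunk: &DataBlob,
    digest: &[u8; 32],
) -> Result<(), Error> {
    match store.backend()? {
        // plain filesystem datastore: unchanged behaviour
        DatastoreBackend::Filesystem => {
            store.insert_chunk(chunk, digest)?;
        }
        // S3 backed datastore: chunks are content addressed, so an
        // already existing object can be left as is (no-replace variant)
        DatastoreBackend::S3(s3_client) => {
            let object_key = pbs_datastore::s3::object_key_from_digest(digest)?;
            let data = Bytes::from(chunk.raw_data().to_vec());
            let _is_duplicate = s3_client
                .upload_no_replace_with_retry(object_key, data)
                .await
                .context("failed to upload chunk to s3 backend")?;
        }
    }
    Ok(())
}

Manifest, archive and client log blobs go through
upload_replace_with_retry instead, since those files may legitimately
change for an existing snapshot; their contents are read via
tokio::fs::read before upload.
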
src/server/pull.rs | 64 +++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 61 insertions(+), 3 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index b1724c142..775ed0c59 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -6,7 +6,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
-use anyhow::{bail, format_err, Error};
+use anyhow::{bail, format_err, Context, Error};
use proxmox_human_byte::HumanByte;
use tracing::info;
@@ -24,7 +24,7 @@ use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
use pbs_tools::sha::sha256;
use super::sync::{
@@ -167,7 +167,20 @@ async fn pull_index_chunks<I: IndexFile>(
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
// println!("verify and write {}", hex::encode(&digest));
chunk.verify_unencrypted(size as usize, &digest)?;
- target2.insert_chunk(&chunk, &digest)?;
+ match target2.backend()? {
+ DatastoreBackend::Filesystem => {
+ target2.insert_chunk(&chunk, &digest)?;
+ }
+ DatastoreBackend::S3(s3_client) => {
+ let data = chunk.raw_data().to_vec();
+ let upload_data = hyper::body::Bytes::from(data);
+ let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
+ let _is_duplicate = proxmox_async::runtime::block_on(
+ s3_client.upload_no_replace_with_retry(object_key, upload_data),
+ )
+ .context("failed to upload chunk to s3 backend")?;
+ }
+ }
Ok(())
},
);
@@ -331,6 +344,19 @@ async fn pull_single_archive<'a>(
if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err);
}
+ if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+ let object_key =
+ pbs_datastore::s3::object_key_from_path(&snapshot.relative_path(), archive_name)
+ .context("invalid archive object key")?;
+
+ let data = tokio::fs::read(&path)
+ .await
+ .context("failed to read archive contents")?;
+ let contents = hyper::body::Bytes::from(data);
+ let _is_duplicate = s3_client
+ .upload_replace_with_retry(object_key, contents)
+ .await?;
+ }
Ok(sync_stats)
}
@@ -401,6 +427,7 @@ async fn pull_snapshot<'a>(
}
}
+ let manifest_data = tmp_manifest_blob.raw_data().to_vec();
let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
if ignore_not_verified_or_encrypted(
@@ -467,9 +494,40 @@ async fn pull_snapshot<'a>(
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
}
+ if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+ let object_key = pbs_datastore::s3::object_key_from_path(
+ &snapshot.relative_path(),
+ MANIFEST_BLOB_NAME.as_ref(),
+ )
+ .context("invalid manifest object key")?;
+
+ let data = hyper::body::Bytes::from(manifest_data);
+ let _is_duplicate = s3_client
+ .upload_replace_with_retry(object_key, data)
+ .await
+ .context("failed to upload manifest to s3 backend")?;
+ }
if !client_log_name.exists() {
reader.try_download_client_log(&client_log_name).await?;
+ if client_log_name.exists() {
+ if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+ let object_key = pbs_datastore::s3::object_key_from_path(
+ &snapshot.relative_path(),
+ CLIENT_LOG_BLOB_NAME.as_ref(),
+ )
+ .context("invalid client log object key")?;
+
+ let data = tokio::fs::read(&client_log_name)
+ .await
+ .context("failed to read log file contents")?;
+ let contents = hyper::body::Bytes::from(data);
+ let _is_duplicate = s3_client
+ .upload_replace_with_retry(object_key, contents)
+ .await
+ .context("failed to upload client log to s3 backend")?;
+ }
+ }
};
snapshot
.cleanup_unreferenced_files(&manifest)
--
2.47.2