[pbs-devel] [PATCH proxmox-backup v6 12/37] sync: pull: conditionally upload content to s3 backend
Christian Ebner
c.ebner at proxmox.com
Tue Jul 8 19:00:49 CEST 2025
If the datastore is backed by an S3 object store, not only insert the
pulled contents into the local cache store, but also upload them to the
S3 backend.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
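Note (not part of the patch, just a reviewer-oriented illustration): three
of the hunks below (archive, manifest and client log) repeat the same
upload pattern: derive the snapshot-relative object key, read the file
into memory and hand it to upload_with_retry(). A rough sketch of that
pattern follows; the helper name upload_blob_to_s3() and its parameter
types are made up for illustration, only object_key_from_path() and
upload_with_retry() are the calls actually used by the patch.

    // Hypothetical helper sketching the upload pattern used in this
    // patch; not part of the series. The S3Client type stands in for
    // whatever DatastoreBackend::S3 yields, parameter types are
    // approximated.
    async fn upload_blob_to_s3(
        s3_client: &S3Client,
        snapshot_path: &std::path::Path,
        file_name: &str,
        local_path: &std::path::Path,
        replace: bool,
    ) -> Result<(), anyhow::Error> {
        use anyhow::Context;

        let object_key =
            pbs_datastore::s3::object_key_from_path(snapshot_path, file_name)
                .context("invalid object key")?;
        // The blobs uploaded here (index files, manifest, client log)
        // are small, so reading them fully into memory is acceptable.
        let contents = tokio::fs::read(local_path).await?;
        let data = hyper::body::Bytes::from(contents);
        // upload_with_retry() reports whether the object already existed
        // on the backend; the patch ignores that result as well.
        let _is_duplicate = s3_client
            .upload_with_retry(object_key, data, replace)
            .await
            .context("failed to upload to s3 backend")?;
        Ok(())
    }

The patch keeps the call sites inline; the sketch is only meant to make
the repeated steps easier to review.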
src/server/pull.rs | 66 +++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 63 insertions(+), 3 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index b1724c142..fe87359ab 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -6,8 +6,9 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
-use anyhow::{bail, format_err, Error};
+use anyhow::{bail, format_err, Context, Error};
use proxmox_human_byte::HumanByte;
+use tokio::io::AsyncReadExt;
use tracing::info;
use pbs_api_types::{
@@ -24,7 +25,7 @@ use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
use pbs_tools::sha::sha256;
use super::sync::{
@@ -167,7 +168,20 @@ async fn pull_index_chunks<I: IndexFile>(
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
// println!("verify and write {}", hex::encode(&digest));
chunk.verify_unencrypted(size as usize, &digest)?;
- target2.insert_chunk(&chunk, &digest)?;
+ match target2.backend()? {
+ DatastoreBackend::Filesystem => {
+ target2.insert_chunk(&chunk, &digest)?;
+ }
+ DatastoreBackend::S3(s3_client) => {
+ let data = chunk.raw_data().to_vec();
+ let upload_data = hyper::body::Bytes::from(data);
+ let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
+ let _is_duplicate = proxmox_async::runtime::block_on(
+ s3_client.upload_with_retry(object_key, upload_data, false),
+ )
+ .context("failed to upload chunk to s3 backend")?;
+ }
+ }
Ok(())
},
);
@@ -331,6 +345,18 @@ async fn pull_single_archive<'a>(
if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err);
}
+ if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+ let object_key =
+ pbs_datastore::s3::object_key_from_path(&snapshot.relative_path(), archive_name)
+ .context("invalid archive object key")?;
+
+ let archive = tokio::fs::File::open(&path).await?;
+ let mut reader = tokio::io::BufReader::new(archive);
+ let mut contents = Vec::new();
+ reader.read_to_end(&mut contents).await?;
+ let data = hyper::body::Bytes::from(contents);
+ let _is_duplicate = s3_client.upload_with_retry(object_key, data, true).await?;
+ }
Ok(sync_stats)
}
@@ -401,6 +427,7 @@ async fn pull_snapshot<'a>(
}
}
+ let manifest_data = tmp_manifest_blob.raw_data().to_vec();
let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
if ignore_not_verified_or_encrypted(
@@ -467,9 +494,42 @@ async fn pull_snapshot<'a>(
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
}
+ if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+ let object_key = pbs_datastore::s3::object_key_from_path(
+ &snapshot.relative_path(),
+ MANIFEST_BLOB_NAME.as_ref(),
+ )
+ .context("invalid manifest object key")?;
+
+ let data = hyper::body::Bytes::from(manifest_data);
+ let _is_duplicate = s3_client
+ .upload_with_retry(object_key, data, true)
+ .await
+ .context("failed to upload manifest to s3 backend")?;
+ }
if !client_log_name.exists() {
reader.try_download_client_log(&client_log_name).await?;
+ if client_log_name.exists() {
+ if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+ let object_key = pbs_datastore::s3::object_key_from_path(
+ &snapshot.relative_path(),
+ CLIENT_LOG_BLOB_NAME.as_ref(),
+ )
+                .context("invalid client log object key")?;
+
+ let log_file = tokio::fs::File::open(&client_log_name).await?;
+ let mut reader = tokio::io::BufReader::new(log_file);
+ let mut contents = Vec::new();
+ reader.read_to_end(&mut contents).await?;
+
+ let data = hyper::body::Bytes::from(contents);
+ let _is_duplicate = s3_client
+ .upload_with_retry(object_key, data, true)
+ .await
+ .context("failed to upload client log to s3 backend")?;
+ }
+ }
};
snapshot
.cleanup_unreferenced_files(&manifest)
--
2.47.2