[pbs-devel] [RFC v2 proxmox-backup 23/42] sync: pull: conditionally upload content to S3 backend
Christian Ebner
c.ebner at proxmox.com
Thu May 29 16:31:48 CEST 2025
If the datastore is backed by an S3 object store, not only insert the
pulled contents into the local cache store, but also upload them to the
S3 backend.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
src/server/pull.rs | 58 ++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 56 insertions(+), 2 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index b1724c142..f36efd7c8 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -8,6 +8,7 @@ use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
use proxmox_human_byte::HumanByte;
+use tokio::io::AsyncReadExt;
use tracing::info;
use pbs_api_types::{
@@ -24,7 +25,7 @@ use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
use pbs_tools::sha::sha256;
use super::sync::{
@@ -167,7 +168,18 @@ async fn pull_index_chunks<I: IndexFile>(
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
// println!("verify and write {}", hex::encode(&digest));
chunk.verify_unencrypted(size as usize, &digest)?;
- target2.insert_chunk(&chunk, &digest)?;
+ match target2.backend()? {
+ DatastoreBackend::Filesystem => {
+ target2.insert_chunk(&chunk, &digest)?;
+ }
+ DatastoreBackend::S3(s3_client) => {
+ let data = chunk.raw_data().to_vec();
+ let upload_body = hyper::Body::from(data);
+ proxmox_async::runtime::block_on(
+ s3_client.put_object(digest.into(), upload_body),
+ )?;
+ }
+ }
Ok(())
},
);
@@ -331,6 +343,20 @@ async fn pull_single_archive<'a>(
if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err);
}
+ if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+ let archive_path = snapshot.relative_path().join(archive_name);
+ let object_key = archive_path
+ .as_os_str()
+ .to_str()
+ .ok_or_else(|| format_err!("invalid archive path"))?;
+
+ let archive = tokio::fs::File::open(&path).await?;
+ let mut reader = tokio::io::BufReader::new(archive);
+ let mut contents = Vec::new();
+ reader.read_to_end(&mut contents).await?;
+ let data = hyper::body::Body::from(contents);
+ s3_client.put_object(object_key.into(), data).await?;
+ }
Ok(sync_stats)
}
@@ -401,6 +427,7 @@ async fn pull_snapshot<'a>(
}
}
+ let manifest_data = tmp_manifest_blob.raw_data().to_vec();
let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
if ignore_not_verified_or_encrypted(
@@ -467,9 +494,36 @@ async fn pull_snapshot<'a>(
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
}
+ if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+ let object_path = snapshot.relative_path().join(MANIFEST_BLOB_NAME.as_ref());
+ let object_key = object_path
+ .as_os_str()
+ .to_str()
+ .ok_or_else(|| format_err!("invalid archive path"))?;
+
+ let data = hyper::body::Body::from(manifest_data);
+ s3_client.put_object(object_key.into(), data).await?;
+ }
if !client_log_name.exists() {
reader.try_download_client_log(&client_log_name).await?;
+ if client_log_name.exists() {
+ if let DatastoreBackend::S3(s3_client) = snapshot.datastore().backend()? {
+ let object_path = snapshot.relative_path().join(CLIENT_LOG_BLOB_NAME.as_ref());
+ let object_key = object_path
+ .as_os_str()
+ .to_str()
+ .ok_or_else(|| format_err!("invalid archive path"))?;
+
+ let log_file = tokio::fs::File::open(&client_log_name).await?;
+ let mut reader = tokio::io::BufReader::new(log_file);
+ let mut contents = Vec::new();
+ reader.read_to_end(&mut contents).await?;
+
+ let data = hyper::body::Body::from(contents);
+ s3_client.put_object(object_key.into(), data).await?;
+ }
+ }
};
snapshot
.cleanup_unreferenced_files(&manifest)
--
2.39.5
More information about the pbs-devel
mailing list