[pbs-devel] [RFC proxmox-backup 21/39] datastore: local chunk reader: read chunks based on backend

Christian Ebner c.ebner at proxmox.com
Mon May 19 13:46:22 CEST 2025


Get and store the datastore's backend on local chunk reader
instantiation and fetch chunks based on the variant from either the
filesystem or the s3 object store.

By storing the backend variant, the s3 client is instantiated only
once and reused until the local chunk reader instance is dropped.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 pbs-datastore/Cargo.toml                |  2 ++
 pbs-datastore/src/local_chunk_reader.rs | 37 +++++++++++++++++++++----
 2 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
index 3ee06c9bb..323f5e270 100644
--- a/pbs-datastore/Cargo.toml
+++ b/pbs-datastore/Cargo.toml
@@ -13,6 +13,7 @@ crc32fast.workspace = true
 endian_trait.workspace = true
 futures.workspace = true
 hex = { workspace = true, features = [ "serde" ] }
+hyper.workspace = true
 libc.workspace = true
 log.workspace = true
 nix.workspace = true
@@ -28,6 +29,7 @@ zstd-safe.workspace = true
 pathpatterns.workspace = true
 pxar.workspace = true
 
+proxmox-async.workspace = true
 proxmox-borrow.workspace = true
 proxmox-human-byte.workspace = true
 proxmox-io.workspace = true
diff --git a/pbs-datastore/src/local_chunk_reader.rs b/pbs-datastore/src/local_chunk_reader.rs
index 05a70c068..a363059a1 100644
--- a/pbs-datastore/src/local_chunk_reader.rs
+++ b/pbs-datastore/src/local_chunk_reader.rs
@@ -3,17 +3,21 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 use anyhow::{bail, Error};
+use hyper::body::HttpBody;
 
 use pbs_api_types::CryptMode;
+use pbs_s3_client::S3Client;
 use pbs_tools::crypt_config::CryptConfig;
 
 use crate::data_blob::DataBlob;
+use crate::datastore::DatastoreBackend;
 use crate::read_chunk::{AsyncReadChunk, ReadChunk};
 use crate::DataStore;
 
 #[derive(Clone)]
 pub struct LocalChunkReader {
     store: Arc<DataStore>,
+    backend: DatastoreBackend,
     crypt_config: Option<Arc<CryptConfig>>,
     crypt_mode: CryptMode,
 }
@@ -24,8 +28,11 @@ impl LocalChunkReader {
         crypt_config: Option<Arc<CryptConfig>>,
         crypt_mode: CryptMode,
     ) -> Self {
+        // TODO: Error handling!
+        let backend = store.backend().unwrap();
         Self {
             store,
+            backend,
             crypt_config,
             crypt_mode,
         }
@@ -47,10 +54,25 @@ impl LocalChunkReader {
     }
 }
 
+async fn fetch(s3_client: Arc<S3Client>, digest: &[u8; 32]) -> Result<DataBlob, Error> {
+    if let Some(response) = s3_client.get_object(digest.into()).await? {
+        let bytes = response.content.collect().await?.to_bytes();
+        DataBlob::from_raw(bytes.to_vec())
+    } else {
+        bail!("no object with digest {}", hex::encode(digest));
+    }
+}
+
 impl ReadChunk for LocalChunkReader {
     fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
-        let chunk = self.store.load_chunk(digest)?;
+        let chunk = match &self.backend {
+            DatastoreBackend::Filesystem => self.store.load_chunk(digest)?,
+            DatastoreBackend::S3(s3_client) => {
+                proxmox_async::runtime::block_on(fetch(s3_client.clone(), digest))?
+            }
+        };
         self.ensure_crypt_mode(chunk.crypt_mode()?)?;
+
         Ok(chunk)
     }
 
@@ -69,11 +91,14 @@ impl AsyncReadChunk for LocalChunkReader {
         digest: &'a [u8; 32],
     ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
         Box::pin(async move {
-            let (path, _) = self.store.chunk_path(digest);
-
-            let raw_data = tokio::fs::read(&path).await?;
-
-            let chunk = DataBlob::load_from_reader(&mut &raw_data[..])?;
+            let chunk = match &self.backend {
+                DatastoreBackend::Filesystem => {
+                    let (path, _) = self.store.chunk_path(digest);
+                    let raw_data = tokio::fs::read(&path).await?;
+                    DataBlob::load_from_reader(&mut &raw_data[..])?
+                }
+                DatastoreBackend::S3(s3_client) => fetch(s3_client.clone(), digest).await?,
+            };
             self.ensure_crypt_mode(chunk.crypt_mode()?)?;
 
             Ok(chunk)
-- 
2.39.5





More information about the pbs-devel mailing list