[pbs-devel] [PATCH proxmox-backup v9 15/46] datastore: local chunk reader: read chunks based on backend

Hannes Laimer h.laimer at proxmox.com
Mon Jul 21 15:12:59 CEST 2025


On Sat Jul 19, 2025 at 2:50 PM CEST, Christian Ebner wrote:
> Get and store the datastore's backend on local chunk reader
> instantiation and fetch chunks based on the variant from either the
> filesystem or the s3 object store.
>
> By storing the backend variant, the s3 client is instantiated only
> once and reused until the local chunk reader instance is dropped.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 8:
> - use Arc::clone() over .clone()
>
>  pbs-datastore/Cargo.toml                |  1 +
>  pbs-datastore/src/local_chunk_reader.rs | 38 +++++++++++++++++++++----
>  2 files changed, 33 insertions(+), 6 deletions(-)
>
> diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
> index 7e56dbd31..8ce930a94 100644
> --- a/pbs-datastore/Cargo.toml
> +++ b/pbs-datastore/Cargo.toml
> @@ -13,6 +13,7 @@ crc32fast.workspace = true
>  endian_trait.workspace = true
>  futures.workspace = true
>  hex = { workspace = true, features = [ "serde" ] }
> +http-body-util.workspace = true
>  hyper.workspace = true
>  libc.workspace = true
>  log.workspace = true
> diff --git a/pbs-datastore/src/local_chunk_reader.rs b/pbs-datastore/src/local_chunk_reader.rs
> index 05a70c068..667c97206 100644
> --- a/pbs-datastore/src/local_chunk_reader.rs
> +++ b/pbs-datastore/src/local_chunk_reader.rs
> @@ -3,17 +3,21 @@ use std::pin::Pin;
>  use std::sync::Arc;
>  
>  use anyhow::{bail, Error};
> +use http_body_util::BodyExt;
>  
>  use pbs_api_types::CryptMode;
>  use pbs_tools::crypt_config::CryptConfig;
> +use proxmox_s3_client::S3Client;
>  
>  use crate::data_blob::DataBlob;
> +use crate::datastore::DatastoreBackend;
>  use crate::read_chunk::{AsyncReadChunk, ReadChunk};
>  use crate::DataStore;
>  
>  #[derive(Clone)]
>  pub struct LocalChunkReader {
>      store: Arc<DataStore>,
> +    backend: DatastoreBackend,
>      crypt_config: Option<Arc<CryptConfig>>,
>      crypt_mode: CryptMode,
>  }
> @@ -24,8 +28,11 @@ impl LocalChunkReader {
>          crypt_config: Option<Arc<CryptConfig>>,
>          crypt_mode: CryptMode,
>      ) -> Self {
> +        // TODO: Error handling!
> +        let backend = store.backend().unwrap();

Was this missed, or was this intentionally left in?
I feel like we don't want to panic here :P
(correct me if I'm wrong, but I think we would panic whenever anything
goes wrong when connecting to s3?)
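Something along these lines would avoid the unwrap(), assuming the call
sites can be adapted to handle a Result (just a rough, untested sketch):

impl LocalChunkReader {
    // hypothetical: make the constructor fallible instead of unwrapping
    pub fn new(
        store: Arc<DataStore>,
        crypt_config: Option<Arc<CryptConfig>>,
        crypt_mode: CryptMode,
    ) -> Result<Self, Error> {
        // propagate backend setup errors (e.g. failing to instantiate the
        // s3 client) to the caller instead of panicking
        let backend = store.backend()?;
        Ok(Self {
            store,
            backend,
            crypt_config,
            crypt_mode,
        })
    }
}

Callers would then have to propagate the error themselves, but that
seems preferable to panicking whenever the backend can't be set up.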

>          Self {
>              store,
> +            backend,
>              crypt_config,
>              crypt_mode,
>          }
> @@ -47,10 +54,26 @@ impl LocalChunkReader {
>      }
>  }
>  
> +async fn fetch(s3_client: Arc<S3Client>, digest: &[u8; 32]) -> Result<DataBlob, Error> {
> +    let object_key = crate::s3::object_key_from_digest(digest)?;
> +    if let Some(response) = s3_client.get_object(object_key).await? {
> +        let bytes = response.content.collect().await?.to_bytes();
> +        DataBlob::from_raw(bytes.to_vec())
> +    } else {
> +        bail!("no object with digest {}", hex::encode(digest));
> +    }
> +}
> +
>  impl ReadChunk for LocalChunkReader {
>      fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
> -        let chunk = self.store.load_chunk(digest)?;
> +        let chunk = match &self.backend {
> +            DatastoreBackend::Filesystem => self.store.load_chunk(digest)?,
> +            DatastoreBackend::S3(s3_client) => {
> +                proxmox_async::runtime::block_on(fetch(Arc::clone(s3_client), digest))?
> +            }
> +        };
>          self.ensure_crypt_mode(chunk.crypt_mode()?)?;
> +
>          Ok(chunk)
>      }
>  
> @@ -69,11 +92,14 @@ impl AsyncReadChunk for LocalChunkReader {
>          digest: &'a [u8; 32],
>      ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
>          Box::pin(async move {
> -            let (path, _) = self.store.chunk_path(digest);
> -
> -            let raw_data = tokio::fs::read(&path).await?;
> -
> -            let chunk = DataBlob::load_from_reader(&mut &raw_data[..])?;
> +            let chunk = match &self.backend {
> +                DatastoreBackend::Filesystem => {
> +                    let (path, _) = self.store.chunk_path(digest);
> +                    let raw_data = tokio::fs::read(&path).await?;
> +                    DataBlob::load_from_reader(&mut &raw_data[..])?
> +                }
> +                DatastoreBackend::S3(s3_client) => fetch(Arc::clone(s3_client), digest).await?,
> +            };
>              self.ensure_crypt_mode(chunk.crypt_mode()?)?;
>  
>              Ok(chunk)
