[pbs-devel] [PATCH proxmox-backup v9 15/46] datastore: local chunk reader: read chunks based on backend
Hannes Laimer
h.laimer at proxmox.com
Mon Jul 21 15:12:59 CEST 2025
On Sat Jul 19, 2025 at 2:50 PM CEST, Christian Ebner wrote:
> Get and store the datastore's backend on local chunk reader
> instantiation and fetch chunks based on the variant from either the
> filesystem or the s3 object store.
>
> By storing the backend variant, the s3 client is instantiated only
> once and reused until the local chunk reader instance is dropped.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 8:
> - use Arc::clone() over .clone()
>
> pbs-datastore/Cargo.toml | 1 +
> pbs-datastore/src/local_chunk_reader.rs | 38 +++++++++++++++++++++----
> 2 files changed, 33 insertions(+), 6 deletions(-)
>
> diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
> index 7e56dbd31..8ce930a94 100644
> --- a/pbs-datastore/Cargo.toml
> +++ b/pbs-datastore/Cargo.toml
> @@ -13,6 +13,7 @@ crc32fast.workspace = true
> endian_trait.workspace = true
> futures.workspace = true
> hex = { workspace = true, features = [ "serde" ] }
> +http-body-util.workspace = true
> hyper.workspace = true
> libc.workspace = true
> log.workspace = true
> diff --git a/pbs-datastore/src/local_chunk_reader.rs b/pbs-datastore/src/local_chunk_reader.rs
> index 05a70c068..667c97206 100644
> --- a/pbs-datastore/src/local_chunk_reader.rs
> +++ b/pbs-datastore/src/local_chunk_reader.rs
> @@ -3,17 +3,21 @@ use std::pin::Pin;
> use std::sync::Arc;
>
> use anyhow::{bail, Error};
> +use http_body_util::BodyExt;
>
> use pbs_api_types::CryptMode;
> use pbs_tools::crypt_config::CryptConfig;
> +use proxmox_s3_client::S3Client;
>
> use crate::data_blob::DataBlob;
> +use crate::datastore::DatastoreBackend;
> use crate::read_chunk::{AsyncReadChunk, ReadChunk};
> use crate::DataStore;
>
> #[derive(Clone)]
> pub struct LocalChunkReader {
> store: Arc<DataStore>,
> + backend: DatastoreBackend,
> crypt_config: Option<Arc<CryptConfig>>,
> crypt_mode: CryptMode,
> }
> @@ -24,8 +28,11 @@ impl LocalChunkReader {
> crypt_config: Option<Arc<CryptConfig>>,
> crypt_mode: CryptMode,
> ) -> Self {
> + // TODO: Error handling!
> + let backend = store.backend().unwrap();
Was this missed, or was this intentionally left in?
I feel like we don't want to panic here :P
(correct me if I'm wrong, but I think we would panic whenever anything goes
wrong while connecting to S3?)
> Self {
> store,
> + backend,
> crypt_config,
> crypt_mode,
> }
> @@ -47,10 +54,26 @@ impl LocalChunkReader {
> }
> }
>
> +async fn fetch(s3_client: Arc<S3Client>, digest: &[u8; 32]) -> Result<DataBlob, Error> {
> + let object_key = crate::s3::object_key_from_digest(digest)?;
> + if let Some(response) = s3_client.get_object(object_key).await? {
> + let bytes = response.content.collect().await?.to_bytes();
> + DataBlob::from_raw(bytes.to_vec())
> + } else {
> + bail!("no object with digest {}", hex::encode(digest));
> + }
> +}
> +
> impl ReadChunk for LocalChunkReader {
> fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
> - let chunk = self.store.load_chunk(digest)?;
> + let chunk = match &self.backend {
> + DatastoreBackend::Filesystem => self.store.load_chunk(digest)?,
> + DatastoreBackend::S3(s3_client) => {
> + proxmox_async::runtime::block_on(fetch(Arc::clone(s3_client), digest))?
> + }
> + };
> self.ensure_crypt_mode(chunk.crypt_mode()?)?;
> +
> Ok(chunk)
> }
>
> @@ -69,11 +92,14 @@ impl AsyncReadChunk for LocalChunkReader {
> digest: &'a [u8; 32],
> ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
> Box::pin(async move {
> - let (path, _) = self.store.chunk_path(digest);
> -
> - let raw_data = tokio::fs::read(&path).await?;
> -
> - let chunk = DataBlob::load_from_reader(&mut &raw_data[..])?;
> + let chunk = match &self.backend {
> + DatastoreBackend::Filesystem => {
> + let (path, _) = self.store.chunk_path(digest);
> + let raw_data = tokio::fs::read(&path).await?;
> + DataBlob::load_from_reader(&mut &raw_data[..])?
> + }
> + DatastoreBackend::S3(s3_client) => fetch(Arc::clone(s3_client), digest).await?,
> + };
> self.ensure_crypt_mode(chunk.crypt_mode()?)?;
>
> Ok(chunk)
More information about the pbs-devel
mailing list