[pbs-devel] [PATCH proxmox-backup v8 14/45] api: reader: fetch chunks based on datastore backend

Lukas Wagner l.wagner at proxmox.com
Fri Jul 18 12:03:41 CEST 2025


On  2025-07-18 11:58, Christian Ebner wrote:
> On 7/18/25 10:38 AM, Lukas Wagner wrote:
>> One comment inline, but nothing prohibitive of an R-b:
>>
>> Reviewed-by: Lukas Wagner <l.wagner at proxmox.com>
>>
>>
>> On  2025-07-15 14:53, Christian Ebner wrote:
>>> Read the chunk based on the datastores backend, reading from local
>>> filesystem or fetching from S3 object store.
>>>
>>> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>>> ---
>>> changes since version 7:
>>> - no changes
>>>
>>>   src/api2/reader/environment.rs | 12 ++++++----
>>>   src/api2/reader/mod.rs         | 41 +++++++++++++++++++++++-----------
>>>   2 files changed, 36 insertions(+), 17 deletions(-)
>>>
>>> diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
>>> index 3b2f06f43..8924352b0 100644
>>> --- a/src/api2/reader/environment.rs
>>> +++ b/src/api2/reader/environment.rs
>>> @@ -1,13 +1,14 @@
>>>   use std::collections::HashSet;
>>>   use std::sync::{Arc, RwLock};
>>>   +use anyhow::Error;
>>>   use serde_json::{json, Value};
>>>     use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
>>>     use pbs_api_types::Authid;
>>>   use pbs_datastore::backup_info::BackupDir;
>>> -use pbs_datastore::DataStore;
>>> +use pbs_datastore::{DataStore, DatastoreBackend};
>>>   use proxmox_rest_server::formatter::*;
>>>   use proxmox_rest_server::WorkerTask;
>>>   use tracing::info;
>>> @@ -23,6 +24,7 @@ pub struct ReaderEnvironment {
>>>       pub worker: Arc<WorkerTask>,
>>>       pub datastore: Arc<DataStore>,
>>>       pub backup_dir: BackupDir,
>>> +    pub backend: DatastoreBackend,
>>>       allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
>>>   }
>>>   @@ -33,8 +35,9 @@ impl ReaderEnvironment {
>>>           worker: Arc<WorkerTask>,
>>>           datastore: Arc<DataStore>,
>>>           backup_dir: BackupDir,
>>> -    ) -> Self {
>>> -        Self {
>>> +    ) -> Result<Self, Error> {
>>> +        let backend = datastore.backend()?;
>>> +        Ok(Self {
>>>               result_attributes: json!({}),
>>>               env_type,
>>>               auth_id,
>>> @@ -43,8 +46,9 @@ impl ReaderEnvironment {
>>>               debug: tracing::enabled!(tracing::Level::DEBUG),
>>>               formatter: JSON_FORMATTER,
>>>               backup_dir,
>>> +            backend,
>>>               allowed_chunks: Arc::new(RwLock::new(HashSet::new())),
>>> -        }
>>> +        })
>>>       }
>>>         pub fn log<S: AsRef<str>>(&self, msg: S) {
>>> diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
>>> index a77216043..997d9ca77 100644
>>> --- a/src/api2/reader/mod.rs
>>> +++ b/src/api2/reader/mod.rs
>>> @@ -3,6 +3,7 @@
>>>   use anyhow::{bail, format_err, Context, Error};
>>>   use futures::*;
>>>   use hex::FromHex;
>>> +use http_body_util::BodyExt;
>>>   use hyper::body::Incoming;
>>>   use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
>>>   use hyper::http::request::Parts;
>>> @@ -27,8 +28,9 @@ use pbs_api_types::{
>>>   };
>>>   use pbs_config::CachedUserInfo;
>>>   use pbs_datastore::index::IndexFile;
>>> -use pbs_datastore::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
>>> +use pbs_datastore::{DataStore, DatastoreBackend, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
>>>   use pbs_tools::json::required_string_param;
>>> +use proxmox_s3_client::S3Client;
>>>     use crate::api2::backup::optional_ns_param;
>>>   use crate::api2::helpers;
>>> @@ -162,7 +164,7 @@ fn upgrade_to_backup_reader_protocol(
>>>                       worker.clone(),
>>>                       datastore,
>>>                       backup_dir,
>>> -                );
>>> +                )?;
>>>                     env.debug = debug;
>>>   @@ -323,17 +325,10 @@ fn download_chunk(
>>>               ));
>>>           }
>>>   -        let (path, _) = env.datastore.chunk_path(&digest);
>>> -        let path2 = path.clone();
>>> -
>>> -        env.debug(format!("download chunk {:?}", path));
>>> -
>>> -        let data =
>>> -            proxmox_async::runtime::block_in_place(|| std::fs::read(path)).map_err(move |err| {
>>> -                http_err!(BAD_REQUEST, "reading file {:?} failed: {}", path2, err)
>>> -            })?;
>>> -
>>> -        let body = Body::from(data);
>>> +        let body = match &env.backend {
>>> +            DatastoreBackend::Filesystem => load_from_filesystem(env, &digest)?,
>>> +            DatastoreBackend::S3(s3_client) => fetch_from_object_store(s3_client, &digest).await?,
>>> +        };
>>>             // fixme: set other headers ?
>>>           Ok(Response::builder()
>>> @@ -345,6 +340,26 @@ fn download_chunk(
>>>       .boxed()
>>>   }
>>>   +async fn fetch_from_object_store(s3_client: &S3Client, digest: &[u8; 32]) -> Result<Body, Error> {
>>> +    let object_key = pbs_datastore::s3::object_key_from_digest(digest)?;
>>> +    if let Some(response) = s3_client.get_object(object_key).await? {
>>
>> ^ Do we maybe want some kind of retry-logic for retrieving objects as well? Disregard
>> in case you implement it in a later patch, I'm reviewing this series patch by patch.
> 
> While a retry might be of interest in case of intermittent issues, for the time being I would like to refrain from doing so for the reasons stated in my reply to proxmox-backup patch 0004. If the need for this truly arises, adding it later on should be rather simple. If you already see this as an issue now, I can of course add the retry logic right away.

No, I'm fine with revisiting this later, e.g. after a potential rollout where we have some initial user feedback. It's still
experimental after all :)

> 
>>
>>> +        let data = response.content.collect().await?.to_bytes();
>>> +        return Ok(Body::from(data));
>>> +    }
>>> +    bail!("cannot find chunk with digest {}", hex::encode(digest));
>>> +}
>>> +
>>> +fn load_from_filesystem(env: &ReaderEnvironment, digest: &[u8; 32]) -> Result<Body, Error> {
>>> +    let (path, _) = env.datastore.chunk_path(digest);
>>> +    let path2 = path.clone();
>>> +
>>> +    env.debug(format!("download chunk {path:?}"));
>>> +
>>> +    let data = proxmox_async::runtime::block_in_place(|| std::fs::read(path))
>>> +        .map_err(move |err| http_err!(BAD_REQUEST, "reading file {path2:?} failed: {err}"))?;
>>> +    Ok(Body::from(data))
>>> +}
>>> +
>>>   /* this is too slow
>>>   fn download_chunk_old(
>>>       _parts: Parts,
>>
> 





More information about the pbs-devel mailing list