[pbs-devel] [PATCH proxmox-backup v11 15/46] datastore: local chunk reader: read chunks based on backend
Christian Ebner
c.ebner at proxmox.com
Tue Jul 22 12:10:35 CEST 2025
Get and store the datastore's backend on local chunk reader
instantiation and fetch chunks based on the variant from either the
filesystem or the s3 object store.
By storing the backend variant, the s3 client is instantiated only
once and reused until the local chunk reader instance is dropped.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
Reviewed-by: Lukas Wagner <l.wagner at proxmox.com>
Reviewed-by: Hannes Laimer <h.laimer at proxmox.com>
---
changes since version 10:
- no changes
pbs-datastore/Cargo.toml | 1 +
pbs-datastore/src/local_chunk_reader.rs | 43 +++++++++++++++++++------
src/api2/admin/datastore.rs | 12 ++++---
src/server/pull.rs | 8 +++--
src/server/push.rs | 8 +++--
src/server/sync.rs | 22 +++++--------
6 files changed, 63 insertions(+), 31 deletions(-)
diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
index 7e56dbd31..8ce930a94 100644
--- a/pbs-datastore/Cargo.toml
+++ b/pbs-datastore/Cargo.toml
@@ -13,6 +13,7 @@ crc32fast.workspace = true
endian_trait.workspace = true
futures.workspace = true
hex = { workspace = true, features = [ "serde" ] }
+http-body-util.workspace = true
hyper.workspace = true
libc.workspace = true
log.workspace = true
diff --git a/pbs-datastore/src/local_chunk_reader.rs b/pbs-datastore/src/local_chunk_reader.rs
index 05a70c068..58a2fee8d 100644
--- a/pbs-datastore/src/local_chunk_reader.rs
+++ b/pbs-datastore/src/local_chunk_reader.rs
@@ -3,17 +3,21 @@ use std::pin::Pin;
use std::sync::Arc;
use anyhow::{bail, Error};
+use http_body_util::BodyExt;
use pbs_api_types::CryptMode;
use pbs_tools::crypt_config::CryptConfig;
+use proxmox_s3_client::S3Client;
use crate::data_blob::DataBlob;
+use crate::datastore::DatastoreBackend;
use crate::read_chunk::{AsyncReadChunk, ReadChunk};
use crate::DataStore;
#[derive(Clone)]
pub struct LocalChunkReader {
store: Arc<DataStore>,
+ backend: DatastoreBackend,
crypt_config: Option<Arc<CryptConfig>>,
crypt_mode: CryptMode,
}
@@ -23,12 +27,14 @@ impl LocalChunkReader {
store: Arc<DataStore>,
crypt_config: Option<Arc<CryptConfig>>,
crypt_mode: CryptMode,
- ) -> Self {
- Self {
+ ) -> Result<Self, Error> {
+ let backend = store.backend()?;
+ Ok(Self {
store,
+ backend,
crypt_config,
crypt_mode,
- }
+ })
}
fn ensure_crypt_mode(&self, chunk_mode: CryptMode) -> Result<(), Error> {
@@ -47,10 +53,26 @@ impl LocalChunkReader {
}
}
+async fn fetch(s3_client: Arc<S3Client>, digest: &[u8; 32]) -> Result<DataBlob, Error> {
+ let object_key = crate::s3::object_key_from_digest(digest)?;
+ if let Some(response) = s3_client.get_object(object_key).await? {
+ let bytes = response.content.collect().await?.to_bytes();
+ DataBlob::from_raw(bytes.to_vec())
+ } else {
+ bail!("no object with digest {}", hex::encode(digest));
+ }
+}
+
impl ReadChunk for LocalChunkReader {
fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
- let chunk = self.store.load_chunk(digest)?;
+ let chunk = match &self.backend {
+ DatastoreBackend::Filesystem => self.store.load_chunk(digest)?,
+ DatastoreBackend::S3(s3_client) => {
+ proxmox_async::runtime::block_on(fetch(Arc::clone(s3_client), digest))?
+ }
+ };
self.ensure_crypt_mode(chunk.crypt_mode()?)?;
+
Ok(chunk)
}
@@ -69,11 +91,14 @@ impl AsyncReadChunk for LocalChunkReader {
digest: &'a [u8; 32],
) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
Box::pin(async move {
- let (path, _) = self.store.chunk_path(digest);
-
- let raw_data = tokio::fs::read(&path).await?;
-
- let chunk = DataBlob::load_from_reader(&mut &raw_data[..])?;
+ let chunk = match &self.backend {
+ DatastoreBackend::Filesystem => {
+ let (path, _) = self.store.chunk_path(digest);
+ let raw_data = tokio::fs::read(&path).await?;
+ DataBlob::load_from_reader(&mut &raw_data[..])?
+ }
+ DatastoreBackend::S3(s3_client) => fetch(Arc::clone(s3_client), digest).await?,
+ };
self.ensure_crypt_mode(chunk.crypt_mode()?)?;
Ok(chunk)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index e6b8449d2..d742633cf 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -1520,7 +1520,8 @@ pub fn download_file_decoded(
let (csum, size) = index.compute_csum();
manifest.verify_file(&file_name, &csum, size)?;
- let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
+ let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None)
+ .context("creating local chunk reader failed")?;
let reader = CachedChunkReader::new(chunk_reader, index, 1).seekable();
Body::wrap_stream(AsyncReaderStream::new(reader).map_err(move |err| {
eprintln!("error during streaming of '{:?}' - {}", path, err);
@@ -1535,7 +1536,8 @@ pub fn download_file_decoded(
let (csum, size) = index.compute_csum();
manifest.verify_file(&file_name, &csum, size)?;
- let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
+ let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None)
+ .context("creating local chunk reader failed")?;
let reader = CachedChunkReader::new(chunk_reader, index, 1).seekable();
Body::wrap_stream(
AsyncReaderStream::with_buffer_size(reader, 4 * 1024 * 1024).map_err(
@@ -1739,7 +1741,8 @@ pub async fn catalog(
let (csum, size) = index.compute_csum();
manifest.verify_file(&file_name, &csum, size)?;
- let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
+ let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None)
+ .context("creating local chunk reader failed")?;
let reader = BufferedDynamicReader::new(index, chunk_reader);
let mut catalog_reader = CatalogReader::new(reader);
@@ -1808,7 +1811,8 @@ fn get_local_pxar_reader(
let (csum, size) = index.compute_csum();
manifest.verify_file(pxar_name, &csum, size)?;
- let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
+ let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None)
+ .context("creating local chunk reader failed")?;
let reader = BufferedDynamicReader::new(index, chunk_reader);
let archive_size = reader.archive_size();
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 775ed0c59..a4402b720 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -306,7 +306,9 @@ async fn pull_single_archive<'a>(
info!("skipping chunk sync for same datastore");
} else {
let stats = pull_index_chunks(
- reader.chunk_reader(archive_info.crypt_mode),
+ reader
+ .chunk_reader(archive_info.crypt_mode)
+ .context("failed to get chunk reader")?,
snapshot.datastore().clone(),
index,
downloaded_chunks,
@@ -326,7 +328,9 @@ async fn pull_single_archive<'a>(
info!("skipping chunk sync for same datastore");
} else {
let stats = pull_index_chunks(
- reader.chunk_reader(archive_info.crypt_mode),
+ reader
+ .chunk_reader(archive_info.crypt_mode)
+ .context("failed to get chunk reader")?,
snapshot.datastore().clone(),
index,
downloaded_chunks,
diff --git a/src/server/push.rs b/src/server/push.rs
index e71012ed8..528eed9ff 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -890,7 +890,9 @@ pub(crate) async fn push_snapshot(
.await;
}
let index = DynamicIndexReader::open(&path)?;
- let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
+ let chunk_reader = reader
+ .chunk_reader(entry.chunk_crypt_mode())
+ .context("failed to get chunk reader")?;
let sync_stats = push_index(
&archive_name,
index,
@@ -914,7 +916,9 @@ pub(crate) async fn push_snapshot(
.await;
}
let index = FixedIndexReader::open(&path)?;
- let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
+ let chunk_reader = reader
+ .chunk_reader(entry.chunk_crypt_mode())
+ .context("failed to get chunk reader")?;
let size = index.index_bytes();
let sync_stats = push_index(
&archive_name,
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 09814ef0c..9238a8626 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -87,7 +87,7 @@ impl SyncStats {
/// and checking whether chunk sync should be skipped.
pub(crate) trait SyncSourceReader: Send + Sync {
/// Returns a chunk reader with the specified encryption mode.
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Result<Arc<dyn AsyncReadChunk>, Error>;
/// Asynchronously loads a file from the source into a local file.
/// `filename` is the name of the file to load from the source.
@@ -113,13 +113,10 @@ pub(crate) struct LocalSourceReader {
#[async_trait::async_trait]
impl SyncSourceReader for RemoteSourceReader {
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
- Arc::new(RemoteChunkReader::new(
- self.backup_reader.clone(),
- None,
- crypt_mode,
- HashMap::new(),
- ))
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Result<Arc<dyn AsyncReadChunk>, Error> {
+ let chunk_reader =
+ RemoteChunkReader::new(self.backup_reader.clone(), None, crypt_mode, HashMap::new());
+ Ok(Arc::new(chunk_reader))
}
async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
@@ -190,12 +187,9 @@ impl SyncSourceReader for RemoteSourceReader {
#[async_trait::async_trait]
impl SyncSourceReader for LocalSourceReader {
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
- Arc::new(LocalChunkReader::new(
- self.datastore.clone(),
- None,
- crypt_mode,
- ))
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Result<Arc<dyn AsyncReadChunk>, Error> {
+ let chunk_reader = LocalChunkReader::new(self.datastore.clone(), None, crypt_mode)?;
+ Ok(Arc::new(chunk_reader))
}
async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
--
2.47.2
More information about the pbs-devel
mailing list