[pbs-devel] [PATCH proxmox-backup 06/17] datastore: refactor chunk insert based on backend

Christian Ebner c.ebner at proxmox.com
Mon Nov 3 12:31:09 CET 2025


The goal is to move all backend-related implementation details into
the datastore's methods and to deduplicate code as much as possible.

The backend has to be passed in by the caller on chunk insert, as it
stores the s3-client, which lives for the whole backup session
lifetime. The same is true for the pull sync job and the tape restore,
although for the latter the s3 functionality is not exposed yet, as it
is still incomplete.

To distinguish regular inserts into datastores with a cache from cases
where chunks should bypass the cache, add a dedicated method for the
latter.

This will facilitate the implementation of the per-chunk file locking
for cache and backend consistency on s3 backed stores.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 pbs-datastore/src/datastore.rs  | 85 ++++++++++++++++++++++++++++++++-
 src/api2/backup/upload_chunk.rs | 64 ++++---------------------
 src/api2/tape/restore.rs        |  6 ++-
 src/server/pull.rs              | 19 +-------
 4 files changed, 98 insertions(+), 76 deletions(-)

diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index ae85be76d..9008d8fc6 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -8,6 +8,7 @@ use std::time::{Duration, SystemTime};
 
 use anyhow::{bail, format_err, Context, Error};
 use http_body_util::BodyExt;
+use hyper::body::Bytes;
 use nix::unistd::{unlinkat, UnlinkatFlags};
 use pbs_tools::lru_cache::LruCache;
 use tokio::io::AsyncWriteExt;
@@ -1855,8 +1856,88 @@ impl DataStore {
             .cond_touch_chunk(digest, assert_exists)
     }
 
-    pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
-        self.inner.chunk_store.insert_chunk(chunk, digest)
+    /// Inserts the chunk with given digest in the chunk store based on the given backend.
+    ///
+    /// The backend is passed along in order to reuse an active connection to the backend, created
+    /// on environment instantiation, e.g. an s3 client which lives for the whole backup session.
+    /// This calls into async code, so callers must ensure they never hold a Mutex lock.
+    pub fn insert_chunk(
+        &self,
+        chunk: &DataBlob,
+        digest: &[u8; 32],
+        backend: &DatastoreBackend,
+    ) -> Result<(bool, u64), Error> {
+        match backend {
+            DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
+            DatastoreBackend::S3(s3_client) => self.insert_chunk_cached(chunk, digest, &s3_client),
+        }
+    }
+
+    /// Inserts the chunk with given digest in the chunk store based on the given backend, but
+    /// always bypasses the local datastore cache.
+    ///
+    /// The backend is passed along in order to reuse an active connection to the backend, created
+    /// on environment instantiation, e.g. an s3 client which lives for the whole backup session.
+    /// This calls into async code, so callers must ensure they never hold a Mutex lock.
+    ///
+    /// FIXME: refactor into insert_chunk() once the backend instance is cacheable on the datastore
+    /// itself.
+    pub fn insert_chunk_no_cache(
+        &self,
+        chunk: &DataBlob,
+        digest: &[u8; 32],
+        backend: &DatastoreBackend,
+    ) -> Result<(bool, u64), Error> {
+        match backend {
+            DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
+            DatastoreBackend::S3(s3_client) => {
+                let chunk_data: Bytes = chunk.raw_data().to_vec().into();
+                let chunk_size = chunk_data.len() as u64;
+                let object_key = crate::s3::object_key_from_digest(digest)?;
+                let is_duplicate = proxmox_async::runtime::block_on(
+                    s3_client.upload_no_replace_with_retry(object_key, chunk_data),
+                )
+                .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
+                Ok((is_duplicate, chunk_size))
+            }
+        }
+    }
+
+    fn insert_chunk_cached(
+        &self,
+        chunk: &DataBlob,
+        digest: &[u8; 32],
+        s3_client: &Arc<S3Client>,
+    ) -> Result<(bool, u64), Error> {
+        let chunk_data = chunk.raw_data();
+        let chunk_size = chunk_data.len() as u64;
+
+        // Avoid re-upload to S3 if the chunk is either present in the in-memory LRU cache
+        // or the chunk marker file exists on the filesystem. The latter means the chunk was
+        // uploaded in the past and has since been evicted from the LRU cache, but was not
+        // cleaned up by garbage collection, so it is still contained in the S3 object store.
+        if self.cache_contains(&digest) {
+            tracing::info!("Skip upload of cached chunk {}", hex::encode(digest));
+            return Ok((true, chunk_size));
+        }
+        if let Ok(true) = self.cond_touch_chunk(digest, false) {
+            tracing::info!(
+                "Skip upload of already encountered chunk {}",
+                hex::encode(digest),
+            );
+            return Ok((true, chunk_size));
+        }
+
+        tracing::info!("Upload new chunk {}", hex::encode(digest));
+        let object_key = crate::s3::object_key_from_digest(digest)?;
+        let chunk_data: Bytes = chunk_data.to_vec().into();
+        let is_duplicate = proxmox_async::runtime::block_on(
+            s3_client.upload_no_replace_with_retry(object_key, chunk_data),
+        )
+        .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
+        tracing::info!("Caching of chunk {}", hex::encode(digest));
+        self.cache_insert(&digest, &chunk)?;
+        Ok((is_duplicate, chunk_size))
     }
 
     pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 297f23b12..f4a50002d 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -5,7 +5,7 @@ use anyhow::{bail, format_err, Error};
 use futures::*;
 use hex::FromHex;
 use http_body_util::{BodyDataStream, BodyExt};
-use hyper::body::{Bytes, Incoming};
+use hyper::body::Incoming;
 use hyper::http::request::Parts;
 use serde_json::{json, Value};
 
@@ -15,7 +15,7 @@ use proxmox_sortable_macro::sortable;
 
 use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
 use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
-use pbs_datastore::{DataBlob, DatastoreBackend};
+use pbs_datastore::DataBlob;
 use pbs_tools::json::{required_integer_param, required_string_param};
 
 use super::environment::*;
@@ -232,59 +232,15 @@ async fn upload_to_backend(
     let (digest, size, chunk) =
         UploadChunk::new(BodyDataStream::new(req_body), digest, size, encoded_size).await?;
 
-    match &env.backend {
-        DatastoreBackend::Filesystem => {
-            let (is_duplicate, compressed_size) = proxmox_async::runtime::block_in_place(|| {
-                env.datastore.insert_chunk(&chunk, &digest)
-            })?;
-            Ok((digest, size, compressed_size as u32, is_duplicate))
-        }
-        DatastoreBackend::S3(s3_client) => {
-            let chunk_data: Bytes = chunk.raw_data().to_vec().into();
-
-            if env.no_cache {
-                let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
-                let is_duplicate = s3_client
-                    .upload_no_replace_with_retry(object_key, chunk_data)
-                    .await
-                    .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
-                return Ok((digest, size, encoded_size, is_duplicate));
-            }
-
-            // Avoid re-upload to S3 if the chunk is either present in the LRU cache or the chunk
-            // file exists on filesystem. The latter means that the chunk has been present in the
-            // past an was not cleaned up by garbage collection, so contained in the S3 object store.
-            if env.datastore.cache_contains(&digest) {
-                tracing::info!("Skip upload of cached chunk {}", hex::encode(digest));
-                return Ok((digest, size, encoded_size, true));
-            }
-            if let Ok(true) = env.datastore.cond_touch_chunk(&digest, false) {
-                tracing::info!(
-                    "Skip upload of already encountered chunk {}",
-                    hex::encode(digest)
-                );
-                return Ok((digest, size, encoded_size, true));
-            }
-
-            tracing::info!("Upload of new chunk {}", hex::encode(digest));
-            let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
-            let is_duplicate = s3_client
-                .upload_no_replace_with_retry(object_key, chunk_data)
-                .await
-                .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
-
-            // Only insert the chunk into the cache after it has been successufuly uploaded.
-            // Although less performant than doing this in parallel, it is required for consisency
-            // since chunks are considered as present on the backend if the file exists in the local
-            // cache store.
-            let datastore = env.datastore.clone();
-            tracing::info!("Caching of chunk {}", hex::encode(digest));
-            let _ = tokio::task::spawn_blocking(move || datastore.cache_insert(&digest, &chunk))
-                .await?;
-
-            Ok((digest, size, encoded_size, is_duplicate))
-        }
+    if env.no_cache {
+        let (is_duplicate, chunk_size) =
+            env.datastore
+                .insert_chunk_no_cache(&chunk, &digest, &env.backend)?;
+        return Ok((digest, size, chunk_size as u32, is_duplicate));
     }
+
+    let (is_duplicate, chunk_size) = env.datastore.insert_chunk(&chunk, &digest, &env.backend)?;
+    Ok((digest, size, chunk_size as u32, is_duplicate))
 }
 
 pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 8b6979017..4f2ee3db6 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -1151,6 +1151,7 @@ fn restore_partial_chunk_archive<'a>(
     let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
     let bytes2 = bytes.clone();
 
+    let backend = datastore.backend()?;
     let writer_pool = ParallelHandler::new(
         "tape restore chunk writer",
         4,
@@ -1162,7 +1163,7 @@ fn restore_partial_chunk_archive<'a>(
                     chunk.decode(None, Some(&digest))?; // verify digest
                 }
 
-                datastore.insert_chunk(&chunk, &digest)?;
+                datastore.insert_chunk(&chunk, &digest, &backend)?;
             }
             Ok(())
         },
@@ -1544,6 +1545,7 @@ fn restore_chunk_archive<'a>(
     let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
     let bytes2 = bytes.clone();
 
+    let backend = datastore.backend()?;
     let writer_pool = ParallelHandler::new(
         "tape restore chunk writer",
         4,
@@ -1560,7 +1562,7 @@ fn restore_chunk_archive<'a>(
                     chunk.decode(None, Some(&digest))?; // verify digest
                 }
 
-                datastore.insert_chunk(&chunk, &digest)?;
+                datastore.insert_chunk(&chunk, &digest, &backend)?;
             } else if verbose {
                 info!("Found existing chunk: {}", hex::encode(digest));
             }
diff --git a/src/server/pull.rs b/src/server/pull.rs
index de8b140bc..db7084f3e 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -171,24 +171,7 @@ async fn pull_index_chunks<I: IndexFile>(
         move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
             // println!("verify and write {}", hex::encode(&digest));
             chunk.verify_unencrypted(size as usize, &digest)?;
-            match &backend {
-                DatastoreBackend::Filesystem => {
-                    target2.insert_chunk(&chunk, &digest)?;
-                }
-                DatastoreBackend::S3(s3_client) => {
-                    if target2.cache_contains(&digest) {
-                        return Ok(());
-                    }
-                    target2.cache_insert(&digest, &chunk)?;
-                    let data = chunk.raw_data().to_vec();
-                    let upload_data = hyper::body::Bytes::from(data);
-                    let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
-                    let _is_duplicate = proxmox_async::runtime::block_on(
-                        s3_client.upload_no_replace_with_retry(object_key, upload_data),
-                    )
-                    .context("failed to upload chunk to s3 backend")?;
-                }
-            }
+            target2.insert_chunk(&chunk, &digest, &backend)?;
             Ok(())
         },
     );
-- 
2.47.3





More information about the pbs-devel mailing list