[pbs-devel] [PATCH proxmox-backup v8 30/45] datastore: add local datastore cache for network attached storages

Lukas Wagner l.wagner at proxmox.com
Fri Jul 18 13:24:08 CEST 2025


Some rustdoc comments are missing, but otherwise looks fine to me.

As a general remark, applying to this patch but also beyond it: I think we should put a much larger
focus on writing unit and integration tests for any significant chunk of new code, e.g.
the LocalDatastoreLruCache, and also slowly refactor existing code so that it can be tested.

Naturally, it is additional effort, but IMO it pays off well later. I'd also say that it makes
reviews much easier, since the tests are living proof in the code that it works, and as a reviewer
I also immediately see how the code is supposed to be used. Furthermore, tests are a good way
to detect regressions later on, e.g. due to changing third-party dependencies, and of course
also due to changes in the product code itself.

That being said, I won't ask you to write tests for this patch now, since adding them after
the fact is a big pain and might require a big refactor, e.g. to separate out and abstract away
any dependencies on existing code. I just felt the urge to bring this up, since this
is something we can definitely improve on.

On 2025-07-15 14:53, Christian Ebner wrote:
> Use a local datastore as cache using LRU cache replacement policy for
> operations on a datastore backed by a network, e.g. by an S3 object
> store backend. The goal is to reduce the number of requests to the
> backend and thereby save costs (monetary as well as time).
> 
> Cached chunks are stored on the local datastore cache, already
> containing the datastore's contents metadata (namespace, group,
> snapshot, owner, index files, etc.), used to perform fast lookups.
> The cache itself only stores chunk digests, not the raw data itself.
> When payload data is required, contents are looked up and read from
> the local datastore cache filesystem, including fallback to fetch from
> the backend if the presumably cached entry is not found.
> 
> The cacher allows fetching cache items on cache misses via the access
> method.
> 
> The capacity of the cache is derived from the local datastore cache
> filesystem, or the user-configured value, whichever is smaller.
> The capacity is only set on instantiation of the store, and the current
> value kept as long as the datastore remains cached in the datastore
> cache. To change the value, the store either has to be set to offline
> mode and back, or the services restarted.
> 
> Basic performance tests:
> 
> Backup and upload of contents of linux git repository to AWS S3,
> snapshots removed in-between each backup run to avoid other chunk reuse
> optimization of PBS.
> 
> no-cache:
>     had to backup 5.069 GiB of 5.069 GiB (compressed 3.718 GiB) in 50.76 s (average 102.258 MiB/s)
> empty-cache:
>     had to backup 5.069 GiB of 5.069 GiB (compressed 3.718 GiB) in 50.42 s (average 102.945 MiB/s)
> all-cached:
>     had to backup 5.069 GiB of 5.069 GiB (compressed 3.718 GiB) in 43.78 s (average 118.554 MiB/s)
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - use info instead of warn, as these might end up in the task logs as
>   well, possibly causing confusion if warning level
> 
>  pbs-datastore/src/datastore.rs                |  70 ++++++-
>  pbs-datastore/src/lib.rs                      |   3 +
>  .../src/local_datastore_lru_cache.rs          | 172 ++++++++++++++++++
>  3 files changed, 244 insertions(+), 1 deletion(-)
>  create mode 100644 pbs-datastore/src/local_datastore_lru_cache.rs
> 
> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
> index 89f45e7f8..cab0f5b4d 100644
> --- a/pbs-datastore/src/datastore.rs
> +++ b/pbs-datastore/src/datastore.rs
> @@ -40,9 +40,10 @@ use crate::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
>  use crate::fixed_index::{FixedIndexReader, FixedIndexWriter};
>  use crate::hierarchy::{ListGroups, ListGroupsType, ListNamespaces, ListNamespacesRecursive};
>  use crate::index::IndexFile;
> +use crate::local_datastore_lru_cache::S3Cacher;
>  use crate::s3::S3_CONTENT_PREFIX;
>  use crate::task_tracking::{self, update_active_operations};
> -use crate::DataBlob;
> +use crate::{DataBlob, LocalDatastoreLruCache};
>  
>  static DATASTORE_MAP: LazyLock<Mutex<HashMap<String, Arc<DataStoreImpl>>>> =
>      LazyLock::new(|| Mutex::new(HashMap::new()));
> @@ -136,6 +137,7 @@ pub struct DataStoreImpl {
>      last_digest: Option<[u8; 32]>,
>      sync_level: DatastoreFSyncLevel,
>      backend_config: DatastoreBackendConfig,
> +    lru_store_caching: Option<LocalDatastoreLruCache>,
>  }
>  
>  impl DataStoreImpl {
> @@ -151,6 +153,7 @@ impl DataStoreImpl {
>              last_digest: None,
>              sync_level: Default::default(),
>              backend_config: Default::default(),
> +            lru_store_caching: None,
>          })
>      }
>  }
> @@ -255,6 +258,37 @@ impl DataStore {
>          Ok(backend_type)
>      }
>  
> +    pub fn cache(&self) -> Option<&LocalDatastoreLruCache> {
> +        self.inner.lru_store_caching.as_ref()
> +    }
> +
> +    /// Check if the digest is present in the local datastore cache.
> +    /// Always returns false if there is no cache configured for this datastore.
> +    pub fn cache_contains(&self, digest: &[u8; 32]) -> bool {
> +        if let Some(cache) = self.inner.lru_store_caching.as_ref() {
> +            return cache.contains(digest);
> +        }
> +        false
> +    }
> +
> +    /// Insert digest as most recently used on in the cache.
> +    /// Returns with success if there is no cache configured for this datastore.
> +    pub fn cache_insert(&self, digest: &[u8; 32], chunk: &DataBlob) -> Result<(), Error> {
> +        if let Some(cache) = self.inner.lru_store_caching.as_ref() {
> +            return cache.insert(digest, chunk);
> +        }
> +        Ok(())
> +    }
> +

Missing rustdoc comment for this pub fn
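A one-liner would already help; e.g. something along these lines (wording is only a suggestion):

```rust
/// Returns an `S3Cacher` for fetching chunks from the S3 backend,
/// or `None` if the datastore is backed by the local filesystem.
pub fn cacher(&self) -> Result<Option<S3Cacher>, Error> {
```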

> +    pub fn cacher(&self) -> Result<Option<S3Cacher>, Error> {
> +        self.backend().map(|backend| match backend {
> +            DatastoreBackend::S3(s3_client) => {
> +                Some(S3Cacher::new(s3_client, self.inner.chunk_store.clone()))
> +            }
> +            DatastoreBackend::Filesystem => None,
> +        })
> +    }
> +
>      pub fn lookup_datastore(
>          name: &str,
>          operation: Option<Operation>,
> @@ -437,6 +471,33 @@ impl DataStore {
>                  .parse_property_string(config.backend.as_deref().unwrap_or(""))?,
>          )?;
>  
> +        let lru_store_caching = if DatastoreBackendType::S3 == backend_config.ty.unwrap_or_default()
> +        {
> +            let mut cache_capacity = 0;
> +            if let Ok(fs_info) = proxmox_sys::fs::fs_info(&chunk_store.base_path()) {
> +                cache_capacity = fs_info.available / (16 * 1024 * 1024);
> +            }
> +            if let Some(max_cache_size) = backend_config.max_cache_size {
> +                info!(
> +                    "Got requested max cache size {max_cache_size} for store {}",
> +                    config.name
> +                );
> +                let max_cache_capacity = max_cache_size.as_u64() / (16 * 1024 * 1024);
> +                cache_capacity = cache_capacity.min(max_cache_capacity);
> +            }
> +            let cache_capacity = usize::try_from(cache_capacity).unwrap_or_default();
> +
> +            info!(
> +                "Using datastore cache with capacity {cache_capacity} for store {}",
> +                config.name
> +            );
> +
> +            let cache = LocalDatastoreLruCache::new(cache_capacity, chunk_store.clone());
> +            Some(cache)
> +        } else {
> +            None
> +        };
> +
>          Ok(DataStoreImpl {
>              chunk_store,
>              gc_mutex: Mutex::new(()),
> @@ -446,6 +507,7 @@ impl DataStore {
>              last_digest,
>              sync_level: tuning.sync_level.unwrap_or_default(),
>              backend_config,
> +            lru_store_caching,
>          })
>      }
>  
> @@ -1580,6 +1642,12 @@ impl DataStore {
>                          chunk_count += 1;
>  
>                          if atime < min_atime {
> +                            if let Some(cache) = self.cache() {
> +                                let mut digest_bytes = [0u8; 32];
> +                                hex::decode_to_slice(digest.as_bytes(), &mut digest_bytes)?;
> +                                // ignore errors, phase 3 will retry cleanup anyways
> +                                let _ = cache.remove(&digest_bytes);
> +                            }
>                              delete_list.push(content.key);
>                              if bad {
>                                  gc_status.removed_bad += 1;
> diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
> index ca6fdb7d8..b9eb035c2 100644
> --- a/pbs-datastore/src/lib.rs
> +++ b/pbs-datastore/src/lib.rs
> @@ -217,3 +217,6 @@ pub use snapshot_reader::SnapshotReader;
>  
>  mod local_chunk_reader;
>  pub use local_chunk_reader::LocalChunkReader;
> +
> +mod local_datastore_lru_cache;
> +pub use local_datastore_lru_cache::LocalDatastoreLruCache;
> diff --git a/pbs-datastore/src/local_datastore_lru_cache.rs b/pbs-datastore/src/local_datastore_lru_cache.rs
> new file mode 100644
> index 000000000..bb64c52f3
> --- /dev/null
> +++ b/pbs-datastore/src/local_datastore_lru_cache.rs
> @@ -0,0 +1,172 @@
> +//! Use a local datastore as cache for operations on a datastore attached via
> +//! a network layer (e.g. via the S3 backend).
> +
> +use std::future::Future;
> +use std::sync::Arc;
> +
> +use anyhow::{bail, Error};
> +use http_body_util::BodyExt;
> +
> +use pbs_tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
> +use proxmox_s3_client::S3Client;
> +
> +use crate::ChunkStore;
> +use crate::DataBlob;
> +

v missing rustdoc for pub struct
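E.g. something like this (wording is only a suggestion):

```rust
/// Cacher fetching chunks from the S3 object store backend and
/// inserting them into the local chunk store on a cache miss.
#[derive(Clone)]
pub struct S3Cacher {
```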

> +#[derive(Clone)]
> +pub struct S3Cacher {
> +    client: Arc<S3Client>,
> +    store: Arc<ChunkStore>,
> +}
> +
> +impl AsyncCacher<[u8; 32], ()> for S3Cacher {
> +    fn fetch(
> +        &self,
> +        key: [u8; 32],
> +    ) -> Box<dyn Future<Output = Result<Option<()>, Error>> + Send + 'static> {
> +        let client = self.client.clone();
> +        let store = self.store.clone();

Rather use `Arc::clone(&...)` here to avoid ambiguity: the fully-qualified form makes clear at the call site that only the `Arc` handle is cloned, not the underlying value.

> +        Box::new(async move {
> +            let object_key = crate::s3::object_key_from_digest(&key)?;
> +            match client.get_object(object_key).await? {
> +                None => bail!("could not fetch object with key {}", hex::encode(key)),
> +                Some(response) => {
> +                    let bytes = response.content.collect().await?.to_bytes();
> +                    let chunk = DataBlob::from_raw(bytes.to_vec())?;
> +                    store.insert_chunk(&chunk, &key)?;
> +                    Ok(Some(()))
> +                }
> +            }
> +        })
> +    }
> +}
> +
> +impl S3Cacher {

v missing rustdoc for pub fn

> +    pub fn new(client: Arc<S3Client>, store: Arc<ChunkStore>) -> Self {
> +        Self { client, store }
> +    }
> +}
> +
> +/// LRU cache using local datastore for caching chunks
> +///
> +/// Uses a LRU cache, but without storing the values in-memory but rather
> +/// on the filesystem
> +pub struct LocalDatastoreLruCache {
> +    cache: AsyncLruCache<[u8; 32], ()>,
> +    store: Arc<ChunkStore>,
> +}
> +
> +impl LocalDatastoreLruCache {
> +    pub fn new(capacity: usize, store: Arc<ChunkStore>) -> Self {
> +        Self {
> +            cache: AsyncLruCache::new(capacity),
> +            store,
> +        }
> +    }
> +
> +    /// Insert a new chunk into the local datastore cache.
> +    ///
> +    /// Fails if the chunk cannot be inserted successfully.
> +    pub fn insert(&self, digest: &[u8; 32], chunk: &DataBlob) -> Result<(), Error> {
> +        self.store.insert_chunk(chunk, digest)?;
> +        self.cache.insert(*digest, (), |digest| {
> +            let (path, _digest_str) = self.store.chunk_path(&digest);
> +            // Truncate to free up space but keep the inode around, since that
> +            // is used as marker for chunks in use by garbage collection.
> +            if let Err(err) = nix::unistd::truncate(&path, 0) {
> +                if err != nix::errno::Errno::ENOENT {
> +                    return Err(Error::from(err));
> +                }
> +            }
> +            Ok(())
> +        })
> +    }
> +
> +    /// Remove a chunk from the local datastore cache.
> +    ///
> +    /// Fails if the chunk cannot be deleted successfully.
> +    pub fn remove(&self, digest: &[u8; 32]) -> Result<(), Error> {
> +        self.cache.remove(*digest);
> +        let (path, _digest_str) = self.store.chunk_path(digest);
> +        std::fs::remove_file(path).map_err(Error::from)
> +    }
> +

v missing rustdoc
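Especially for this method, which has the most involved semantics, a short rustdoc would help; e.g. (wording is only a suggestion):

```rust
/// Access the chunk referenced by the given digest, marking it as most
/// recently used in the cache. On a cache miss, fetch the chunk from
/// the backend via the cacher and insert it into the local chunk store.
pub async fn access(
```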

> +    pub async fn access(
> +        &self,
> +        digest: &[u8; 32],
> +        cacher: &mut S3Cacher,
> +    ) -> Result<Option<DataBlob>, Error> {
> +        if self
> +            .cache
> +            .access(*digest, cacher, |digest| {
> +                let (path, _digest_str) = self.store.chunk_path(&digest);
> +                // Truncate to free up space but keep the inode around, since that
> +                // is used as marker for chunks in use by garbage collection.
> +                if let Err(err) = nix::unistd::truncate(&path, 0) {
> +                    if err != nix::errno::Errno::ENOENT {
> +                        return Err(Error::from(err));
> +                    }
> +                }
> +                Ok(())
> +            })
> +            .await?
> +            .is_some()
> +        {
> +            let (path, _digest_str) = self.store.chunk_path(digest);
> +            let mut file = match std::fs::File::open(&path) {
> +                Ok(file) => file,
> +                Err(err) => {
> +                    // Expected chunk to be present since LRU cache has it, but it is missing
> +                    // locally, try to fetch again
> +                    if err.kind() == std::io::ErrorKind::NotFound {
> +                        let object_key = crate::s3::object_key_from_digest(digest)?;
> +                        match cacher.client.get_object(object_key).await? {
> +                            None => {
> +                                bail!("could not fetch object with key {}", hex::encode(digest))
> +                            }
> +                            Some(response) => {
> +                                let bytes = response.content.collect().await?.to_bytes();
> +                                let chunk = DataBlob::from_raw(bytes.to_vec())?;
> +                                self.store.insert_chunk(&chunk, digest)?;
> +                                std::fs::File::open(&path)?
> +                            }
> +                        }
> +                    } else {
> +                        return Err(Error::from(err));
> +                    }
> +                }
> +            };
> +            let chunk = match DataBlob::load_from_reader(&mut file) {
> +                Ok(chunk) => chunk,
> +                Err(err) => {
> +                    use std::io::Seek;
> +                    // Check if file is empty marker file, try fetching content if so
> +                    if file.seek(std::io::SeekFrom::End(0))? == 0 {
> +                        let object_key = crate::s3::object_key_from_digest(digest)?;
> +                        match cacher.client.get_object(object_key).await? {
> +                            None => {
> +                                bail!("could not fetch object with key {}", hex::encode(digest))
> +                            }
> +                            Some(response) => {
> +                                let bytes = response.content.collect().await?.to_bytes();
> +                                let chunk = DataBlob::from_raw(bytes.to_vec())?;
> +                                self.store.insert_chunk(&chunk, digest)?;
> +                                let mut file = std::fs::File::open(&path)?;
> +                                DataBlob::load_from_reader(&mut file)?
> +                            }
> +                        }
> +                    } else {
> +                        return Err(err);
> +                    }
> +                }
> +            };
> +            Ok(Some(chunk))
> +        } else {
> +            Ok(None)
> +        }
> +    }
> +

v missing rustdoc

> +    pub fn contains(&self, digest: &[u8; 32]) -> bool {
> +        self.cache.contains(*digest)
> +    }
> +}

-- 
- Lukas




