[pbs-devel] [PATCH proxmox-backup v8 30/45] datastore: add local datastore cache for network attached storages

Christian Ebner c.ebner at proxmox.com
Fri Jul 18 16:59:23 CEST 2025


On 7/18/25 1:24 PM, Lukas Wagner wrote:
> Some rustdoc comments are missing, but otherwise looks fine to me.
> 
> As a general remark, applying to this patch but also in general: I think we should put a much larger
> focus on writing unit and integration tests for any significant chunk of new code, e.g.
> the LocalDatastoreLruCache, and also slowly refactor existing code so that it can be tested.
> 
> Naturally, it is additional effort, but IMO it pays off well later. I'd also say that it makes
> reviews much easier, since the tests are living proof in the code that it works, and as a reviewer
> I also immediately see how the code is supposed to be used. Furthermore, they are a good way
> to detect regressions later on, e.g. due to changing third-party dependencies, and of course
> also due to changes in the product code itself.
> 
> That being said, I won't ask you to write tests for this patch now, since adding them after
> the fact is a big pain and might require a big refactor, e.g. to separate out and abstract away
> any dependencies on existing code. I just felt the urge to bring this up, since this
> is something we can definitely improve on.

Agreed, noted this on my todo list. For the time being, I added the
missing docstrings as requested.

> On  2025-07-15 14:53, Christian Ebner wrote:
>> Use a local datastore as a cache with an LRU replacement policy for
>> operations on a datastore backed by a network, e.g. by an S3 object
>> store backend. The goal is to reduce the number of requests to the
>> backend and thereby save costs (monetary as well as time).
>>
>> Cached chunks are stored in the local datastore cache, which already
>> contains the datastore's content metadata (namespace, group,
>> snapshot, owner, index files, etc.), used to perform fast lookups.
>> The cache itself only stores chunk digests, not the raw data. When
>> payload data is required, contents are looked up and read from the
>> local datastore cache filesystem, with a fallback to fetching from
>> the backend if the presumably cached entry is not found.
>>
>> The cacher allows fetching cache items on cache misses via the
>> access method.
>>
>> The capacity of the cache is derived from the local datastore cache
>> filesystem, or the user-configured value, whichever is smaller.
>> The capacity is only set on instantiation of the store, and the
>> current value is kept as long as the datastore remains cached in the
>> datastore cache. To change the value, the store either has to be set
>> to offline mode and back, or the services have to be restarted.
>>
>> Basic performance tests:
>>
>> Backup and upload of the contents of the linux git repository to AWS
>> S3, snapshots removed between backup runs to avoid other chunk reuse
>> optimizations of PBS.
>>
>> no-cache:
>>      had to backup 5.069 GiB of 5.069 GiB (compressed 3.718 GiB) in 50.76 s (average 102.258 MiB/s)
>> empty-cache:
>>      had to backup 5.069 GiB of 5.069 GiB (compressed 3.718 GiB) in 50.42 s (average 102.945 MiB/s)
>> all-cached:
>>      had to backup 5.069 GiB of 5.069 GiB (compressed 3.718 GiB) in 43.78 s (average 118.554 MiB/s)
>>
>> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>> ---
>> changes since version 7:
>> - use info instead of warn, as these might end up in the task logs as
>>    well, possibly causing confusion at warning level
>>
>>   pbs-datastore/src/datastore.rs                |  70 ++++++-
>>   pbs-datastore/src/lib.rs                      |   3 +
>>   .../src/local_datastore_lru_cache.rs          | 172 ++++++++++++++++++
>>   3 files changed, 244 insertions(+), 1 deletion(-)
>>   create mode 100644 pbs-datastore/src/local_datastore_lru_cache.rs
>>
>> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
>> index 89f45e7f8..cab0f5b4d 100644
>> --- a/pbs-datastore/src/datastore.rs
>> +++ b/pbs-datastore/src/datastore.rs
>> @@ -40,9 +40,10 @@ use crate::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
>>   use crate::fixed_index::{FixedIndexReader, FixedIndexWriter};
>>   use crate::hierarchy::{ListGroups, ListGroupsType, ListNamespaces, ListNamespacesRecursive};
>>   use crate::index::IndexFile;
>> +use crate::local_datastore_lru_cache::S3Cacher;
>>   use crate::s3::S3_CONTENT_PREFIX;
>>   use crate::task_tracking::{self, update_active_operations};
>> -use crate::DataBlob;
>> +use crate::{DataBlob, LocalDatastoreLruCache};
>>   
>>   static DATASTORE_MAP: LazyLock<Mutex<HashMap<String, Arc<DataStoreImpl>>>> =
>>       LazyLock::new(|| Mutex::new(HashMap::new()));
>> @@ -136,6 +137,7 @@ pub struct DataStoreImpl {
>>       last_digest: Option<[u8; 32]>,
>>       sync_level: DatastoreFSyncLevel,
>>       backend_config: DatastoreBackendConfig,
>> +    lru_store_caching: Option<LocalDatastoreLruCache>,
>>   }
>>   
>>   impl DataStoreImpl {
>> @@ -151,6 +153,7 @@ impl DataStoreImpl {
>>               last_digest: None,
>>               sync_level: Default::default(),
>>               backend_config: Default::default(),
>> +            lru_store_caching: None,
>>           })
>>       }
>>   }
>> @@ -255,6 +258,37 @@ impl DataStore {
>>           Ok(backend_type)
>>       }
>>   
>> +    pub fn cache(&self) -> Option<&LocalDatastoreLruCache> {
>> +        self.inner.lru_store_caching.as_ref()
>> +    }
>> +
>> +    /// Check if the digest is present in the local datastore cache.
>> +    /// Always returns false if there is no cache configured for this datastore.
>> +    pub fn cache_contains(&self, digest: &[u8; 32]) -> bool {
>> +        if let Some(cache) = self.inner.lru_store_caching.as_ref() {
>> +            return cache.contains(digest);
>> +        }
>> +        false
>> +    }
>> +
>> +    /// Insert digest as most recently used one in the cache.
>> +    /// Returns with success if there is no cache configured for this datastore.
>> +    pub fn cache_insert(&self, digest: &[u8; 32], chunk: &DataBlob) -> Result<(), Error> {
>> +        if let Some(cache) = self.inner.lru_store_caching.as_ref() {
>> +            return cache.insert(digest, chunk);
>> +        }
>> +        Ok(())
>> +    }
>> +
> 
> Missing rustdoc comment for this pub fn
> 
>> +    pub fn cacher(&self) -> Result<Option<S3Cacher>, Error> {
>> +        self.backend().map(|backend| match backend {
>> +            DatastoreBackend::S3(s3_client) => {
>> +                Some(S3Cacher::new(s3_client, self.inner.chunk_store.clone()))
>> +            }
>> +            DatastoreBackend::Filesystem => None,
>> +        })
>> +    }
>> +
>>       pub fn lookup_datastore(
>>           name: &str,
>>           operation: Option<Operation>,
>> @@ -437,6 +471,33 @@ impl DataStore {
>>                   .parse_property_string(config.backend.as_deref().unwrap_or(""))?,
>>           )?;
>>   
>> +        let lru_store_caching = if DatastoreBackendType::S3 == backend_config.ty.unwrap_or_default()
>> +        {
>> +            let mut cache_capacity = 0;
>> +            if let Ok(fs_info) = proxmox_sys::fs::fs_info(&chunk_store.base_path()) {
>> +                cache_capacity = fs_info.available / (16 * 1024 * 1024);
>> +            }
>> +            if let Some(max_cache_size) = backend_config.max_cache_size {
>> +                info!(
>> +                    "Got requested max cache size {max_cache_size} for store {}",
>> +                    config.name
>> +                );
>> +                let max_cache_capacity = max_cache_size.as_u64() / (16 * 1024 * 1024);
>> +                cache_capacity = cache_capacity.min(max_cache_capacity);
>> +            }
>> +            let cache_capacity = usize::try_from(cache_capacity).unwrap_or_default();
>> +
>> +            info!(
>> +                "Using datastore cache with capacity {cache_capacity} for store {}",
>> +                config.name
>> +            );
>> +
>> +            let cache = LocalDatastoreLruCache::new(cache_capacity, chunk_store.clone());
>> +            Some(cache)
>> +        } else {
>> +            None
>> +        };
>> +
>>           Ok(DataStoreImpl {
>>               chunk_store,
>>               gc_mutex: Mutex::new(()),
>> @@ -446,6 +507,7 @@ impl DataStore {
>>               last_digest,
>>               sync_level: tuning.sync_level.unwrap_or_default(),
>>               backend_config,
>> +            lru_store_caching,
>>           })
>>       }
>>   
>> @@ -1580,6 +1642,12 @@ impl DataStore {
>>                           chunk_count += 1;
>>   
>>                           if atime < min_atime {
>> +                            if let Some(cache) = self.cache() {
>> +                                let mut digest_bytes = [0u8; 32];
>> +                                hex::decode_to_slice(digest.as_bytes(), &mut digest_bytes)?;
>> +                                // ignore errors, phase 3 will retry cleanup anyways
>> +                                let _ = cache.remove(&digest_bytes);
>> +                            }
>>                               delete_list.push(content.key);
>>                               if bad {
>>                                   gc_status.removed_bad += 1;
>> diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
>> index ca6fdb7d8..b9eb035c2 100644
>> --- a/pbs-datastore/src/lib.rs
>> +++ b/pbs-datastore/src/lib.rs
>> @@ -217,3 +217,6 @@ pub use snapshot_reader::SnapshotReader;
>>   
>>   mod local_chunk_reader;
>>   pub use local_chunk_reader::LocalChunkReader;
>> +
>> +mod local_datastore_lru_cache;
>> +pub use local_datastore_lru_cache::LocalDatastoreLruCache;
>> diff --git a/pbs-datastore/src/local_datastore_lru_cache.rs b/pbs-datastore/src/local_datastore_lru_cache.rs
>> new file mode 100644
>> index 000000000..bb64c52f3
>> --- /dev/null
>> +++ b/pbs-datastore/src/local_datastore_lru_cache.rs
>> @@ -0,0 +1,172 @@
>> +//! Use a local datastore as cache for operations on a datastore attached via
>> +//! a network layer (e.g. via the S3 backend).
>> +
>> +use std::future::Future;
>> +use std::sync::Arc;
>> +
>> +use anyhow::{bail, Error};
>> +use http_body_util::BodyExt;
>> +
>> +use pbs_tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
>> +use proxmox_s3_client::S3Client;
>> +
>> +use crate::ChunkStore;
>> +use crate::DataBlob;
>> +
> 
> v missing rustdoc for pub struct
> 
>> +#[derive(Clone)]
>> +pub struct S3Cacher {
>> +    client: Arc<S3Client>,
>> +    store: Arc<ChunkStore>,
>> +}
>> +
>> +impl AsyncCacher<[u8; 32], ()> for S3Cacher {
>> +    fn fetch(
>> +        &self,
>> +        key: [u8; 32],
>> +    ) -> Box<dyn Future<Output = Result<Option<()>, Error>> + Send + 'static> {
>> +        let client = self.client.clone();
>> +        let store = self.store.clone();
> 
> rather use Arc::clone(&...) here to avoid ambiguity
> 
>> +        Box::new(async move {
>> +            let object_key = crate::s3::object_key_from_digest(&key)?;
>> +            match client.get_object(object_key).await? {
>> +                None => bail!("could not fetch object with key {}", hex::encode(key)),
>> +                Some(response) => {
>> +                    let bytes = response.content.collect().await?.to_bytes();
>> +                    let chunk = DataBlob::from_raw(bytes.to_vec())?;
>> +                    store.insert_chunk(&chunk, &key)?;
>> +                    Ok(Some(()))
>> +                }
>> +            }
>> +        })
>> +    }
>> +}
>> +
>> +impl S3Cacher {
> 
> v missing rustdoc for pub fn
> 
>> +    pub fn new(client: Arc<S3Client>, store: Arc<ChunkStore>) -> Self {
>> +        Self { client, store }
>> +    }
>> +}
>> +
>> +/// LRU cache using local datastore for caching chunks
>> +///
>> +/// Uses a LRU cache, but without storing the values in-memory but rather
>> +/// on the filesystem
>> +pub struct LocalDatastoreLruCache {
>> +    cache: AsyncLruCache<[u8; 32], ()>,
>> +    store: Arc<ChunkStore>,
>> +}
>> +
>> +impl LocalDatastoreLruCache {
>> +    pub fn new(capacity: usize, store: Arc<ChunkStore>) -> Self {
>> +        Self {
>> +            cache: AsyncLruCache::new(capacity),
>> +            store,
>> +        }
>> +    }
>> +
>> +    /// Insert a new chunk into the local datastore cache.
>> +    ///
>> +    /// Fails if the chunk cannot be inserted successfully.
>> +    pub fn insert(&self, digest: &[u8; 32], chunk: &DataBlob) -> Result<(), Error> {
>> +        self.store.insert_chunk(chunk, digest)?;
>> +        self.cache.insert(*digest, (), |digest| {
>> +            let (path, _digest_str) = self.store.chunk_path(&digest);
>> +            // Truncate to free up space but keep the inode around, since that
>> +            // is used as marker for chunks in use by garbage collection.
>> +            if let Err(err) = nix::unistd::truncate(&path, 0) {
>> +                if err != nix::errno::Errno::ENOENT {
>> +                    return Err(Error::from(err));
>> +                }
>> +            }
>> +            Ok(())
>> +        })
>> +    }
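The eviction callback above can be illustrated in isolation: truncating the evicted chunk file frees its payload while keeping the inode around as a marker for garbage collection. A hedged, standalone sketch (using std's `set_len` in place of `nix::unistd::truncate`; names are illustrative):

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

/// Evict a cached chunk file: drop the payload but keep the inode as a
/// zero-length marker, mirroring the truncate-on-evict strategy above.
/// Returns the file length after eviction.
fn evict_to_marker(path: &Path) -> std::io::Result<u64> {
    match fs::OpenOptions::new().write(true).open(path) {
        Ok(file) => file.set_len(0)?,
        // A missing file is fine: nothing left to truncate.
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(0),
        Err(err) => return Err(err),
    }
    Ok(fs::metadata(path)?.len())
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("chunk-marker-demo");
    fs::File::create(&path)?.write_all(b"chunk payload")?;
    assert_eq!(evict_to_marker(&path)?, 0);
    assert!(path.exists()); // inode survives as a GC marker
    fs::remove_file(&path)?;
    Ok(())
}
```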
>> +
>> +    /// Remove a chunk from the local datastore cache.
>> +    ///
>> +    /// Fails if the chunk cannot be deleted successfully.
>> +    pub fn remove(&self, digest: &[u8; 32]) -> Result<(), Error> {
>> +        self.cache.remove(*digest);
>> +        let (path, _digest_str) = self.store.chunk_path(digest);
>> +        std::fs::remove_file(path).map_err(Error::from)
>> +    }
>> +
> 
> v missing rustdoc
> 
>> +    pub async fn access(
>> +        &self,
>> +        digest: &[u8; 32],
>> +        cacher: &mut S3Cacher,
>> +    ) -> Result<Option<DataBlob>, Error> {
>> +        if self
>> +            .cache
>> +            .access(*digest, cacher, |digest| {
>> +                let (path, _digest_str) = self.store.chunk_path(&digest);
>> +                // Truncate to free up space but keep the inode around, since that
>> +                // is used as marker for chunks in use by garbage collection.
>> +                if let Err(err) = nix::unistd::truncate(&path, 0) {
>> +                    if err != nix::errno::Errno::ENOENT {
>> +                        return Err(Error::from(err));
>> +                    }
>> +                }
>> +                Ok(())
>> +            })
>> +            .await?
>> +            .is_some()
>> +        {
>> +            let (path, _digest_str) = self.store.chunk_path(digest);
>> +            let mut file = match std::fs::File::open(&path) {
>> +                Ok(file) => file,
>> +                Err(err) => {
>> +                    // Expected chunk to be present since LRU cache has it, but it is missing
>> +                    // locally, try to fetch again
>> +                    if err.kind() == std::io::ErrorKind::NotFound {
>> +                        let object_key = crate::s3::object_key_from_digest(digest)?;
>> +                        match cacher.client.get_object(object_key).await? {
>> +                            None => {
>> +                                bail!("could not fetch object with key {}", hex::encode(digest))
>> +                            }
>> +                            Some(response) => {
>> +                                let bytes = response.content.collect().await?.to_bytes();
>> +                                let chunk = DataBlob::from_raw(bytes.to_vec())?;
>> +                                self.store.insert_chunk(&chunk, digest)?;
>> +                                std::fs::File::open(&path)?
>> +                            }
>> +                        }
>> +                    } else {
>> +                        return Err(Error::from(err));
>> +                    }
>> +                }
>> +            };
>> +            let chunk = match DataBlob::load_from_reader(&mut file) {
>> +                Ok(chunk) => chunk,
>> +                Err(err) => {
>> +                    use std::io::Seek;
>> +                    // Check if file is empty marker file, try fetching content if so
>> +                    if file.seek(std::io::SeekFrom::End(0))? == 0 {
>> +                        let object_key = crate::s3::object_key_from_digest(digest)?;
>> +                        match cacher.client.get_object(object_key).await? {
>> +                            None => {
>> +                                bail!("could not fetch object with key {}", hex::encode(digest))
>> +                            }
>> +                            Some(response) => {
>> +                                let bytes = response.content.collect().await?.to_bytes();
>> +                                let chunk = DataBlob::from_raw(bytes.to_vec())?;
>> +                                self.store.insert_chunk(&chunk, digest)?;
>> +                                let mut file = std::fs::File::open(&path)?;
>> +                                DataBlob::load_from_reader(&mut file)?
>> +                            }
>> +                        }
>> +                    } else {
>> +                        return Err(err);
>> +                    }
>> +                }
>> +            };
>> +            Ok(Some(chunk))
>> +        } else {
>> +            Ok(None)
>> +        }
>> +    }
>> +
> 
> v missing rustdoc
> 
>> +    pub fn contains(&self, digest: &[u8; 32]) -> bool {
>> +        self.cache.contains(*digest)
>> +    }
>> +}
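Stripped of the S3 specifics, the fallback logic of the access method above is fetch-on-miss with repopulation. A standalone sketch (the closure stands in for the S3 client; names are illustrative, not the patch's API):

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Read a cached chunk file; if it is missing or was truncated to an
/// empty marker, re-fetch the payload (here via a plain closure) and
/// repopulate the cache file before returning it.
fn access(path: &Path, fetch: impl Fn() -> Vec<u8>) -> io::Result<Vec<u8>> {
    match fs::read(path) {
        // Cache hit: payload is present locally.
        Ok(data) if !data.is_empty() => Ok(data),
        // Empty marker file: the chunk was evicted, fetch again.
        Ok(_) => {
            let data = fetch();
            fs::write(path, &data)?;
            Ok(data)
        }
        // Not present locally: fetch and repopulate as well.
        Err(err) if err.kind() == io::ErrorKind::NotFound => {
            let data = fetch();
            fs::write(path, &data)?;
            Ok(data)
        }
        Err(err) => Err(err),
    }
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("chunk-access-demo");
    let _ = fs::remove_file(&path);
    // First access misses and repopulates; the second hits locally.
    assert_eq!(access(&path, || b"payload".to_vec())?, b"payload");
    assert_eq!(access(&path, || b"other".to_vec())?, b"payload");
    fs::remove_file(&path)?;
    Ok(())
}
```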
>