[pbs-devel] [PATCH proxmox-backup v8 30/45] datastore: add local datastore cache for network attached storages
Christian Ebner
c.ebner at proxmox.com
Fri Jul 18 16:59:23 CEST 2025
On 7/18/25 1:24 PM, Lukas Wagner wrote:
> Some rustdoc comments are missing, but otherwise looks fine to me.
>
> As a general remark, applying to this patch but also in general: I think we should put a much larger
> focus on writing unit and integration tests for any significant chunk of new code, e.g.
> like the LocalDatastoreLruCache, and also slowly refactor existing code so that it can be tested.
>
> Naturally, it is additional effort, but IMO it pays off well later. I'd also say that it makes
> reviews much easier, since the tests are living proof in the code that it works, and as a reviewer
> I also immediately see how the code is supposed to be used. Furthermore, they are a good way
> to detect regressions later on, e.g. due to changes in third-party dependencies, and of course
> also changes in the product code itself.
>
> That being said, I won't ask you to write tests for this patch now, since adding them after
> the fact is a big pain and might require a big refactor, e.g. to separate out and abstract away
> any dependencies on existing code. I just felt the urge to bring this up, since this
> is something we can definitely improve on.
Agreed, noted this in my todo list. For the time being I added the
missing docstrings as requested, see the inline sketches below.
> On 2025-07-15 14:53, Christian Ebner wrote:
>> Use a local datastore as a cache with an LRU replacement policy for
>> operations on a datastore backed by a network, e.g. by an S3 object
>> store backend. The goal is to reduce the number of requests to the
>> backend and thereby save costs (monetary as well as time).
>>
>> Cached chunks are stored in the local datastore cache, which already
>> contains the datastore's contents metadata (namespace, group,
>> snapshot, owner, index files, etc.) used to perform fast lookups.
>> The cache itself only stores chunk digests, not the raw data.
>> When payload data is required, contents are looked up and read from
>> the local datastore cache filesystem, with a fallback to fetching from
>> the backend if the presumably cached entry is not found.
>>
>> The cacher allows fetching cache items on cache misses via the access
>> method.
>>
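For context, the intended call pattern on the read path looks roughly like
this (sketch only, assumes an async context returning Result; `datastore` is
a DataStore as extended by this patch):

    // Only datastores with an S3 backend get a cache and a cacher.
    if let (Some(cache), Ok(Some(mut cacher))) = (datastore.cache(), datastore.cacher()) {
        // Marks the digest as most recently used, reads the chunk from the
        // local cache filesystem and falls back to the S3 backend on a miss.
        if let Some(chunk) = cache.access(&digest, &mut cacher).await? {
            // use the DataBlob `chunk` ...
        }
    }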
>> The capacity of the cache is derived from the local datastore cache
>> filesystem, or from the user-configured value, whichever is smaller.
>> The capacity is only set on instantiation of the store, and the current
>> value is kept as long as the datastore remains cached in the datastore
>> cache. To change the value, the store either has to be set to offline
>> mode and back, or the services have to be restarted.
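Concretely, with the 16 MiB per-chunk estimate used further below, the
capacity calculation boils down to the following (sketch; `fs_available` and
`max_cache_size` stand in for the values queried in the actual code):

    // maximum number of chunks the filesystem's available space could hold
    let mut cache_capacity = fs_available / (16 * 1024 * 1024);
    // clamp to the user-configured maximum cache size, if set
    if let Some(max_cache_size) = max_cache_size {
        cache_capacity = cache_capacity.min(max_cache_size / (16 * 1024 * 1024));
    }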
>>
>> Basic performance tests:
>>
>> Backup and upload of the contents of the Linux git repository to AWS S3,
>> with snapshots removed between backup runs to avoid other chunk reuse
>> optimizations of PBS.
>>
>> no-cache:
>> had to backup 5.069 GiB of 5.069 GiB (compressed 3.718 GiB) in 50.76 s (average 102.258 MiB/s)
>> empty-cache:
>> had to backup 5.069 GiB of 5.069 GiB (compressed 3.718 GiB) in 50.42 s (average 102.945 MiB/s)
>> all-cached:
>> had to backup 5.069 GiB of 5.069 GiB (compressed 3.718 GiB) in 43.78 s (average 118.554 MiB/s)
>>
>> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>> ---
>> changes since version 7:
>> - use info instead of warn, as these might end up in the task logs as
>> well, possibly causing confusion if logged at warning level
>>
>> pbs-datastore/src/datastore.rs | 70 ++++++-
>> pbs-datastore/src/lib.rs | 3 +
>> .../src/local_datastore_lru_cache.rs | 172 ++++++++++++++++++
>> 3 files changed, 244 insertions(+), 1 deletion(-)
>> create mode 100644 pbs-datastore/src/local_datastore_lru_cache.rs
>>
>> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
>> index 89f45e7f8..cab0f5b4d 100644
>> --- a/pbs-datastore/src/datastore.rs
>> +++ b/pbs-datastore/src/datastore.rs
>> @@ -40,9 +40,10 @@ use crate::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
>> use crate::fixed_index::{FixedIndexReader, FixedIndexWriter};
>> use crate::hierarchy::{ListGroups, ListGroupsType, ListNamespaces, ListNamespacesRecursive};
>> use crate::index::IndexFile;
>> +use crate::local_datastore_lru_cache::S3Cacher;
>> use crate::s3::S3_CONTENT_PREFIX;
>> use crate::task_tracking::{self, update_active_operations};
>> -use crate::DataBlob;
>> +use crate::{DataBlob, LocalDatastoreLruCache};
>>
>> static DATASTORE_MAP: LazyLock<Mutex<HashMap<String, Arc<DataStoreImpl>>>> =
>> LazyLock::new(|| Mutex::new(HashMap::new()));
>> @@ -136,6 +137,7 @@ pub struct DataStoreImpl {
>> last_digest: Option<[u8; 32]>,
>> sync_level: DatastoreFSyncLevel,
>> backend_config: DatastoreBackendConfig,
>> + lru_store_caching: Option<LocalDatastoreLruCache>,
>> }
>>
>> impl DataStoreImpl {
>> @@ -151,6 +153,7 @@ impl DataStoreImpl {
>> last_digest: None,
>> sync_level: Default::default(),
>> backend_config: Default::default(),
>> + lru_store_caching: None,
>> })
>> }
>> }
>> @@ -255,6 +258,37 @@ impl DataStore {
>> Ok(backend_type)
>> }
>>
>> + pub fn cache(&self) -> Option<&LocalDatastoreLruCache> {
>> + self.inner.lru_store_caching.as_ref()
>> + }
>> +
>> + /// Check if the digest is present in the local datastore cache.
>> + /// Always returns false if there is no cache configured for this datastore.
>> + pub fn cache_contains(&self, digest: &[u8; 32]) -> bool {
>> + if let Some(cache) = self.inner.lru_store_caching.as_ref() {
>> + return cache.contains(digest);
>> + }
>> + false
>> + }
>> +
>> + /// Insert digest as most recently used in the cache.
>> + /// Returns with success if there is no cache configured for this datastore.
>> + pub fn cache_insert(&self, digest: &[u8; 32], chunk: &DataBlob) -> Result<(), Error> {
>> + if let Some(cache) = self.inner.lru_store_caching.as_ref() {
>> + return cache.insert(digest, chunk);
>> + }
>> + Ok(())
>> + }
>> +
>
> Missing rustdoc comment for this pub fn
>
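Added a doc comment here, along these lines (sketch):

    /// Returns an S3 cacher for the datastore if it is backed by an S3
    /// object store, `None` for datastores with filesystem backend.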
>> + pub fn cacher(&self) -> Result<Option<S3Cacher>, Error> {
>> + self.backend().map(|backend| match backend {
>> + DatastoreBackend::S3(s3_client) => {
>> + Some(S3Cacher::new(s3_client, self.inner.chunk_store.clone()))
>> + }
>> + DatastoreBackend::Filesystem => None,
>> + })
>> + }
>> +
>> pub fn lookup_datastore(
>> name: &str,
>> operation: Option<Operation>,
>> @@ -437,6 +471,33 @@ impl DataStore {
>> .parse_property_string(config.backend.as_deref().unwrap_or(""))?,
>> )?;
>>
>> + let lru_store_caching = if DatastoreBackendType::S3 == backend_config.ty.unwrap_or_default()
>> + {
>> + let mut cache_capacity = 0;
>> + if let Ok(fs_info) = proxmox_sys::fs::fs_info(&chunk_store.base_path()) {
>> + cache_capacity = fs_info.available / (16 * 1024 * 1024);
>> + }
>> + if let Some(max_cache_size) = backend_config.max_cache_size {
>> + info!(
>> + "Got requested max cache size {max_cache_size} for store {}",
>> + config.name
>> + );
>> + let max_cache_capacity = max_cache_size.as_u64() / (16 * 1024 * 1024);
>> + cache_capacity = cache_capacity.min(max_cache_capacity);
>> + }
>> + let cache_capacity = usize::try_from(cache_capacity).unwrap_or_default();
>> +
>> + info!(
>> + "Using datastore cache with capacity {cache_capacity} for store {}",
>> + config.name
>> + );
>> +
>> + let cache = LocalDatastoreLruCache::new(cache_capacity, chunk_store.clone());
>> + Some(cache)
>> + } else {
>> + None
>> + };
>> +
>> Ok(DataStoreImpl {
>> chunk_store,
>> gc_mutex: Mutex::new(()),
>> @@ -446,6 +507,7 @@ impl DataStore {
>> last_digest,
>> sync_level: tuning.sync_level.unwrap_or_default(),
>> backend_config,
>> + lru_store_caching,
>> })
>> }
>>
>> @@ -1580,6 +1642,12 @@ impl DataStore {
>> chunk_count += 1;
>>
>> if atime < min_atime {
>> + if let Some(cache) = self.cache() {
>> + let mut digest_bytes = [0u8; 32];
>> + hex::decode_to_slice(digest.as_bytes(), &mut digest_bytes)?;
>> + // ignore errors, phase 3 will retry cleanup anyways
>> + let _ = cache.remove(&digest_bytes);
>> + }
>> delete_list.push(content.key);
>> if bad {
>> gc_status.removed_bad += 1;
>> diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
>> index ca6fdb7d8..b9eb035c2 100644
>> --- a/pbs-datastore/src/lib.rs
>> +++ b/pbs-datastore/src/lib.rs
>> @@ -217,3 +217,6 @@ pub use snapshot_reader::SnapshotReader;
>>
>> mod local_chunk_reader;
>> pub use local_chunk_reader::LocalChunkReader;
>> +
>> +mod local_datastore_lru_cache;
>> +pub use local_datastore_lru_cache::LocalDatastoreLruCache;
>> diff --git a/pbs-datastore/src/local_datastore_lru_cache.rs b/pbs-datastore/src/local_datastore_lru_cache.rs
>> new file mode 100644
>> index 000000000..bb64c52f3
>> --- /dev/null
>> +++ b/pbs-datastore/src/local_datastore_lru_cache.rs
>> @@ -0,0 +1,172 @@
>> +//! Use a local datastore as cache for operations on a datastore attached via
>> +//! a network layer (e.g. via the S3 backend).
>> +
>> +use std::future::Future;
>> +use std::sync::Arc;
>> +
>> +use anyhow::{bail, Error};
>> +use http_body_util::BodyExt;
>> +
>> +use pbs_tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
>> +use proxmox_s3_client::S3Client;
>> +
>> +use crate::ChunkStore;
>> +use crate::DataBlob;
>> +
>
> v missing rustdoc for pub struct
>
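Added, roughly (sketch):

    /// Cacher to fetch chunks from the S3 object store backend and insert
    /// them into the local chunk store on cache misses.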
>> +#[derive(Clone)]
>> +pub struct S3Cacher {
>> + client: Arc<S3Client>,
>> + store: Arc<ChunkStore>,
>> +}
>> +
>> +impl AsyncCacher<[u8; 32], ()> for S3Cacher {
>> + fn fetch(
>> + &self,
>> + key: [u8; 32],
>> + ) -> Box<dyn Future<Output = Result<Option<()>, Error>> + Send + 'static> {
>> + let client = self.client.clone();
>> + let store = self.store.clone();
>
> rather use Arc::clone(&...) here to avoid ambiguity
>
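Ack, changed to Arc::clone(&self.client) and Arc::clone(&self.store) for the
next version.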
>> + Box::new(async move {
>> + let object_key = crate::s3::object_key_from_digest(&key)?;
>> + match client.get_object(object_key).await? {
>> + None => bail!("could not fetch object with key {}", hex::encode(key)),
>> + Some(response) => {
>> + let bytes = response.content.collect().await?.to_bytes();
>> + let chunk = DataBlob::from_raw(bytes.to_vec())?;
>> + store.insert_chunk(&chunk, &key)?;
>> + Ok(Some(()))
>> + }
>> + }
>> + })
>> + }
>> +}
>> +
>> +impl S3Cacher {
>
> v missing rustdoc for pub fn
>
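Added, e.g. (sketch):

    /// Create a new S3 cacher operating on the given S3 client and chunk store.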
>> + pub fn new(client: Arc<S3Client>, store: Arc<ChunkStore>) -> Self {
>> + Self { client, store }
>> + }
>> +}
>> +
>> +/// LRU cache using local datastore for caching chunks
>> +///
>> +/// Uses an LRU cache, but stores the values on the filesystem rather
>> +/// than in memory.
>> +pub struct LocalDatastoreLruCache {
>> + cache: AsyncLruCache<[u8; 32], ()>,
>> + store: Arc<ChunkStore>,
>> +}
>> +
>> +impl LocalDatastoreLruCache {
>> + pub fn new(capacity: usize, store: Arc<ChunkStore>) -> Self {
>> + Self {
>> + cache: AsyncLruCache::new(capacity),
>> + store,
>> + }
>> + }
>> +
>> + /// Insert a new chunk into the local datastore cache.
>> + ///
>> + /// Fails if the chunk cannot be inserted successfully.
>> + pub fn insert(&self, digest: &[u8; 32], chunk: &DataBlob) -> Result<(), Error> {
>> + self.store.insert_chunk(chunk, digest)?;
>> + self.cache.insert(*digest, (), |digest| {
>> + let (path, _digest_str) = self.store.chunk_path(&digest);
>> + // Truncate to free up space but keep the inode around, since that
>> + // is used as marker for chunks in use by garbage collection.
>> + if let Err(err) = nix::unistd::truncate(&path, 0) {
>> + if err != nix::errno::Errno::ENOENT {
>> + return Err(Error::from(err));
>> + }
>> + }
>> + Ok(())
>> + })
>> + }
>> +
>> + /// Remove a chunk from the local datastore cache.
>> + ///
>> + /// Fails if the chunk cannot be deleted successfully.
>> + pub fn remove(&self, digest: &[u8; 32]) -> Result<(), Error> {
>> + self.cache.remove(*digest);
>> + let (path, _digest_str) = self.store.chunk_path(digest);
>> + std::fs::remove_file(path).map_err(Error::from)
>> + }
>> +
>
> v missing rustdoc
>
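Added, something along these lines (sketch):

    /// Access the chunk for the given digest, marking it as most recently
    /// used in the cache.
    ///
    /// On a cache miss, or if the locally cached chunk file was truncated or
    /// removed in the meantime, the chunk is fetched from the S3 backend via
    /// the given cacher and re-inserted into the local chunk store.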
>> + pub async fn access(
>> + &self,
>> + digest: &[u8; 32],
>> + cacher: &mut S3Cacher,
>> + ) -> Result<Option<DataBlob>, Error> {
>> + if self
>> + .cache
>> + .access(*digest, cacher, |digest| {
>> + let (path, _digest_str) = self.store.chunk_path(&digest);
>> + // Truncate to free up space but keep the inode around, since that
>> + // is used as marker for chunks in use by garbage collection.
>> + if let Err(err) = nix::unistd::truncate(&path, 0) {
>> + if err != nix::errno::Errno::ENOENT {
>> + return Err(Error::from(err));
>> + }
>> + }
>> + Ok(())
>> + })
>> + .await?
>> + .is_some()
>> + {
>> + let (path, _digest_str) = self.store.chunk_path(digest);
>> + let mut file = match std::fs::File::open(&path) {
>> + Ok(file) => file,
>> + Err(err) => {
>> + // Expected chunk to be present since LRU cache has it, but it is missing
>> + // locally, try to fetch again
>> + if err.kind() == std::io::ErrorKind::NotFound {
>> + let object_key = crate::s3::object_key_from_digest(digest)?;
>> + match cacher.client.get_object(object_key).await? {
>> + None => {
>> + bail!("could not fetch object with key {}", hex::encode(digest))
>> + }
>> + Some(response) => {
>> + let bytes = response.content.collect().await?.to_bytes();
>> + let chunk = DataBlob::from_raw(bytes.to_vec())?;
>> + self.store.insert_chunk(&chunk, digest)?;
>> + std::fs::File::open(&path)?
>> + }
>> + }
>> + } else {
>> + return Err(Error::from(err));
>> + }
>> + }
>> + };
>> + let chunk = match DataBlob::load_from_reader(&mut file) {
>> + Ok(chunk) => chunk,
>> + Err(err) => {
>> + use std::io::Seek;
>> + // Check if file is empty marker file, try fetching content if so
>> + if file.seek(std::io::SeekFrom::End(0))? == 0 {
>> + let object_key = crate::s3::object_key_from_digest(digest)?;
>> + match cacher.client.get_object(object_key).await? {
>> + None => {
>> + bail!("could not fetch object with key {}", hex::encode(digest))
>> + }
>> + Some(response) => {
>> + let bytes = response.content.collect().await?.to_bytes();
>> + let chunk = DataBlob::from_raw(bytes.to_vec())?;
>> + self.store.insert_chunk(&chunk, digest)?;
>> + let mut file = std::fs::File::open(&path)?;
>> + DataBlob::load_from_reader(&mut file)?
>> + }
>> + }
>> + } else {
>> + return Err(err);
>> + }
>> + }
>> + };
>> + Ok(Some(chunk))
>> + } else {
>> + Ok(None)
>> + }
>> + }
>> +
>
> v missing rustdoc
>
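Added, e.g. (sketch):

    /// Check if the cache contains the chunk with the given digest.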
>> + pub fn contains(&self, digest: &[u8; 32]) -> bool {
>> + self.cache.contains(*digest)
>> + }
>> +}
>