[pbs-devel] [PATCH proxmox-backup 04/11] RemoteChunkReader: add LRU cached variant

Stefan Reiter s.reiter at proxmox.com
Mon Jan 11 12:14:02 CET 2021


Retain the old constructor for compatibility; most use cases don't need
an LRU cache anyway.

For now, convert the 'mount' API to use the new variant, as the same set
of chunks might be accessed multiple times in a random pattern there.

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---

I first looked at using the Accessor API of LruCache, which would make this a
bit cleaner, but that's a trait and we use async, so it wasn't a good fit here.

 src/bin/proxmox_backup_client/mount.rs |  4 +-
 src/client/remote_chunk_reader.rs      | 77 ++++++++++++++++++++------
 2 files changed, 62 insertions(+), 19 deletions(-)

diff --git a/src/bin/proxmox_backup_client/mount.rs b/src/bin/proxmox_backup_client/mount.rs
index 6a22f78b..7785d812 100644
--- a/src/bin/proxmox_backup_client/mount.rs
+++ b/src/bin/proxmox_backup_client/mount.rs
@@ -251,7 +251,7 @@ async fn mount_do(param: Value, pipe: Option<Fd>) -> Result<Value, Error> {
     if server_archive_name.ends_with(".didx") {
         let index = client.download_dynamic_index(&manifest, &server_archive_name).await?;
         let most_used = index.find_most_used_chunks(8);
-        let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), most_used);
+        let chunk_reader = RemoteChunkReader::new_lru_cached(client.clone(), crypt_config, file_info.chunk_crypt_mode(), most_used, 16);
         let reader = BufferedDynamicReader::new(index, chunk_reader);
         let archive_size = reader.archive_size();
         let reader: proxmox_backup::pxar::fuse::Reader =
@@ -277,7 +277,7 @@ async fn mount_do(param: Value, pipe: Option<Fd>) -> Result<Value, Error> {
     } else if server_archive_name.ends_with(".fidx") {
         let index = client.download_fixed_index(&manifest, &server_archive_name).await?;
         let size = index.index_bytes();
-        let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), HashMap::new());
+        let chunk_reader = RemoteChunkReader::new_lru_cached(client.clone(), crypt_config, file_info.chunk_crypt_mode(), HashMap::new(), 16);
         let reader = AsyncIndexReader::new(index, chunk_reader);
 
         let name = &format!("{}:{}/{}", repo.to_string(), path, archive_name);
diff --git a/src/client/remote_chunk_reader.rs b/src/client/remote_chunk_reader.rs
index 06f693a2..1314bcdc 100644
--- a/src/client/remote_chunk_reader.rs
+++ b/src/client/remote_chunk_reader.rs
@@ -8,6 +8,13 @@ use anyhow::{bail, Error};
 use super::BackupReader;
 use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk};
 use crate::tools::runtime::block_on;
+use crate::tools::lru_cache::LruCache;
+
+struct Cache {
+    cache_hint: HashMap<[u8; 32], usize>,
+    hinted: HashMap<[u8; 32], Vec<u8>>,
+    lru: Option<LruCache<[u8; 32], Vec<u8>>>,
+}
 
 /// Read chunks from remote host using ``BackupReader``
 #[derive(Clone)]
@@ -15,8 +22,7 @@ pub struct RemoteChunkReader {
     client: Arc<BackupReader>,
     crypt_config: Option<Arc<CryptConfig>>,
     crypt_mode: CryptMode,
-    cache_hint: Arc<HashMap<[u8; 32], usize>>,
-    cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
+    cache: Arc<Mutex<Cache>>,
 }
 
 impl RemoteChunkReader {
@@ -28,13 +34,30 @@ impl RemoteChunkReader {
         crypt_config: Option<Arc<CryptConfig>>,
         crypt_mode: CryptMode,
         cache_hint: HashMap<[u8; 32], usize>,
+    ) -> Self {
+        Self::new_lru_cached(client, crypt_config, crypt_mode, cache_hint, 0)
+    }
+
+    /// Create a new instance.
+    ///
+    /// Chunks listed in ``cache_hint`` are cached and kept in RAM, as well as the last
+    /// ``cache_last`` accessed chunks (a ``cache_last`` of 0 disables the LRU cache).
+    pub fn new_lru_cached(
+        client: Arc<BackupReader>,
+        crypt_config: Option<Arc<CryptConfig>>,
+        crypt_mode: CryptMode,
+        cache_hint: HashMap<[u8; 32], usize>,
+        cache_last: usize,
     ) -> Self {
         Self {
             client,
             crypt_config,
             crypt_mode,
-            cache_hint: Arc::new(cache_hint),
-            cache: Arc::new(Mutex::new(HashMap::new())),
+            cache: Arc::new(Mutex::new(Cache {
+                hinted: HashMap::with_capacity(cache_hint.len()),
+                lru: if cache_last == 0 { None } else { Some(LruCache::new(cache_last)) },
+                cache_hint,
+            })),
         }
     }
 
@@ -64,6 +87,34 @@ impl RemoteChunkReader {
             },
         }
     }
+
+    fn cache_get(&self, digest: &[u8; 32]) -> Option<Vec<u8>> {
+        let cache = &mut *self.cache.lock().unwrap();
+        if let Some(data) = cache.hinted.get(digest) {
+            return Some(data.to_vec());
+        }
+
+        cache
+            .lru
+            .as_mut()
+            .map(|lru| lru.get_mut(*digest).map(|x| x.to_vec()))
+            .flatten()
+    }
+
+    fn cache_insert(&self, digest: &[u8; 32], raw_data: &Vec<u8>) {
+        let cache = &mut *self.cache.lock().unwrap();
+
+        // if hinted, always cache given digest
+        if cache.cache_hint.contains_key(digest) {
+            cache.hinted.insert(*digest, raw_data.to_vec());
+            return;
+        }
+
+        // otherwise put in LRU
+        if let Some(ref mut lru) = cache.lru {
+            lru.insert(*digest, raw_data.to_vec());
+        }
+    }
 }
 
 impl ReadChunk for RemoteChunkReader {
@@ -72,18 +123,14 @@ impl ReadChunk for RemoteChunkReader {
     }
 
     fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
-        if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
-            return Ok(raw_data.to_vec());
+        if let Some(raw_data) = self.cache_get(digest) {
+            return Ok(raw_data);
         }
 
         let chunk = ReadChunk::read_raw_chunk(self, digest)?;
 
         let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
-
-        let use_cache = self.cache_hint.contains_key(digest);
-        if use_cache {
-            (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
-        }
+        self.cache_insert(digest, &raw_data);
 
         Ok(raw_data)
     }
@@ -102,18 +149,14 @@ impl AsyncReadChunk for RemoteChunkReader {
         digest: &'a [u8; 32],
     ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
         Box::pin(async move {
-            if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
+            if let Some(raw_data) = self.cache_get(digest) {
                 return Ok(raw_data.to_vec());
             }
 
             let chunk = Self::read_raw_chunk(self, digest).await?;
 
             let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
-
-            let use_cache = self.cache_hint.contains_key(digest);
-            if use_cache {
-                (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
-            }
+            self.cache_insert(digest, &raw_data);
 
             Ok(raw_data)
         })
-- 
2.20.1






More information about the pbs-devel mailing list