[pve-devel] [RFC proxmox 4/7] cache: add new crate 'proxmox-cache'

Lukas Wagner l.wagner at proxmox.com
Mon Aug 21 15:44:41 CEST 2023


For now, it contains a file-backed cache with expiration logic.
The cache should be safe to access from multiple processes at
once.

The cache stores each value in its own file inside a directory; the
file name is derived from the key. E.g. key "foo" results in a file
'foo.json' in the given base directory. If a new value is set, the
file is atomically replaced.
The JSON file also contains some metadata, namely 'added_at' and
'expires_in', which are used for cache expiration.

Note: This cache is not suited to applications that
 - might want to cache huge amounts of data and/or access the cache
   very frequently (due to the overhead of JSON de/serialization), or
 - require arbitrary keys - right now, keys are restricted to
   SAFE_ID_REGEX.

The cache was developed for use in pvestatd, e.g. to cache the
status of storage plugins. There, these limitations do not really
play a role.

Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
---
 Cargo.toml                            |   1 +
 proxmox-cache/Cargo.toml              |  20 ++
 proxmox-cache/examples/performance.rs |  82 ++++++++
 proxmox-cache/src/lib.rs              |  40 ++++
 proxmox-cache/src/shared_cache.rs     | 263 ++++++++++++++++++++++++++
 5 files changed, 406 insertions(+)
 create mode 100644 proxmox-cache/Cargo.toml
 create mode 100644 proxmox-cache/examples/performance.rs
 create mode 100644 proxmox-cache/src/lib.rs
 create mode 100644 proxmox-cache/src/shared_cache.rs

diff --git a/Cargo.toml b/Cargo.toml
index e334ac1..940e1d0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,6 +5,7 @@ members = [
     "proxmox-async",
     "proxmox-auth-api",
     "proxmox-borrow",
+    "proxmox-cache",
     "proxmox-client",
     "proxmox-compression",
     "proxmox-http",
diff --git a/proxmox-cache/Cargo.toml b/proxmox-cache/Cargo.toml
new file mode 100644
index 0000000..b20921f
--- /dev/null
+++ b/proxmox-cache/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "proxmox-cache"
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+exclude.workspace = true
+description = "Cache implementations"
+
+[dependencies]
+anyhow.workspace = true
+proxmox-sys.workspace = true
+proxmox-time.workspace = true
+proxmox-schema = { workspace = true, features = ["api-types"]}
+serde_json.workspace = true
+serde = { workspace = true, features = ["derive"]}
+
+[dev-dependencies]
+nix.workspace = true
diff --git a/proxmox-cache/examples/performance.rs b/proxmox-cache/examples/performance.rs
new file mode 100644
index 0000000..420f61c
--- /dev/null
+++ b/proxmox-cache/examples/performance.rs
@@ -0,0 +1,82 @@
+use proxmox_cache::Cache;
+use proxmox_cache::SharedCache;
+use proxmox_sys::fs::CreateOptions;
+use std::time::Instant;
+
+/// Micro-benchmark for `SharedCache`: inserts, reads and deletes `KEY_COUNT` keys and
+/// prints the total and the per-key duration of each phase.
+///
+/// Run with `cargo run --release --example=performance`, ideally with `/tmp` on a tmpfs.
+/// The cache directory `/tmp/pmx-cache` is created if missing and is left in place.
+fn main() {
+    // Number of cache entries used in every phase.
+    const KEY_COUNT: u32 = 100_000;
+
+    // These options apply to the base directory as well as to every cache file, so the
+    // mode has to keep the execute (search) bit for the directory.
+    let options = CreateOptions::new()
+        .owner(nix::unistd::Uid::effective())
+        .group(nix::unistd::Gid::effective())
+        .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
+
+    let cache = SharedCache::new("/tmp/pmx-cache", options).unwrap();
+
+    // Keys are generated up front so that formatting them is not part of the measurement.
+    let keys: Vec<String> = (0..KEY_COUNT).map(|i| format!("key_{i}")).collect();
+
+    // Every member name must be unique: `json!` silently keeps only the last of several
+    // identical keys, which would shrink the payload being measured.
+    let data = serde_json::json!({
+        "member1": "foo",
+        "member2": "foo",
+        "member3": "foo",
+        "member4": "foo",
+        "member5": "foo",
+        "member6": "foo",
+        "member7": "foo",
+        "member8": "foo",
+        "member9": "foo",
+        "array": [10, 20, 30, 40, 50],
+        "object": {
+            "member1": "foo",
+            "member2": "foo",
+            "member3": "foo",
+            "member4": "foo",
+            "member5": "foo",
+            "member6": "foo",
+            "member7": "foo",
+            "member8": "foo",
+            "member9": "foo",
+        }
+    });
+
+    // Prints the total and the per-key duration of one benchmark phase.
+    let report = |what: &str, elapsed: std::time::Duration| {
+        let per_key = elapsed / KEY_COUNT;
+        println!("{what} {KEY_COUNT} keys took {elapsed:?} ({per_key:?} per key)");
+    };
+
+    // Write phase: store the same payload under every key, without expiry.
+    let before = Instant::now();
+    for key in &keys {
+        cache
+            .set(key, data.clone(), None)
+            .expect("could not insert value");
+    }
+    report("inserting", before.elapsed());
+
+    // Read phase: every key was just written without expiry, so each lookup must hit.
+    let before = Instant::now();
+    for key in &keys {
+        let value = cache.get(key).expect("could not get value");
+        assert!(value.is_some(), "cache miss for {key}");
+    }
+    report("getting", before.elapsed());
+
+    // Delete phase: remove every entry written above.
+    let before = Instant::now();
+    for key in &keys {
+        cache.delete(key).expect("could not delete value");
+    }
+    report("deleting", before.elapsed());
+}
diff --git a/proxmox-cache/src/lib.rs b/proxmox-cache/src/lib.rs
new file mode 100644
index 0000000..d496dc7
--- /dev/null
+++ b/proxmox-cache/src/lib.rs
@@ -0,0 +1,40 @@
+use anyhow::Error;
+use serde_json::Value;
+
+pub mod shared_cache;
+
+pub use shared_cache::SharedCache;
+
+/// Source of the current time, abstracted so that tests can substitute a mock clock.
+trait TimeProvider {
+    /// Current time in seconds since the UNIX epoch.
+    fn now(&self) -> i64;
+}
+
+/// Real clock, backed by `proxmox_time::epoch_i64`.
+struct DefaultTimeProvider;
+
+impl TimeProvider for DefaultTimeProvider {
+    fn now(&self) -> i64 {
+        proxmox_time::epoch_i64()
+    }
+}
+
+/// Key/value store for JSON values with optional per-entry expiry.
+pub trait Cache {
+    /// Store `value` under `key`, replacing any existing entry. With `expires_in` set to
+    /// `Some(secs)` the entry expires `secs` seconds after insertion; `None` never expires.
+    fn set<S>(&self, key: S, value: Value, expires_in: Option<i64>) -> Result<(), Error>
+    where
+        S: AsRef<str>;
+
+    /// Remove the entry stored under `key`.
+    fn delete<S>(&self, key: S) -> Result<(), Error>
+    where
+        S: AsRef<str>;
+
+    /// Look up `key`. Expired entries are never returned.
+    fn get<S>(&self, key: S) -> Result<Option<Value>, Error>
+    where
+        S: AsRef<str>;
+}
diff --git a/proxmox-cache/src/shared_cache.rs b/proxmox-cache/src/shared_cache.rs
new file mode 100644
index 0000000..be6212c
--- /dev/null
+++ b/proxmox-cache/src/shared_cache.rs
@@ -0,0 +1,263 @@
+use std::path::{Path, PathBuf};
+
+use anyhow::{bail, Error};
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+use proxmox_schema::api_types::SAFE_ID_FORMAT;
+use proxmox_sys::fs::CreateOptions;
+
+use crate::{Cache, DefaultTimeProvider, TimeProvider};
+
+/// A simple, file-backed cache that can be used from multiple processes concurrently.
+///
+/// Each cache entry is stored in its own file inside a base directory: an entry with the
+/// key `disk_stats` is stored in the file `disk_stats.json`. As the extension implies, the
+/// entry (value plus expiry metadata) is serialized as JSON.
+///
+/// For optimal performance, the base directory should be located on a `tmpfs`.
+///
+/// ## Key Space
+/// Because cache keys are used directly as file names, they have to match the following
+/// regular expression: `[A-Za-z0-9_][A-Za-z0-9._\-]*`
+///
+/// ## Concurrency
+/// Entries are written by atomically replacing their file, so concurrent readers (in this or
+/// other processes) see either the old or the new entry, never a partially written one.
+///
+/// ## Performance
+/// On a tmpfs:
+/// ```sh
+///   $ cargo run --release --example=performance
+///   inserting 100000 keys took 896.609758ms (8.966µs per key)
+///   getting 100000 keys took 584.874842ms (5.848µs per key)
+///   deleting 100000 keys took 247.742702ms (2.477µs per key)
+/// ```
+/// Inserting/getting large objects results in lower performance due to the cost of
+/// (de)serialization.
+pub struct SharedCache {
+    /// Directory holding one JSON file per cache entry.
+    base_path: PathBuf,
+    /// Clock used for expiry checks; tests replace it with a mock.
+    time_provider: Box<dyn TimeProvider>,
+    /// Owner, group and permissions for the base directory and every entry file.
+    create_options: CreateOptions,
+}
+
+impl Cache for SharedCache {
+    /// Store `value` with its insertion time and expiry, atomically replacing the entry's file.
+    fn set<S: AsRef<str>>(
+        &self,
+        key: S,
+        value: Value,
+        expires_in: Option<i64>,
+    ) -> Result<(), Error> {
+        let path = self.get_path_for_key(key.as_ref())?;
+        let item = CachedItem {
+            value,
+            added_at: self.time_provider.now(),
+            expires_in,
+        };
+        let serialized = serde_json::to_vec_pretty(&item)?;
+        // Replace atomically so readers never observe a partially written file.
+        proxmox_sys::fs::replace_file(path, &serialized, self.create_options.clone(), true)?;
+        Ok(())
+    }
+
+    /// Remove the entry's file. A missing entry is not an error: it may never have been set,
+    /// or another process may have deleted it concurrently.
+    fn delete<S: AsRef<str>>(&self, key: S) -> Result<(), Error> {
+        let path = self.get_path_for_key(key.as_ref())?;
+        if let Err(err) = std::fs::remove_file(path) {
+            if err.kind() != std::io::ErrorKind::NotFound {
+                return Err(err.into());
+            }
+        }
+        Ok(())
+    }
+
+    /// Load the entry's file; missing and expired entries both yield `None`.
+    fn get<S: AsRef<str>>(&self, key: S) -> Result<Option<Value>, Error> {
+        let path = self.get_path_for_key(key.as_ref())?;
+        let content = match proxmox_sys::fs::file_get_optional_contents(path)? {
+            Some(content) => content,
+            None => return Ok(None),
+        };
+        let item: CachedItem = serde_json::from_slice(&content)?;
+
+        let expires_in = match item.expires_in {
+            Some(expires_in) => expires_in,
+            None => return Ok(Some(item.value)),
+        };
+
+        let now = self.time_provider.now();
+        // Saturate: a huge `expires_in` means "far future" instead of overflowing (a panic in
+        // debug builds, a wrap-around in release builds that makes the entry look expired).
+        let expires_at = item.added_at.saturating_add(expires_in);
+        // Reject entries "from the future" too, in case the clock jumped backwards.
+        let valid = item.added_at <= now && now < expires_at;
+        Ok(if valid { Some(item.value) } else { None })
+    }
+}
+
+impl SharedCache {
+    /// Create a cache rooted at `base_path`, creating the directory with `options` if it does
+    /// not exist yet. The same `options` are used for every entry file written later.
+    pub fn new<P: AsRef<Path>>(base_path: P, options: CreateOptions) -> Result<Self, Error> {
+        let base_path = base_path.as_ref();
+        proxmox_sys::fs::create_path(base_path, Some(options.clone()), Some(options.clone()))?;
+
+        Ok(SharedCache {
+            base_path: base_path.to_owned(),
+            time_provider: Box::new(DefaultTimeProvider),
+            create_options: options,
+        })
+    }
+
+    /// Reject keys not matching `SAFE_ID_FORMAT`, so keys cannot escape the base directory.
+    fn enforce_safe_key(key: &str) -> Result<(), Error> {
+        if SAFE_ID_FORMAT.unwrap_pattern_format().is_match(key) {
+            Ok(())
+        } else {
+            bail!("invalid key format")
+        }
+    }
+
+    /// Map a validated `key` to its entry file `<base_path>/<key>.json`.
+    fn get_path_for_key(&self, key: &str) -> Result<PathBuf, Error> {
+        Self::enforce_safe_key(key)?;
+        // Append the suffix rather than using `set_extension`, which *replaces* an existing
+        // extension and would map e.g. both `foo.bar` and `foo` to the file `foo.json`.
+        Ok(self.base_path.join(format!("{key}.json")))
+    }
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)] // on-disk JSON format of one cache entry
+struct CachedItem {
+    value: Value,
+    added_at: i64, // insertion time, seconds since the UNIX epoch
+    expires_in: Option<i64>, // lifetime in seconds after `added_at`; `None` never expires
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::cell::Cell;
+    use std::rc::Rc;
+
+    /// Manually driven clock. Clones share the same time, so the copy handed to the cache can
+    /// be advanced from the test; `Rc` suffices since each clock stays on its test's thread.
+    #[derive(Clone, Default)]
+    struct MockTimeProvider {
+        current_time: Rc<Cell<i64>>,
+    }
+
+    impl TimeProvider for MockTimeProvider {
+        fn now(&self) -> i64 {
+            self.current_time.get()
+        }
+    }
+
+    impl MockTimeProvider {
+        /// Move the clock by `duration` seconds (negative values move it backwards).
+        fn elapse_time(&self, duration: i64) {
+            self.current_time.set(self.current_time.get() + duration);
+        }
+    }
+
+    /// A `SharedCache` in a fresh temporary directory whose clock is a `MockTimeProvider`
+    /// starting at 0. The directory is removed again on drop.
+    struct TestCache {
+        cache: SharedCache,
+        time: MockTimeProvider,
+        path: PathBuf,
+    }
+
+    impl TestCache {
+        fn new() -> Self {
+            let path = proxmox_sys::fs::make_tmp_dir("/tmp/", None).unwrap();
+
+            let options = CreateOptions::new()
+                .owner(nix::unistd::Uid::effective())
+                .group(nix::unistd::Gid::effective())
+                .perm(nix::sys::stat::Mode::from_bits_truncate(0o600));
+
+            let mut cache = SharedCache::new(&path, options).unwrap();
+            let time = MockTimeProvider::default();
+            cache.time_provider = Box::new(time.clone());
+
+            Self { cache, time, path }
+        }
+    }
+
+    impl Drop for TestCache {
+        fn drop(&mut self) {
+            let _ = std::fs::remove_dir_all(&self.path);
+        }
+    }
+
+    #[test]
+    fn test_basic_set_and_get() {
+        let cache = TestCache::new();
+        cache
+            .cache
+            .set("foo", Value::String("bar".into()), None)
+            .unwrap();
+
+        assert_eq!(
+            cache.cache.get("foo").unwrap(),
+            Some(Value::String("bar".into()))
+        );
+        assert!(cache.cache.get("notthere").unwrap().is_none());
+    }
+
+    /// A deleted entry is no longer returned.
+    #[test]
+    fn test_delete() {
+        let cache = TestCache::new();
+        cache.cache.set("foo", Value::Bool(true), None).unwrap();
+        cache.cache.delete("foo").unwrap();
+        assert!(cache.cache.get("foo").unwrap().is_none());
+    }
+
+    /// An entry stored with `expires_in = 10` is still returned after 9 seconds, not after 11.
+    #[test]
+    fn test_expiry() {
+        let cache = TestCache::new();
+
+        cache
+            .cache
+            .set("expiring", Value::String("bar".into()), Some(10))
+            .unwrap();
+        assert!(cache.cache.get("expiring").unwrap().is_some());
+
+        cache.time.elapse_time(9);
+        assert!(cache.cache.get("expiring").unwrap().is_some());
+        cache.time.elapse_time(2);
+        assert!(cache.cache.get("expiring").unwrap().is_none());
+    }
+
+    /// Entries stamped in the future (clock jumped backwards after writing) count as expired.
+    #[test]
+    fn test_backwards_time_jump() {
+        let cache = TestCache::new();
+
+        cache.time.elapse_time(50);
+        cache
+            .cache
+            .set("future", Value::String("bar".into()), Some(10))
+            .unwrap();
+        cache.time.elapse_time(-20);
+        assert!(cache.cache.get("future").unwrap().is_none());
+    }
+
+    #[test]
+    fn test_invalid_keys() {
+        let cache = TestCache::new();
+
+        // Path traversal, non-ASCII characters, whitespace and `~` must all be rejected.
+        for key in ["../escape_base", "bjørnen drikker øl", "test space", "~/foo"] {
+            assert!(cache.cache.set(key, Value::Null, None).is_err(), "{key}");
+        }
+    }
+}
-- 
2.39.2






More information about the pve-devel mailing list