[pve-devel] [RFC proxmox 4/7] cache: add new crate 'proxmox-cache'

Max Carrara m.carrara at proxmox.com
Tue Aug 22 12:08:38 CEST 2023


On 8/21/23 15:44, Lukas Wagner wrote:
> For now, it contains a file-backed cache with expiration logic.
> The cache should be safe to be accessed from multiple processes at
> once.
> 

This seems pretty neat! The cache implementation seems straightforward
enough. I'll see if I can test it more thoroughly later.

However, in my opinion we should have a crate like
"proxmox-collections" (or something of the sort) with modules for each
data structure / collection similar to the standard library; I'm
curious what others think about that. IMO, it would be a great
opportunity to introduce that crate in this series, since you're
already introducing one for the cache anyway.

So, proxmox-collections would look something like this:

  proxmox-collections
  └── src
      ├── cache
      │   ├── mod.rs
      │   └── shared_cache.rs
      └── lib.rs

Let me know what you think!

> The cache stores values in a directory, based on the key.
> E.g. key "foo" results in a file 'foo.json' in the given base
> directory. If a new value is set, the file is atomically replaced.
> The JSON file also contains some metadata, namely 'added_at' and
> 'expires_in' - they are used for cache expiration.
> 
> Note: This cache is not suited to applications that
>  - Might want to cache huge amounts of data, and/or access the cache
>    very frequently (due to the overhead of JSON de/serialization)
>  - Require arbitrary keys - right now, keys are limited by
>    SAFE_ID_REGEX
> 
> The cache was developed for the use in pvestatd, in order to cache
> e.g. storage plugin status. There, these limitations do not really
> play any role.
> 
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> ---
>  Cargo.toml                            |   1 +
>  proxmox-cache/Cargo.toml              |  20 ++
>  proxmox-cache/examples/performance.rs |  82 ++++++++
>  proxmox-cache/src/lib.rs              |  40 ++++
>  proxmox-cache/src/shared_cache.rs     | 263 ++++++++++++++++++++++++++
>  5 files changed, 406 insertions(+)
>  create mode 100644 proxmox-cache/Cargo.toml
>  create mode 100644 proxmox-cache/examples/performance.rs
>  create mode 100644 proxmox-cache/src/lib.rs
>  create mode 100644 proxmox-cache/src/shared_cache.rs
> 
> diff --git a/Cargo.toml b/Cargo.toml
> index e334ac1..940e1d0 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -5,6 +5,7 @@ members = [
>      "proxmox-async",
>      "proxmox-auth-api",
>      "proxmox-borrow",
> +    "proxmox-cache",
>      "proxmox-client",
>      "proxmox-compression",
>      "proxmox-http",
> diff --git a/proxmox-cache/Cargo.toml b/proxmox-cache/Cargo.toml
> new file mode 100644
> index 0000000..b20921f
> --- /dev/null
> +++ b/proxmox-cache/Cargo.toml
> @@ -0,0 +1,20 @@
> +[package]
> +name = "proxmox-cache"
> +version = "0.1.0"
> +authors.workspace = true
> +edition.workspace = true
> +license.workspace = true
> +repository.workspace = true
> +exclude.workspace = true
> +description = "Cache implementations"
> +
> +[dependencies]
> +anyhow.workspace = true
> +proxmox-sys.workspace = true
> +proxmox-time.workspace = true
> +proxmox-schema = { workspace = true, features = ["api-types"]}
> +serde_json.workspace = true
> +serde = { workspace = true, features = ["derive"]}
> +
> +[dev-dependencies]
> +nix.workspace = true
> diff --git a/proxmox-cache/examples/performance.rs b/proxmox-cache/examples/performance.rs
> new file mode 100644
> index 0000000..420f61c
> --- /dev/null
> +++ b/proxmox-cache/examples/performance.rs
> @@ -0,0 +1,82 @@
> +use proxmox_cache::Cache;
> +use proxmox_cache::SharedCache;
> +use proxmox_sys::fs::CreateOptions;
> +use std::time::Instant;
> +
> +fn main() {
> +    let options = CreateOptions::new()
> +        .owner(nix::unistd::Uid::effective())
> +        .group(nix::unistd::Gid::effective())
> +        .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
> +
> +    let cache = SharedCache::new("/tmp/pmx-cache", options).unwrap();
> +
> +    let mut keys = Vec::new();
> +
> +    for i in 0..100000 {
> +        keys.push(format!("key_{i}"));
> +    }
> +
> +    let data = serde_json::json!({
> +        "member1": "foo",
> +        "member2": "foo",
> +        "member3": "foo",
> +        "member4": "foo",
> +        "member5": "foo",
> +        "member5": "foo",
> +        "member6": "foo",
> +        "member7": "foo",
> +        "member8": "foo",
> +        "array": [10, 20, 30, 40, 50],
> +        "object": {
> +            "member1": "foo",
> +            "member2": "foo",
> +            "member3": "foo",
> +            "member4": "foo",
> +            "member5": "foo",
> +            "member5": "foo",
> +            "member6": "foo",
> +            "member7": "foo",
> +            "member8": "foo",
> +        }
> +    });
> +
> +    let before = Instant::now();
> +
> +    for key in &keys {
> +        cache
> +            .set(key, data.clone(), None)
> +            .expect("could not insert value");
> +    }
> +
> +    let time = Instant::now() - before;
> +    let time_per_op = time / keys.len() as u32;
> +    println!(
> +        "inserting {len} keys took {time:?} ({time_per_op:?} per key)",
> +        len = keys.len(),
> +    );
> +
> +    let before = Instant::now();
> +    for key in &keys {
> +        let _ = cache.get(key).expect("could not get value");
> +    }
> +
> +    let time = Instant::now() - before;
> +    let time_per_op = time / keys.len() as u32;
> +    println!(
> +        "getting {len} keys took {time:?} ({time_per_op:?} per key)",
> +        len = keys.len(),
> +    );
> +
> +    let before = Instant::now();
> +    for key in &keys {
> +        cache.delete(key).expect("could not delete value");
> +    }
> +
> +    let time = Instant::now() - before;
> +    let time_per_op = time / keys.len() as u32;
> +    println!(
> +        "deleting {len} keys took {time:?} ({time_per_op:?} per key)",
> +        len = keys.len(),
> +    );
> +}
> diff --git a/proxmox-cache/src/lib.rs b/proxmox-cache/src/lib.rs
> new file mode 100644
> index 0000000..d496dc7
> --- /dev/null
> +++ b/proxmox-cache/src/lib.rs
> @@ -0,0 +1,40 @@
> +use anyhow::Error;
> +use serde_json::Value;
> +
> +pub mod shared_cache;
> +
> +pub use shared_cache::SharedCache;
> +
> +trait TimeProvider {
> +    /// Returns the current time as a UNIX epoch (second resolution)
> +    fn now(&self) -> i64;
> +}
> +
> +struct DefaultTimeProvider;
> +
> +impl TimeProvider for DefaultTimeProvider {
> +    fn now(&self) -> i64 {
> +        proxmox_time::epoch_i64()
> +    }
> +}
> +
> +pub trait Cache {
> +    /// Set or insert a cache entry.
> +    ///
> +    /// If `expires_in` is set, this entry will expire in the desired number of
> +    /// seconds.
> +    fn set<S: AsRef<str>>(
> +        &self,
> +        key: S,
> +        value: Value,
> +        expires_in: Option<i64>,
> +    ) -> Result<(), Error>;
> +
> +    /// Delete a cache entry.
> +    fn delete<S: AsRef<str>>(&self, key: S) -> Result<(), Error>;
> +
> +    /// Get a value from the cache.
> +    ///
> +    /// Expired entries will *not* be returned.
> +    fn get<S: AsRef<str>>(&self, key: S) -> Result<Option<Value>, Error>;
> +}

I don't think a trait is really necessary in this case, as there's
not really any other structure (that can be used as a caching
mechanism) that you're abstracting over. (more below)

> diff --git a/proxmox-cache/src/shared_cache.rs b/proxmox-cache/src/shared_cache.rs
> new file mode 100644
> index 0000000..be6212c
> --- /dev/null
> +++ b/proxmox-cache/src/shared_cache.rs
> @@ -0,0 +1,263 @@
> +use std::path::{Path, PathBuf};
> +
> +use anyhow::{bail, Error};
> +use serde::{Deserialize, Serialize};
> +use serde_json::Value;
> +
> +use proxmox_schema::api_types::SAFE_ID_FORMAT;
> +use proxmox_sys::fs::CreateOptions;
> +
> +use crate::{Cache, DefaultTimeProvider, TimeProvider};
> +
> +/// A simple, file-backed cache that can be used from multiple processes concurrently.
> +///
> +/// Cache entries are stored as individual files inside a base directory. For instance,
> +/// a cache entry with the key 'disk_stats' will result in a file 'disk_stats.json' inside
> +/// the base directory. As the extension implies, the cached data will be stored as a JSON
> +/// string.
> +///
> +/// For optimal performance, `SharedCache` should have its base directory in a `tmpfs`.
> +///
> +/// ## Key Space
> +/// Due to the fact that cache keys are being directly used as filenames, they have to match the
> +/// following regular expression: `[A-Za-z0-9_][A-Za-z0-9._\-]*`
> +///
> +/// ## Concurrency
> +/// All cache operations are based on atomic file operations, thus accessing/updating the cache from
> +/// multiple processes at the same time is safe.
> +///
> +/// ## Performance
> +/// On a tmpfs:
> +/// ```sh
> +///   $ cargo run --release --example=performance
> +///   inserting 100000 keys took 896.609758ms (8.966µs per key)
> +///   getting 100000 keys took 584.874842ms (5.848µs per key)
> +///   deleting 100000 keys took 247.742702ms (2.477µs per key)
> +/// ```
> +///
> +/// Inserting/getting large objects might of course result in lower performance due to the cost
> +/// of serialization.
> +///
> +pub struct SharedCache {
> +    base_path: PathBuf,
> +    time_provider: Box<dyn TimeProvider>,
> +    create_options: CreateOptions,
> +}

Instead, this should be generic:

pub struct SharedCache<K, V> { ... }

.. and maybe rename it to SharedFileCache to make it explicit that this
operates on a file. (but that's more dependent on one's taste tbh)

.. and the impl block below ...

> +
> +impl Cache for SharedCache {
> +    fn set<S: AsRef<str>>(
> +        &self,
> +        key: S,
> +        value: Value,
> +        expires_in: Option<i64>,
> +    ) -> Result<(), Error> {
> +        let path = self.get_path_for_key(key.as_ref())?;
> +        let added_at = self.time_provider.now();
> +
> +        let item = CachedItem {
> +            value,
> +            added_at,
> +            expires_in,
> +        };
> +
> +        let serialized = serde_json::to_vec_pretty(&item)?;
> +
> +        // Atomically replace file
> +        proxmox_sys::fs::replace_file(path, &serialized, self.create_options.clone(), true)?;
> +        Ok(())
> +    }
> +
> +    fn delete<S: AsRef<str>>(&self, key: S) -> Result<(), Error> {
> +        let path = self.get_path_for_key(key.as_ref())?;
> +        std::fs::remove_file(path)?;
> +        Ok(())
> +    }
> +
> +    fn get<S: AsRef<str>>(&self, key: S) -> Result<Option<Value>, Error> {
> +        let path = self.get_path_for_key(key.as_ref())?;
> +
> +        let value = if let Some(content) = proxmox_sys::fs::file_get_optional_contents(path)? {
> +            let value: CachedItem = serde_json::from_slice(&content)?;
> +
> +            let now = self.time_provider.now();
> +
> +            if let Some(expires_in) = value.expires_in {
> +                // Check if value is not expired yet. Also do not allow
> +                // values from the future, in case we have clock jumps
> +                if value.added_at + expires_in > now && value.added_at <= now {
> +                    Some(value.value)
> +                } else {
> +                    None
> +                }
> +            } else {
> +                Some(value.value)
> +            }
> +        } else {
> +            None
> +        };
> +
> +        Ok(value)
> +    }
> +}

... can be replaced as follows, in order to make it similar to
std::collections::{HashMap, BTreeMap}:

impl<K: AsRef<str>> SharedCache<K, Value> {
    // Returns old value on successful insert, if given
    fn insert(&self, k: K, v: Value) -> Result<Option<Value>, Error> {
        // ...
    }

    fn get(&self, k: K) -> Result<Option<Value>, Error> {
        // ...
    }

    fn remove(&self, k: K) -> Result<Option<Value>, Error> {
        // ...
    }
}

If necessary / sensible, other methods (inspired by {HashMap, BTreeMap})
can be added as well, such as remove_entry, retain, clear, etc.


> +
> +impl SharedCache {
> +    pub fn new<P: AsRef<Path>>(base_path: P, options: CreateOptions) -> Result<Self, Error> {
> +        proxmox_sys::fs::create_path(
> +            base_path.as_ref(),
> +            Some(options.clone()),
> +            Some(options.clone()),
> +        )?;
> +
> +        Ok(SharedCache {
> +            base_path: base_path.as_ref().to_owned(),
> +            time_provider: Box::new(DefaultTimeProvider),
> +            create_options: options,
> +        })
> +    }
> +
> +    fn enforce_safe_key(key: &str) -> Result<(), Error> {
> +        let safe_id_regex = SAFE_ID_FORMAT.unwrap_pattern_format();
> +        if safe_id_regex.is_match(key) {
> +            Ok(())
> +        } else {
> +            bail!("invalid key format")
> +        }
> +    }
> +
> +    fn get_path_for_key(&self, key: &str) -> Result<PathBuf, Error> {
> +        Self::enforce_safe_key(key)?;
> +        let mut path = self.base_path.join(key);
> +        path.set_extension("json");
> +        Ok(path)
> +    }
> +}
> +
> +#[derive(Debug, Clone, Serialize, Deserialize)]
> +struct CachedItem {
> +    value: Value,
> +    added_at: i64,
> +    expires_in: Option<i64>,
> +}
> +

... and for completeness' sake: This can stay, as it's specific to the
alternative implementation I've written above.

All in all, I think this would make your implementation more flexible.
Let me know what you think!

> +#[cfg(test)]
> +mod tests {
> +    use super::*;
> +    use std::cell::Cell;
> +    use std::sync::Arc;
> +
> +    #[test]
> +    fn test_basic_set_and_get() {
> +        let cache = TestCache::new();
> +        cache
> +            .cache
> +            .set("foo", Value::String("bar".into()), None)
> +            .unwrap();
> +
> +        assert_eq!(
> +            cache.cache.get("foo").unwrap(),
> +            Some(Value::String("bar".into()))
> +        );
> +        assert!(cache.cache.get("notthere").unwrap().is_none());
> +    }
> +
> +    #[derive(Clone)]
> +    struct MockTimeProvider {
> +        current_time: Arc<Cell<i64>>,
> +    }
> +
> +    impl TimeProvider for MockTimeProvider {
> +        fn now(&self) -> i64 {
> +            self.current_time.get()
> +        }
> +    }
> +
> +    impl MockTimeProvider {
> +        fn elapse_time(&self, duration: i64) {
> +            let now = self.current_time.get();
> +            self.current_time.set(now + duration);
> +        }
> +    }
> +
> +    impl Default for MockTimeProvider {
> +        fn default() -> Self {
> +            Self {
> +                current_time: Arc::new(Cell::new(0)),
> +            }
> +        }
> +    }
> +
> +    struct TestCache {
> +        cache: SharedCache,
> +        time: MockTimeProvider,
> +        path: PathBuf,
> +    }
> +
> +    impl TestCache {
> +        fn new() -> Self {
> +            let path = proxmox_sys::fs::make_tmp_dir("/tmp/", None).unwrap();
> +
> +            let options = CreateOptions::new()
> +                .owner(nix::unistd::Uid::effective())
> +                .group(nix::unistd::Gid::effective())
> +                .perm(nix::sys::stat::Mode::from_bits_truncate(0o600));
> +
> +            let mut cache = SharedCache::new(&path, options).unwrap();
> +            let time = MockTimeProvider::default();
> +
> +            cache.time_provider = Box::new(time.clone());
> +
> +            Self { cache, time, path }
> +        }
> +    }
> +
> +    impl Drop for TestCache {
> +        fn drop(&mut self) {
> +            let _ = std::fs::remove_dir_all(&self.path);
> +        }
> +    }
> +
> +    #[test]
> +    fn test_expiry() {
> +        let cache = TestCache::new();
> +
> +        cache
> +            .cache
> +            .set("expiring", Value::String("bar".into()), Some(10))
> +            .unwrap();
> +        assert!(cache.cache.get("expiring").unwrap().is_some());
> +
> +        cache.time.elapse_time(9);
> +        assert!(cache.cache.get("expiring").unwrap().is_some());
> +        cache.time.elapse_time(2);
> +        assert!(cache.cache.get("expiring").unwrap().is_none());
> +    }
> +
> +    #[test]
> +    fn test_backwards_time_jump() {
> +        let cache = TestCache::new();
> +
> +        cache.time.elapse_time(50);
> +        cache
> +            .cache
> +            .set("future", Value::String("bar".into()), Some(10))
> +            .unwrap();
> +        cache.time.elapse_time(-20);
> +        assert!(cache.cache.get("future").unwrap().is_none());
> +    }
> +
> +    #[test]
> +    fn test_invalid_keys() {
> +        let cache = TestCache::new();
> +
> +        assert!(cache
> +            .cache
> +            .set("../escape_base", Value::Null, None)
> +            .is_err());
> +        assert!(cache
> +            .cache
> +            .set("bjørnen drikker øl", Value::Null, None)
> +            .is_err());
> +        assert!(cache.cache.set("test space", Value::Null, None).is_err());
> +        assert!(cache.cache.set("~/foo", Value::Null, None).is_err());
> +    }
> +}

[0]: https://doc.rust-lang.org/reference/items/traits.html#object-safety






More information about the pve-devel mailing list