[pve-devel] [RFC proxmox 4/7] cache: add new crate 'proxmox-cache'
Max Carrara
m.carrara at proxmox.com
Tue Aug 22 12:08:38 CEST 2023
On 8/21/23 15:44, Lukas Wagner wrote:
> For now, it contains a file-backed cache with expiration logic.
> The cache should be safe to be accessed from multiple processes at
> once.
>
This seems pretty neat! The cache implementation seems straightforward
enough. I'll see if I can test it more thoroughly later.
However, in my opinion we should have a crate like
"proxmox-collections" (or something of the sort) with modules for each
data structure / collection similar to the standard library; I'm
curious what others think about that. imo it would be a great
opportunity to introduce that crate in this series, since you're
already introducing one for the cache anyway.
So, proxmox-collections would look something like this:
proxmox-collections
└── src
    ├── cache
    │   ├── mod.rs
    │   └── shared_cache.rs
    └── lib.rs
Let me know what you think!
> The cache stores values in a directory, based on the key.
> E.g. key "foo" results in a file 'foo.json' in the given base
> directory. If a new value is set, the file is atomically replaced.
> The JSON file also contains some metadata, namely 'added_at' and
> 'expire_in' - they are used for cache expiration.
>
> Note: This cache is not suited to applications that
> - Might want to cache huge amounts of data, and/or access the cache
> very frequently (due to the overhead of JSON de/serialization)
> - Require arbitrary keys - right now, keys are limited by
> SAFE_ID_REGEX
>
> The cache was developed for the use in pvestatd, in order to cache
> e.g. storage plugin status. There, these limitations do not really
> play any role.
>
> Signed-off-by: Lukas Wagner <l.wagner at proxmox.com>
> ---
> Cargo.toml | 1 +
> proxmox-cache/Cargo.toml | 20 ++
> proxmox-cache/examples/performance.rs | 82 ++++++++
> proxmox-cache/src/lib.rs | 40 ++++
> proxmox-cache/src/shared_cache.rs | 263 ++++++++++++++++++++++++++
> 5 files changed, 406 insertions(+)
> create mode 100644 proxmox-cache/Cargo.toml
> create mode 100644 proxmox-cache/examples/performance.rs
> create mode 100644 proxmox-cache/src/lib.rs
> create mode 100644 proxmox-cache/src/shared_cache.rs
>
> diff --git a/Cargo.toml b/Cargo.toml
> index e334ac1..940e1d0 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -5,6 +5,7 @@ members = [
> "proxmox-async",
> "proxmox-auth-api",
> "proxmox-borrow",
> + "proxmox-cache",
> "proxmox-client",
> "proxmox-compression",
> "proxmox-http",
> diff --git a/proxmox-cache/Cargo.toml b/proxmox-cache/Cargo.toml
> new file mode 100644
> index 0000000..b20921f
> --- /dev/null
> +++ b/proxmox-cache/Cargo.toml
> @@ -0,0 +1,20 @@
> +[package]
> +name = "proxmox-cache"
> +version = "0.1.0"
> +authors.workspace = true
> +edition.workspace = true
> +license.workspace = true
> +repository.workspace = true
> +exclude.workspace = true
> +description = "Cache implementations"
> +
> +[dependencies]
> +anyhow.workspace = true
> +proxmox-sys.workspace = true
> +proxmox-time.workspace = true
> +proxmox-schema = { workspace = true, features = ["api-types"]}
> +serde_json.workspace = true
> +serde = { workspace = true, features = ["derive"]}
> +
> +[dev-dependencies]
> +nix.workspace = true
> diff --git a/proxmox-cache/examples/performance.rs b/proxmox-cache/examples/performance.rs
> new file mode 100644
> index 0000000..420f61c
> --- /dev/null
> +++ b/proxmox-cache/examples/performance.rs
> @@ -0,0 +1,82 @@
> +use proxmox_cache::Cache;
> +use proxmox_cache::SharedCache;
> +use proxmox_sys::fs::CreateOptions;
> +use std::time::Instant;
> +
> +fn main() {
> + let options = CreateOptions::new()
> + .owner(nix::unistd::Uid::effective())
> + .group(nix::unistd::Gid::effective())
> + .perm(nix::sys::stat::Mode::from_bits_truncate(0o755));
> +
> + let cache = SharedCache::new("/tmp/pmx-cache", options).unwrap();
> +
> + let mut keys = Vec::new();
> +
> + for i in 0..100000 {
> + keys.push(format!("key_{i}"));
> + }
> +
> + let data = serde_json::json!({
> + "member1": "foo",
> + "member2": "foo",
> + "member3": "foo",
> + "member4": "foo",
> + "member5": "foo",
> + "member5": "foo",
> + "member6": "foo",
> + "member7": "foo",
> + "member8": "foo",
> + "array": [10, 20, 30, 40, 50],
> + "object": {
> + "member1": "foo",
> + "member2": "foo",
> + "member3": "foo",
> + "member4": "foo",
> + "member5": "foo",
> + "member5": "foo",
> + "member6": "foo",
> + "member7": "foo",
> + "member8": "foo",
> + }
> + });
> +
> + let before = Instant::now();
> +
> + for key in &keys {
> + cache
> + .set(key, data.clone(), None)
> + .expect("could not insert value");
> + }
> +
> + let time = Instant::now() - before;
> + let time_per_op = time / keys.len() as u32;
> + println!(
> + "inserting {len} keys took {time:?} ({time_per_op:?} per key)",
> + len = keys.len(),
> + );
> +
> + let before = Instant::now();
> + for key in &keys {
> + let _ = cache.get(key).expect("could not get value");
> + }
> +
> + let time = Instant::now() - before;
> + let time_per_op = time / keys.len() as u32;
> + println!(
> + "getting {len} keys took {time:?} ({time_per_op:?} per key)",
> + len = keys.len(),
> + );
> +
> + let before = Instant::now();
> + for key in &keys {
> + cache.delete(key).expect("could not delete value");
> + }
> +
> + let time = Instant::now() - before;
> + let time_per_op = time / keys.len() as u32;
> + println!(
> + "deleting {len} keys took {time:?} ({time_per_op:?} per key)",
> + len = keys.len(),
> + );
> +}
> diff --git a/proxmox-cache/src/lib.rs b/proxmox-cache/src/lib.rs
> new file mode 100644
> index 0000000..d496dc7
> --- /dev/null
> +++ b/proxmox-cache/src/lib.rs
> @@ -0,0 +1,40 @@
> +use anyhow::Error;
> +use serde_json::Value;
> +
> +pub mod shared_cache;
> +
> +pub use shared_cache::SharedCache;
> +
> +trait TimeProvider {
> + /// Returns the current time as a UNIX epoch (second resolution)
> + fn now(&self) -> i64;
> +}
> +
> +struct DefaultTimeProvider;
> +
> +impl TimeProvider for DefaultTimeProvider {
> + fn now(&self) -> i64 {
> + proxmox_time::epoch_i64()
> + }
> +}
> +
> +pub trait Cache {
> + /// Set or insert a cache entry.
> + ///
> + /// If `expires_in` is set, this entry will expire in the desired number of
> + /// seconds.
> + fn set<S: AsRef<str>>(
> + &self,
> + key: S,
> + value: Value,
> + expires_in: Option<i64>,
> + ) -> Result<(), Error>;
> +
> + /// Delete a cache entry.
> + fn delete<S: AsRef<str>>(&self, key: S) -> Result<(), Error>;
> +
> + /// Get a value from the cache.
> + ///
> + /// Expired entries will *not* be returned.
> + fn get<S: AsRef<str>>(&self, key: S) -> Result<Option<Value>, Error>;
> +}
I don't think a trait is necessary in this case, as there's not really
any other structure (that can be used as a caching mechanism) that
you're abstracting over. (more below)
> diff --git a/proxmox-cache/src/shared_cache.rs b/proxmox-cache/src/shared_cache.rs
> new file mode 100644
> index 0000000..be6212c
> --- /dev/null
> +++ b/proxmox-cache/src/shared_cache.rs
> @@ -0,0 +1,263 @@
> +use std::path::{Path, PathBuf};
> +
> +use anyhow::{bail, Error};
> +use serde::{Deserialize, Serialize};
> +use serde_json::Value;
> +
> +use proxmox_schema::api_types::SAFE_ID_FORMAT;
> +use proxmox_sys::fs::CreateOptions;
> +
> +use crate::{Cache, DefaultTimeProvider, TimeProvider};
> +
> +/// A simple, file-backed cache that can be used from multiple processes concurrently.
> +///
> +/// Cache entries are stored as individual files inside a base directory. For instance,
> +/// a cache entry with the key 'disk_stats' will result in a file 'disk_stats.json' inside
> +/// the base directory. As the extension implies, the cached data will be stored as a JSON
> +/// string.
> +///
> +/// For optimal performance, `SharedCache` should have its base directory in a `tmpfs`.
> +///
> +/// ## Key Space
> +/// Due to the fact that cache keys are being directly used as filenames, they have to match the
> +/// following regular expression: `[A-Za-z0-9_][A-Za-z0-9._\-]*`
> +///
> +/// ## Concurrency
> +/// All cache operations are based on atomic file operations, thus accessing/updating the cache from
> +/// multiple processes at the same time is safe.
> +///
> +/// ## Performance
> +/// On a tmpfs:
> +/// ```sh
> +/// $ cargo run --release --example=performance
> +/// inserting 100000 keys took 896.609758ms (8.966µs per key)
> +/// getting 100000 keys took 584.874842ms (5.848µs per key)
> +/// deleting 100000 keys took 247.742702ms (2.477µs per key)
> +///
> +/// Inserting/getting large objects might of course result in lower performance due to the cost
> +/// of serialization.
> +/// ```
> +///
> +pub struct SharedCache {
> + base_path: PathBuf,
> + time_provider: Box<dyn TimeProvider>,
> + create_options: CreateOptions,
> +}
Instead, this should be generic:
pub struct SharedCache<K, V> { ... }
.. and maybe rename it to SharedFileCache to make it explicit that this
operates on a file. (but that's more dependent on one's taste tbh)
.. and the impl block below ...
> +
> +impl Cache for SharedCache {
> + fn set<S: AsRef<str>>(
> + &self,
> + key: S,
> + value: Value,
> + expires_in: Option<i64>,
> + ) -> Result<(), Error> {
> + let path = self.get_path_for_key(key.as_ref())?;
> + let added_at = self.time_provider.now();
> +
> + let item = CachedItem {
> + value,
> + added_at,
> + expires_in,
> + };
> +
> + let serialized = serde_json::to_vec_pretty(&item)?;
> +
> + // Atomically replace file
> + proxmox_sys::fs::replace_file(path, &serialized, self.create_options.clone(), true)?;
> + Ok(())
> + }
> +
> + fn delete<S: AsRef<str>>(&self, key: S) -> Result<(), Error> {
> + let path = self.get_path_for_key(key.as_ref())?;
> + std::fs::remove_file(path)?;
> + Ok(())
> + }
> +
> + fn get<S: AsRef<str>>(&self, key: S) -> Result<Option<Value>, Error> {
> + let path = self.get_path_for_key(key.as_ref())?;
> +
> + let value = if let Some(content) = proxmox_sys::fs::file_get_optional_contents(path)? {
> + let value: CachedItem = serde_json::from_slice(&content)?;
> +
> + let now = self.time_provider.now();
> +
> + if let Some(expires_in) = value.expires_in {
> + // Check if value is not expired yet. Also do not allow
> + // values from the future, in case we have clock jumps
> + if value.added_at + expires_in > now && value.added_at <= now {
> + Some(value.value)
> + } else {
> + None
> + }
> + } else {
> + Some(value.value)
> + }
> + } else {
> + None
> + };
> +
> + Ok(value)
> + }
> +}
... can be replaced as follows, in order to make it similar to
std::collections::{HashMap, BTreeMap}:
impl<K: AsRef<str>> SharedCache<K, Value> {
    // Returns old value on successful insert, if given
    fn insert(&self, k: K, v: Value) -> Result<Option<Value>, Error> {
        // ...
    }
    fn get(&self, k: K) -> Result<Option<Value>, Error> {
        // ...
    }
    fn remove(&self, k: K) -> Result<Option<Value>, Error> {
        // ...
    }
}
If necessary / sensible, other methods (inspired by {HashMap, BTreeMap}) can
be added as well, such as remove_entry, retain, clear, etc.
> +
> +impl SharedCache {
> + pub fn new<P: AsRef<Path>>(base_path: P, options: CreateOptions) -> Result<Self, Error> {
> + proxmox_sys::fs::create_path(
> + base_path.as_ref(),
> + Some(options.clone()),
> + Some(options.clone()),
> + )?;
> +
> + Ok(SharedCache {
> + base_path: base_path.as_ref().to_owned(),
> + time_provider: Box::new(DefaultTimeProvider),
> + create_options: options,
> + })
> + }
> +
> + fn enforce_safe_key(key: &str) -> Result<(), Error> {
> + let safe_id_regex = SAFE_ID_FORMAT.unwrap_pattern_format();
> + if safe_id_regex.is_match(key) {
> + Ok(())
> + } else {
> + bail!("invalid key format")
> + }
> + }
> +
> + fn get_path_for_key(&self, key: &str) -> Result<PathBuf, Error> {
> + Self::enforce_safe_key(key)?;
> + let mut path = self.base_path.join(key);
> + path.set_extension("json");
> + Ok(path)
> + }
> +}
> +
> +#[derive(Debug, Clone, Serialize, Deserialize)]
> +struct CachedItem {
> + value: Value,
> + added_at: i64,
> + expires_in: Option<i64>,
> +}
> +
... and for completeness' sake: This can stay, as it's specific to the
alternative implementation I've written above.
All in all, I think this would make your implementation more flexible.
Let me know what you think!
> +#[cfg(test)]
> +mod tests {
> + use super::*;
> + use std::cell::Cell;
> + use std::sync::Arc;
> +
> + #[test]
> + fn test_basic_set_and_get() {
> + let cache = TestCache::new();
> + cache
> + .cache
> + .set("foo", Value::String("bar".into()), None)
> + .unwrap();
> +
> + assert_eq!(
> + cache.cache.get("foo").unwrap(),
> + Some(Value::String("bar".into()))
> + );
> + assert!(cache.cache.get("notthere").unwrap().is_none());
> + }
> +
> + #[derive(Clone)]
> + struct MockTimeProvider {
> + current_time: Arc<Cell<i64>>,
> + }
> +
> + impl TimeProvider for MockTimeProvider {
> + fn now(&self) -> i64 {
> + self.current_time.get()
> + }
> + }
> +
> + impl MockTimeProvider {
> + fn elapse_time(&self, duration: i64) {
> + let now = self.current_time.get();
> + self.current_time.set(now + duration);
> + }
> + }
> +
> + impl Default for MockTimeProvider {
> + fn default() -> Self {
> + Self {
> + current_time: Arc::new(Cell::new(0)),
> + }
> + }
> + }
> +
> + struct TestCache {
> + cache: SharedCache,
> + time: MockTimeProvider,
> + path: PathBuf,
> + }
> +
> + impl TestCache {
> + fn new() -> Self {
> + let path = proxmox_sys::fs::make_tmp_dir("/tmp/", None).unwrap();
> +
> + let options = CreateOptions::new()
> + .owner(nix::unistd::Uid::effective())
> + .group(nix::unistd::Gid::effective())
> + .perm(nix::sys::stat::Mode::from_bits_truncate(0o600));
> +
> + let mut cache = SharedCache::new(&path, options).unwrap();
> + let time = MockTimeProvider::default();
> +
> + cache.time_provider = Box::new(time.clone());
> +
> + Self { cache, time, path }
> + }
> + }
> +
> + impl Drop for TestCache {
> + fn drop(&mut self) {
> + let _ = std::fs::remove_dir_all(&self.path);
> + }
> + }
> +
> + #[test]
> + fn test_expiry() {
> + let cache = TestCache::new();
> +
> + cache
> + .cache
> + .set("expiring", Value::String("bar".into()), Some(10))
> + .unwrap();
> + assert!(cache.cache.get("expiring").unwrap().is_some());
> +
> + cache.time.elapse_time(9);
> + assert!(cache.cache.get("expiring").unwrap().is_some());
> + cache.time.elapse_time(2);
> + assert!(cache.cache.get("expiring").unwrap().is_none());
> + }
> +
> + #[test]
> + fn test_backwards_time_jump() {
> + let cache = TestCache::new();
> +
> + cache.time.elapse_time(50);
> + cache
> + .cache
> + .set("future", Value::String("bar".into()), Some(10))
> + .unwrap();
> + cache.time.elapse_time(-20);
> + assert!(cache.cache.get("future").unwrap().is_none());
> + }
> +
> + #[test]
> + fn test_invalid_keys() {
> + let cache = TestCache::new();
> +
> + assert!(cache
> + .cache
> + .set("../escape_base", Value::Null, None)
> + .is_err());
> + assert!(cache
> + .cache
> + .set("bjørnen drikker øl", Value::Null, None)
> + .is_err());
> + assert!(cache.cache.set("test space", Value::Null, None).is_err());
> + assert!(cache.cache.set("~/foo", Value::Null, None).is_err());
> + }
> +}
[0]: https://doc.rust-lang.org/reference/items/traits.html#object-safety
More information about the pve-devel
mailing list