[pve-devel] [PATCH pve-cluster 04/15] pmxcfs-rs: add pmxcfs-rrd crate
Kefu Chai
k.chai at proxmox.com
Tue Jan 6 15:24:28 CET 2026
Add RRD (Round-Robin Database) file persistence system:
- RrdWriter: Main API for RRD operations
- Schema definitions for CPU, memory, network metrics
- Format migration support (v1/v2/v3)
- rrdcached integration for batched writes
- Data transformation for legacy formats
This is an independent crate with no internal dependencies,
only requiring external RRD libraries (rrd, rrdcached-client)
and tokio for async operations. It handles time-series data
storage compatible with the C implementation.
Includes comprehensive unit tests for data transformation,
schema generation, and multi-source data processing.
Signed-off-by: Kefu Chai <k.chai at proxmox.com>
---
src/pmxcfs-rs/Cargo.toml | 1 +
src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml | 18 +
src/pmxcfs-rs/pmxcfs-rrd/README.md | 51 ++
src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs | 67 ++
.../pmxcfs-rrd/src/backend/backend_daemon.rs | 214 +++++++
.../pmxcfs-rrd/src/backend/backend_direct.rs | 606 ++++++++++++++++++
.../src/backend/backend_fallback.rs | 229 +++++++
src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs | 140 ++++
src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs | 313 +++++++++
src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs | 21 +
src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs | 577 +++++++++++++++++
src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs | 397 ++++++++++++
12 files changed, 2634 insertions(+)
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/README.md
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index 4d17e87e..dd36c81f 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -4,6 +4,7 @@ members = [
"pmxcfs-api-types", # Shared types and error definitions
"pmxcfs-config", # Configuration management
"pmxcfs-logger", # Cluster log with ring buffer and deduplication
+ "pmxcfs-rrd", # RRD (Round-Robin Database) persistence
]
resolver = "2"
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
new file mode 100644
index 00000000..bab71423
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "pmxcfs-rrd"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+# async-trait: object-safe async methods for the RrdBackend trait
+async-trait = "0.1"
+# chrono: timestamp conversion only; "clock" feature is needed for Utc::now()
+chrono = { version = "0.4", default-features = false, features = ["clock"] }
+# rrd: librrd bindings used by the direct-file backend
+rrd = "0.2"
+# rrdcached-client: async client used by the rrdcached daemon backend
+rrdcached-client = "0.1.5"
+tokio.workspace = true
+tracing.workspace = true
+
+[dev-dependencies]
+tempfile.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/README.md b/src/pmxcfs-rs/pmxcfs-rrd/README.md
new file mode 100644
index 00000000..800d78cf
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/README.md
@@ -0,0 +1,51 @@
+# pmxcfs-rrd
+
+RRD (Round-Robin Database) persistence for pmxcfs performance metrics.
+
+## Overview
+
+This crate provides RRD file management for storing time-series performance data from Proxmox nodes and VMs. It handles file creation, updates, and integration with rrdcached daemon for efficient writes.
+
+### Key Features
+
+- RRD file creation with schema-based initialization
+- RRD updates (write metrics to disk)
+- rrdcached integration for batched writes
+- Support for both legacy and current schema versions
+- Type-safe key parsing and validation
+- Compatible with existing C-created RRD files
+
+## Module Structure
+
+| Module | Purpose |
+|--------|---------|
+| `writer.rs` | Main RrdWriter API |
+| `schema.rs` | RRD schema definitions (DS, RRA) |
+| `key_type.rs` | RRD key parsing and validation |
+| `daemon.rs` | rrdcached daemon client |
+| `backend.rs` | `RrdBackend` trait with daemon, direct, and fallback implementations |
+
+## External Dependencies
+
+- **librrd**: RRDtool library (via FFI bindings)
+- **rrdcached**: Optional daemon for batched writes and improved performance
+
+## Testing
+
+Unit tests verify:
+- Schema generation and validation
+- Key parsing for different RRD types (node, VM, storage)
+- RRD file creation and update operations
+- rrdcached client connection and fallback behavior
+
+Run tests with:
+```bash
+cargo test -p pmxcfs-rrd
+```
+
+## References
+
+- **C Implementation**: `src/pmxcfs/status.c` (RRD code embedded)
+- **Related Crates**:
+ - `pmxcfs-status` - Uses RrdWriter for metrics persistence
+ - `pmxcfs` - FUSE `.rrd` plugin reads RRD files
+- **RRDtool Documentation**: https://oss.oetiker.ch/rrdtool/
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
new file mode 100644
index 00000000..58652831
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
@@ -0,0 +1,67 @@
+//! RRD Backend Trait and Implementations
+//!
+//! This module provides an abstraction over different RRD writing mechanisms:
+//! - Daemon-based (via rrdcached) for performance and batching
+//! - Direct file writing for reliability and fallback scenarios
+//! - Fallback composite that tries daemon first, then falls back to direct
+//!
+//! This design matches the C implementation's behavior in status.c where
+//! it attempts daemon update first, then falls back to direct file writes.
+//
+// NOTE(review): changed `///` to `//!` so the header documents the module
+// itself instead of attaching to the first `use` item (avoids the
+// `unused_doc_comments` lint).
+use super::schema::RrdSchema;
+use anyhow::Result;
+use async_trait::async_trait;
+use std::path::Path;
+
+/// Trait for RRD backend implementations
+///
+/// Provides abstraction over different RRD writing mechanisms.
+/// All methods are async to support both async (daemon) and sync (direct file) operations.
+///
+/// All mutating operations take `&mut self`, so callers need exclusive access
+/// to a backend instance (or their own synchronization) to share one.
+#[async_trait]
+pub trait RrdBackend: Send + Sync {
+ /// Update RRD file with new data
+ ///
+ /// # Arguments
+ /// * `file_path` - Full path to the RRD file
+ /// * `data` - Update data in format "timestamp:value1:value2:..."
+ ///
+ /// The timestamp may be the literal `"N"` ("now"), and individual values
+ /// may be `"U"` (unknown), as in RRDtool's update syntax — both backend
+ /// implementations in this module accept these.
+ async fn update(&mut self, file_path: &Path, data: &str) -> Result<()>;
+
+ /// Create new RRD file with schema
+ ///
+ /// # Arguments
+ /// * `file_path` - Full path where RRD file should be created
+ /// * `schema` - RRD schema defining data sources and archives
+ /// * `start_timestamp` - Start time for the RRD file (Unix timestamp)
+ async fn create(
+ &mut self,
+ file_path: &Path,
+ schema: &RrdSchema,
+ start_timestamp: i64,
+ ) -> Result<()>;
+
+ /// Flush pending updates to disk
+ ///
+ /// For daemon backends, this sends a FLUSH command.
+ /// For direct backends, this is a no-op (writes are immediate).
+ #[allow(dead_code)] // Used in backend implementations via trait dispatch
+ async fn flush(&mut self) -> Result<()>;
+
+ /// Check if backend is available and healthy
+ ///
+ /// Returns true if the backend can be used for operations.
+ /// For daemon backends, this checks if the connection is alive.
+ /// For direct backends, this always returns true.
+ #[allow(dead_code)] // Used in fallback backend via trait dispatch
+ async fn is_available(&self) -> bool;
+
+ /// Get a human-readable name for this backend
+ fn name(&self) -> &str;
+}
+
+// Backend implementations
+mod backend_daemon;
+mod backend_direct;
+mod backend_fallback;
+
+pub use backend_daemon::RrdCachedBackend;
+pub use backend_direct::RrdDirectBackend;
+pub use backend_fallback::RrdFallbackBackend;
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
new file mode 100644
index 00000000..28c1a99a
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
@@ -0,0 +1,214 @@
+//! RRD Backend: rrdcached daemon
+//!
+//! Uses rrdcached for batched, high-performance RRD updates.
+//! This is the preferred backend when the daemon is available.
+//
+// NOTE(review): `///` changed to `//!` so the header is module documentation
+// rather than a doc comment on the first `use` item.
+use super::super::schema::RrdSchema;
+use anyhow::{Context, Result};
+use async_trait::async_trait;
+use rrdcached_client::RRDCachedClient;
+use rrdcached_client::consolidation_function::ConsolidationFunction;
+use rrdcached_client::create::{
+ CreateArguments, CreateDataSource, CreateDataSourceType, CreateRoundRobinArchive,
+};
+use std::path::Path;
+
+/// RRD backend using rrdcached daemon
+pub struct RrdCachedBackend {
+ // Unix-socket client; all commands go through this single connection.
+ client: RRDCachedClient<tokio::net::UnixStream>,
+}
+
+impl RrdCachedBackend {
+ /// Connect to rrdcached daemon
+ ///
+ /// # Arguments
+ /// * `socket_path` - Path to rrdcached Unix socket (default: /var/run/rrdcached.sock)
+ ///
+ /// # Errors
+ /// Fails if the socket cannot be connected; no retry is attempted here —
+ /// callers (e.g. the fallback backend) decide how to handle an absent daemon.
+ pub async fn connect(socket_path: &str) -> Result<Self> {
+ let client = RRDCachedClient::connect_unix(socket_path)
+ .await
+ .with_context(|| format!("Failed to connect to rrdcached at {socket_path}"))?;
+
+ tracing::info!("Connected to rrdcached at {}", socket_path);
+
+ Ok(Self { client })
+ }
+}
+
+#[async_trait]
+impl super::super::backend::RrdBackend for RrdCachedBackend {
+ async fn update(&mut self, file_path: &Path, data: &str) -> Result<()> {
+ // Parse the update data
+ // Expected format: "timestamp:value1:value2:..." where the timestamp may
+ // be the literal "N" ("now").
+ let parts: Vec<&str> = data.split(':').collect();
+ if parts.len() < 2 {
+ anyhow::bail!("Invalid update data format: {data}");
+ }
+
+ // "N" maps to None — presumably the client then lets rrdcached stamp the
+ // update with its own current time; TODO confirm against rrdcached-client docs.
+ let timestamp = if parts[0] == "N" {
+ None
+ } else {
+ Some(
+ parts[0]
+ .parse::<usize>()
+ .with_context(|| format!("Invalid timestamp: {}", parts[0]))?,
+ )
+ };
+
+ // "U" (unknown) is forwarded as NaN — assumes rrdcached-client serializes
+ // NaN back to "U" on the wire; TODO confirm. Any other unparsable value is
+ // a hard error here (unlike the direct backend, which degrades to "U").
+ let values: Vec<f64> = parts[1..]
+ .iter()
+ .map(|v| {
+ if *v == "U" {
+ Ok(f64::NAN)
+ } else {
+ v.parse::<f64>()
+ .with_context(|| format!("Invalid value: {v}"))
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // Get file path without .rrd extension (rrdcached-client adds it)
+ let path_str = file_path.to_string_lossy();
+ let path_without_ext = path_str.strip_suffix(".rrd").unwrap_or(&path_str);
+
+ // Send update via rrdcached
+ self.client
+ .update(path_without_ext, timestamp, values)
+ .await
+ .with_context(|| format!("rrdcached update failed for {:?}", file_path))?;
+
+ tracing::trace!("Updated RRD via daemon: {:?} -> {}", file_path, data);
+
+ Ok(())
+ }
+
+ async fn create(
+ &mut self,
+ file_path: &Path,
+ schema: &RrdSchema,
+ start_timestamp: i64,
+ ) -> Result<()> {
+ tracing::debug!(
+ "Creating RRD file via daemon: {:?} with {} data sources",
+ file_path,
+ schema.column_count()
+ );
+
+ // Convert our data sources to rrdcached-client CreateDataSource objects
+ let mut data_sources = Vec::new();
+ for ds in &schema.data_sources {
+ let serie_type = match ds.ds_type {
+ "GAUGE" => CreateDataSourceType::Gauge,
+ "DERIVE" => CreateDataSourceType::Derive,
+ "COUNTER" => CreateDataSourceType::Counter,
+ "ABSOLUTE" => CreateDataSourceType::Absolute,
+ _ => anyhow::bail!("Unsupported data source type: {}", ds.ds_type),
+ };
+
+ // Parse min/max values
+ // NOTE(review): parse failures are silently dropped to None via .ok()
+ // here, while the direct backend treats a bad min/max as an error —
+ // confirm which behavior is intended and make them consistent.
+ let minimum = if ds.min == "U" {
+ None
+ } else {
+ ds.min.parse().ok()
+ };
+ let maximum = if ds.max == "U" {
+ None
+ } else {
+ ds.max.parse().ok()
+ };
+
+ let data_source = CreateDataSource {
+ name: ds.name.to_string(),
+ minimum,
+ maximum,
+ heartbeat: ds.heartbeat as i64,
+ serie_type,
+ };
+
+ data_sources.push(data_source);
+ }
+
+ // Convert our RRA definitions to rrdcached-client CreateRoundRobinArchive objects
+ let mut archives = Vec::new();
+ for rra in &schema.archives {
+ // Parse RRA string: "RRA:AVERAGE:0.5:1:70"
+ let parts: Vec<&str> = rra.split(':').collect();
+ if parts.len() != 5 || parts[0] != "RRA" {
+ anyhow::bail!("Invalid RRA format: {rra}");
+ }
+
+ let consolidation_function = match parts[1] {
+ "AVERAGE" => ConsolidationFunction::Average,
+ "MIN" => ConsolidationFunction::Min,
+ "MAX" => ConsolidationFunction::Max,
+ "LAST" => ConsolidationFunction::Last,
+ _ => anyhow::bail!("Unsupported consolidation function: {}", parts[1]),
+ };
+
+ let xfiles_factor: f64 = parts[2]
+ .parse()
+ .with_context(|| format!("Invalid xff in RRA: {rra}"))?;
+ let steps: i64 = parts[3]
+ .parse()
+ .with_context(|| format!("Invalid steps in RRA: {rra}"))?;
+ let rows: i64 = parts[4]
+ .parse()
+ .with_context(|| format!("Invalid rows in RRA: {rra}"))?;
+
+ let archive = CreateRoundRobinArchive {
+ consolidation_function,
+ xfiles_factor,
+ steps,
+ rows,
+ };
+ archives.push(archive);
+ }
+
+ // Get path without .rrd extension (rrdcached-client adds it)
+ let path_str = file_path.to_string_lossy();
+ let path_without_ext = path_str
+ .strip_suffix(".rrd")
+ .unwrap_or(&path_str)
+ .to_string();
+
+ // Create CreateArguments
+ // NOTE(review): `start_timestamp as u64` silently wraps for negative
+ // values; callers presumably pass valid epoch seconds — confirm, or use
+ // u64::try_from.
+ let create_args = CreateArguments {
+ path: path_without_ext,
+ data_sources,
+ round_robin_archives: archives,
+ start_timestamp: start_timestamp as u64,
+ step_seconds: 60, // 60-second step (1 minute resolution)
+ };
+
+ // Validate before sending
+ create_args.validate().context("Invalid CREATE arguments")?;
+
+ // Send CREATE command via rrdcached
+ self.client
+ .create(create_args)
+ .await
+ .with_context(|| format!("Failed to create RRD file via daemon: {file_path:?}"))?;
+
+ tracing::info!("Created RRD file via daemon: {:?} ({})", file_path, schema);
+
+ Ok(())
+ }
+
+ async fn flush(&mut self) -> Result<()> {
+ // Ask the daemon to write out every journaled update (FLUSHALL).
+ self.client
+ .flush_all()
+ .await
+ .context("Failed to flush rrdcached")?;
+
+ tracing::debug!("Flushed all pending RRD updates");
+
+ Ok(())
+ }
+
+ async fn is_available(&self) -> bool {
+ // For now, assume we're available if we have a client
+ // Could add a PING command in the future
+ true
+ }
+
+ fn name(&self) -> &str {
+ "rrdcached"
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
new file mode 100644
index 00000000..6be3eb5d
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
@@ -0,0 +1,606 @@
+//! RRD Backend: Direct file writing
+//!
+//! Uses the `rrd` crate (librrd bindings) for direct RRD file operations.
+//! This backend is used as a fallback when rrdcached is unavailable.
+//!
+//! This matches the C implementation's behavior in status.c:1416-1420 where
+//! it falls back to rrd_update_r() and rrd_create_r() for direct file access.
+//
+// NOTE(review): `///` changed to `//!` so the header is module documentation
+// rather than a doc comment on the first `use` item.
+use super::super::schema::RrdSchema;
+use anyhow::{Context, Result};
+use async_trait::async_trait;
+use std::path::Path;
+use std::time::Duration;
+
+/// RRD backend using direct file operations via librrd
+pub struct RrdDirectBackend {
+ // Currently stateless, but kept as struct for future enhancements
+}
+
+impl RrdDirectBackend {
+ /// Create a new direct file backend
+ ///
+ /// Infallible: direct file access needs no connection or setup.
+ pub fn new() -> Self {
+ tracing::info!("Using direct RRD file backend (via librrd)");
+ Self {}
+ }
+}
+
+impl Default for RrdDirectBackend {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[async_trait]
+impl super::super::backend::RrdBackend for RrdDirectBackend {
+ async fn update(&mut self, file_path: &Path, data: &str) -> Result<()> {
+ let path = file_path.to_path_buf();
+ let data_str = data.to_string();
+
+ // Use tokio::task::spawn_blocking for sync rrd operations
+ // This prevents blocking the async runtime
+ tokio::task::spawn_blocking(move || {
+ // Parse the update data to extract timestamp and values
+ // Format: "timestamp:value1:value2:..."
+ let parts: Vec<&str> = data_str.split(':').collect();
+ if parts.is_empty() {
+ anyhow::bail!("Empty update data");
+ }
+
+ // Use rrd::ops::update::update_all with an explicit BatchTime::Timestamp
+ // (comment previously named update_all_with_timestamp, which is not what
+ // is called below).
+ let timestamp_str = parts[0];
+ let timestamp: i64 = if timestamp_str == "N" {
+ // "N" means "now" in RRD terminology
+ chrono::Utc::now().timestamp()
+ } else {
+ timestamp_str
+ .parse()
+ .with_context(|| format!("Invalid timestamp: {}", timestamp_str))?
+ };
+
+ let timestamp = chrono::DateTime::from_timestamp(timestamp, 0)
+ .ok_or_else(|| anyhow::anyhow!("Invalid timestamp value: {}", timestamp))?;
+
+ // Convert values to Datum
+ let values: Vec<rrd::ops::update::Datum> = parts[1..]
+ .iter()
+ .map(|v| {
+ if *v == "U" {
+ // Unknown/unspecified value
+ rrd::ops::update::Datum::Unspecified
+ } else if let Ok(int_val) = v.parse::<u64>() {
+ // u64 is tried before f64 so large integer counters keep
+ // full precision (f64 rounds above 2^53).
+ rrd::ops::update::Datum::Int(int_val)
+ } else if let Ok(float_val) = v.parse::<f64>() {
+ rrd::ops::update::Datum::Float(float_val)
+ } else {
+ // NOTE(review): unparsable values silently degrade to
+ // Unspecified ("U") here, while the daemon backend rejects
+ // them with an error — confirm this asymmetry is intended.
+ rrd::ops::update::Datum::Unspecified
+ }
+ })
+ .collect();
+
+ // Perform the update
+ rrd::ops::update::update_all(
+ &path,
+ rrd::ops::update::ExtraFlags::empty(),
+ &[(
+ rrd::ops::update::BatchTime::Timestamp(timestamp),
+ values.as_slice(),
+ )],
+ )
+ .with_context(|| format!("Direct RRD update failed for {:?}", path))?;
+
+ tracing::trace!("Updated RRD via direct file: {:?} -> {}", path, data_str);
+
+ Ok::<(), anyhow::Error>(())
+ })
+ .await
+ .context("Failed to spawn blocking task for RRD update")??;
+
+ Ok(())
+ }
+
+ async fn create(
+ &mut self,
+ file_path: &Path,
+ schema: &RrdSchema,
+ start_timestamp: i64,
+ ) -> Result<()> {
+ tracing::debug!(
+ "Creating RRD file via direct: {:?} with {} data sources",
+ file_path,
+ schema.column_count()
+ );
+
+ let path = file_path.to_path_buf();
+ let schema = schema.clone();
+
+ // Ensure parent directory exists
+ // (done on the async thread before handing off to the blocking task)
+ if let Some(parent) = path.parent() {
+ std::fs::create_dir_all(parent)
+ .with_context(|| format!("Failed to create directory: {parent:?}"))?;
+ }
+
+ // Use tokio::task::spawn_blocking for sync rrd operations
+ tokio::task::spawn_blocking(move || {
+ // Convert timestamp
+ let start = chrono::DateTime::from_timestamp(start_timestamp, 0)
+ .ok_or_else(|| anyhow::anyhow!("Invalid start timestamp: {}", start_timestamp))?;
+
+ // Convert data sources
+ // NOTE(review): the four arms below duplicate the min/max parsing;
+ // could be hoisted before the match in a follow-up cleanup.
+ let data_sources: Vec<rrd::ops::create::DataSource> = schema
+ .data_sources
+ .iter()
+ .map(|ds| {
+ let name = rrd::ops::create::DataSourceName::new(ds.name);
+
+ match ds.ds_type {
+ "GAUGE" => {
+ let min = if ds.min == "U" {
+ None
+ } else {
+ Some(ds.min.parse().context("Invalid min value")?)
+ };
+ let max = if ds.max == "U" {
+ None
+ } else {
+ Some(ds.max.parse().context("Invalid max value")?)
+ };
+ Ok(rrd::ops::create::DataSource::gauge(
+ name,
+ ds.heartbeat,
+ min,
+ max,
+ ))
+ }
+ "DERIVE" => {
+ let min = if ds.min == "U" {
+ None
+ } else {
+ Some(ds.min.parse().context("Invalid min value")?)
+ };
+ let max = if ds.max == "U" {
+ None
+ } else {
+ Some(ds.max.parse().context("Invalid max value")?)
+ };
+ Ok(rrd::ops::create::DataSource::derive(
+ name,
+ ds.heartbeat,
+ min,
+ max,
+ ))
+ }
+ "COUNTER" => {
+ let min = if ds.min == "U" {
+ None
+ } else {
+ Some(ds.min.parse().context("Invalid min value")?)
+ };
+ let max = if ds.max == "U" {
+ None
+ } else {
+ Some(ds.max.parse().context("Invalid max value")?)
+ };
+ Ok(rrd::ops::create::DataSource::counter(
+ name,
+ ds.heartbeat,
+ min,
+ max,
+ ))
+ }
+ "ABSOLUTE" => {
+ let min = if ds.min == "U" {
+ None
+ } else {
+ Some(ds.min.parse().context("Invalid min value")?)
+ };
+ let max = if ds.max == "U" {
+ None
+ } else {
+ Some(ds.max.parse().context("Invalid max value")?)
+ };
+ Ok(rrd::ops::create::DataSource::absolute(
+ name,
+ ds.heartbeat,
+ min,
+ max,
+ ))
+ }
+ _ => anyhow::bail!("Unsupported data source type: {}", ds.ds_type),
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // Convert RRAs
+ let archives: Result<Vec<rrd::ops::create::Archive>> = schema
+ .archives
+ .iter()
+ .map(|rra| {
+ // Parse RRA string: "RRA:AVERAGE:0.5:1:1440"
+ let parts: Vec<&str> = rra.split(':').collect();
+ if parts.len() != 5 || parts[0] != "RRA" {
+ anyhow::bail!("Invalid RRA format: {}", rra);
+ }
+
+ let cf = match parts[1] {
+ "AVERAGE" => rrd::ConsolidationFn::Avg,
+ "MIN" => rrd::ConsolidationFn::Min,
+ "MAX" => rrd::ConsolidationFn::Max,
+ "LAST" => rrd::ConsolidationFn::Last,
+ _ => anyhow::bail!("Unsupported consolidation function: {}", parts[1]),
+ };
+
+ let xff: f64 = parts[2]
+ .parse()
+ .with_context(|| format!("Invalid xff in RRA: {}", rra))?;
+ let steps: u32 = parts[3]
+ .parse()
+ .with_context(|| format!("Invalid steps in RRA: {}", rra))?;
+ let rows: u32 = parts[4]
+ .parse()
+ .with_context(|| format!("Invalid rows in RRA: {}", rra))?;
+
+ rrd::ops::create::Archive::new(cf, xff, steps, rows)
+ .map_err(|e| anyhow::anyhow!("Failed to create archive: {}", e))
+ })
+ .collect();
+
+ let archives = archives?;
+
+ // Call rrd::ops::create::create
+ rrd::ops::create::create(
+ &path,
+ start,
+ Duration::from_secs(60), // 60-second step
+ false, // no_overwrite = false
+ None, // template
+ &[], // sources
+ data_sources.iter(),
+ archives.iter(),
+ )
+ .with_context(|| format!("Direct RRD create failed for {:?}", path))?;
+
+ tracing::info!("Created RRD file via direct: {:?} ({})", path, schema);
+
+ Ok::<(), anyhow::Error>(())
+ })
+ .await
+ .context("Failed to spawn blocking task for RRD create")??;
+
+ Ok(())
+ }
+
+ async fn flush(&mut self) -> Result<()> {
+ // No-op for direct backend - writes are immediate
+ tracing::trace!("Flush called on direct backend (no-op)");
+ Ok(())
+ }
+
+ async fn is_available(&self) -> bool {
+ // Direct backend is always available (no external dependencies)
+ true
+ }
+
+ fn name(&self) -> &str {
+ "direct"
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ // These tests run real librrd create/update operations against files in a
+ // temporary directory, so librrd must be present in the test environment.
+ use super::*;
+ use crate::backend::RrdBackend;
+ use crate::schema::{RrdFormat, RrdSchema};
+ use std::path::PathBuf;
+ use tempfile::TempDir;
+
+ // ===== Test Helpers =====
+
+ /// Create a temporary directory for RRD files
+ fn setup_temp_dir() -> TempDir {
+ TempDir::new().expect("Failed to create temp directory")
+ }
+
+ /// Create a test RRD file path
+ fn test_rrd_path(dir: &TempDir, name: &str) -> PathBuf {
+ dir.path().join(format!("{}.rrd", name))
+ }
+
+ // ===== RrdDirectBackend Tests =====
+
+ #[tokio::test]
+ async fn test_direct_backend_create_node_rrd() {
+ let temp_dir = setup_temp_dir();
+ let rrd_path = test_rrd_path(&temp_dir, "node_test");
+
+ let mut backend = RrdDirectBackend::new();
+ let schema = RrdSchema::node(RrdFormat::Pve9_0);
+ let start_time = 1704067200; // 2024-01-01 00:00:00
+
+ // Create RRD file
+ let result = backend.create(&rrd_path, &schema, start_time).await;
+ assert!(
+ result.is_ok(),
+ "Failed to create node RRD: {:?}",
+ result.err()
+ );
+
+ // Verify file was created
+ assert!(rrd_path.exists(), "RRD file should exist after create");
+
+ // Verify backend name
+ assert_eq!(backend.name(), "direct");
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_create_vm_rrd() {
+ let temp_dir = setup_temp_dir();
+ let rrd_path = test_rrd_path(&temp_dir, "vm_test");
+
+ let mut backend = RrdDirectBackend::new();
+ let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+ let start_time = 1704067200;
+
+ let result = backend.create(&rrd_path, &schema, start_time).await;
+ assert!(
+ result.is_ok(),
+ "Failed to create VM RRD: {:?}",
+ result.err()
+ );
+ assert!(rrd_path.exists());
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_create_storage_rrd() {
+ let temp_dir = setup_temp_dir();
+ let rrd_path = test_rrd_path(&temp_dir, "storage_test");
+
+ let mut backend = RrdDirectBackend::new();
+ let schema = RrdSchema::storage(RrdFormat::Pve2);
+ let start_time = 1704067200;
+
+ let result = backend.create(&rrd_path, &schema, start_time).await;
+ assert!(
+ result.is_ok(),
+ "Failed to create storage RRD: {:?}",
+ result.err()
+ );
+ assert!(rrd_path.exists());
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_update_with_timestamp() {
+ let temp_dir = setup_temp_dir();
+ let rrd_path = test_rrd_path(&temp_dir, "update_test");
+
+ let mut backend = RrdDirectBackend::new();
+ let schema = RrdSchema::storage(RrdFormat::Pve2);
+ let start_time = 1704067200;
+
+ // Create RRD file
+ backend
+ .create(&rrd_path, &schema, start_time)
+ .await
+ .expect("Failed to create RRD");
+
+ // Update with explicit timestamp and values
+ // Format: "timestamp:value1:value2"
+ let update_data = "1704067260:1000000:500000"; // total=1MB, used=500KB
+ let result = backend.update(&rrd_path, update_data).await;
+
+ assert!(result.is_ok(), "Failed to update RRD: {:?}", result.err());
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_update_with_n_timestamp() {
+ let temp_dir = setup_temp_dir();
+ let rrd_path = test_rrd_path(&temp_dir, "update_n_test");
+
+ let mut backend = RrdDirectBackend::new();
+ let schema = RrdSchema::storage(RrdFormat::Pve2);
+ let start_time = 1704067200;
+
+ backend
+ .create(&rrd_path, &schema, start_time)
+ .await
+ .expect("Failed to create RRD");
+
+ // Update with "N" (current time) timestamp
+ let update_data = "N:2000000:750000";
+ let result = backend.update(&rrd_path, update_data).await;
+
+ assert!(
+ result.is_ok(),
+ "Failed to update RRD with N timestamp: {:?}",
+ result.err()
+ );
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_update_with_unknown_values() {
+ let temp_dir = setup_temp_dir();
+ let rrd_path = test_rrd_path(&temp_dir, "update_u_test");
+
+ let mut backend = RrdDirectBackend::new();
+ let schema = RrdSchema::storage(RrdFormat::Pve2);
+ let start_time = 1704067200;
+
+ backend
+ .create(&rrd_path, &schema, start_time)
+ .await
+ .expect("Failed to create RRD");
+
+ // Update with "U" (unknown) values
+ let update_data = "N:U:1000000"; // total unknown, used known
+ let result = backend.update(&rrd_path, update_data).await;
+
+ assert!(
+ result.is_ok(),
+ "Failed to update RRD with U values: {:?}",
+ result.err()
+ );
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_update_invalid_data() {
+ let temp_dir = setup_temp_dir();
+ let rrd_path = test_rrd_path(&temp_dir, "invalid_test");
+
+ let mut backend = RrdDirectBackend::new();
+ let schema = RrdSchema::storage(RrdFormat::Pve2);
+ let start_time = 1704067200;
+
+ backend
+ .create(&rrd_path, &schema, start_time)
+ .await
+ .expect("Failed to create RRD");
+
+ // Test truly invalid data formats that MUST fail
+ // Note: Invalid values like "abc" are converted to Unspecified (U), which is valid RRD behavior
+ let invalid_cases = vec![
+ "", // Empty string
+ ":", // Only separator
+ "timestamp", // Missing values
+ "N", // No colon separator
+ "abc:123:456", // Invalid timestamp (not N or integer)
+ ];
+
+ for invalid_data in invalid_cases {
+ let result = backend.update(&rrd_path, invalid_data).await;
+ assert!(
+ result.is_err(),
+ "Update should fail for invalid data: '{}', but got Ok",
+ invalid_data
+ );
+ }
+
+ // Test lenient data formats that succeed (invalid values become Unspecified)
+ // Use explicit timestamps to avoid "same timestamp" errors
+ let mut timestamp = start_time + 60;
+ let lenient_cases = vec![
+ "abc:456", // Invalid first value -> becomes U
+ "123:def", // Invalid second value -> becomes U
+ "U:U", // All unknown
+ ];
+
+ for valid_data in lenient_cases {
+ let update_data = format!("{}:{}", timestamp, valid_data);
+ let result = backend.update(&rrd_path, &update_data).await;
+ assert!(
+ result.is_ok(),
+ "Update should succeed for lenient data: '{}', but got Err: {:?}",
+ update_data,
+ result.err()
+ );
+ timestamp += 60; // Increment timestamp for next update
+ }
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_update_nonexistent_file() {
+ let temp_dir = setup_temp_dir();
+ let rrd_path = test_rrd_path(&temp_dir, "nonexistent");
+
+ let mut backend = RrdDirectBackend::new();
+
+ // Try to update a file that doesn't exist
+ let result = backend.update(&rrd_path, "N:100:200").await;
+
+ assert!(result.is_err(), "Update should fail for nonexistent file");
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_flush() {
+ let mut backend = RrdDirectBackend::new();
+
+ // Flush should always succeed for direct backend (no-op)
+ let result = backend.flush().await;
+ assert!(
+ result.is_ok(),
+ "Flush should always succeed for direct backend"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_is_available() {
+ let backend = RrdDirectBackend::new();
+
+ // Direct backend should always be available
+ assert!(
+ backend.is_available().await,
+ "Direct backend should always be available"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_multiple_updates() {
+ let temp_dir = setup_temp_dir();
+ let rrd_path = test_rrd_path(&temp_dir, "multi_update_test");
+
+ let mut backend = RrdDirectBackend::new();
+ let schema = RrdSchema::storage(RrdFormat::Pve2);
+ let start_time = 1704067200;
+
+ backend
+ .create(&rrd_path, &schema, start_time)
+ .await
+ .expect("Failed to create RRD");
+
+ // Perform multiple updates
+ for i in 0..10 {
+ let timestamp = start_time + 60 * (i + 1); // 1 minute intervals
+ let total = 1000000 + (i * 100000);
+ let used = 500000 + (i * 50000);
+ let update_data = format!("{}:{}:{}", timestamp, total, used);
+
+ let result = backend.update(&rrd_path, &update_data).await;
+ assert!(result.is_ok(), "Update {} failed: {:?}", i, result.err());
+ }
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_overwrite_file() {
+ let temp_dir = setup_temp_dir();
+ let rrd_path = test_rrd_path(&temp_dir, "overwrite_test");
+
+ let mut backend = RrdDirectBackend::new();
+ let schema = RrdSchema::storage(RrdFormat::Pve2);
+ let start_time = 1704067200;
+
+ // Create file first time
+ backend
+ .create(&rrd_path, &schema, start_time)
+ .await
+ .expect("First create failed");
+
+ // Create same file again - should succeed (overwrites)
+ // Note: librrd create() with no_overwrite=false allows overwriting
+ let result = backend.create(&rrd_path, &schema, start_time).await;
+ assert!(
+ result.is_ok(),
+ "Creating file again should succeed (overwrite mode): {:?}",
+ result.err()
+ );
+ }
+
+ #[tokio::test]
+ async fn test_direct_backend_large_schema() {
+ let temp_dir = setup_temp_dir();
+ let rrd_path = test_rrd_path(&temp_dir, "large_schema_test");
+
+ let mut backend = RrdDirectBackend::new();
+ let schema = RrdSchema::node(RrdFormat::Pve9_0); // 19 data sources
+ let start_time = 1704067200;
+
+ // Create RRD with large schema
+ let result = backend.create(&rrd_path, &schema, start_time).await;
+ assert!(result.is_ok(), "Failed to create RRD with large schema");
+
+ // Update with all values
+ let values = "100:200:50.5:10.2:8000000:4000000:2000000:500000:50000000:25000000:1000000:2000000:6000000:1000000:0.5:1.2:0.8:0.3:0.1";
+ let update_data = format!("N:{}", values);
+
+ let result = backend.update(&rrd_path, &update_data).await;
+ assert!(result.is_ok(), "Failed to update RRD with large schema");
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
new file mode 100644
index 00000000..7d574e5b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
@@ -0,0 +1,229 @@
//! RRD Backend: Fallback (Daemon + Direct)
//!
//! Composite backend that tries daemon first, falls back to direct file writing.
//! This matches the C implementation's behavior in status.c:1405-1420 where
//! it attempts rrdc_update() first, then falls back to rrd_update_r().
+use super::super::schema::RrdSchema;
+use super::{RrdCachedBackend, RrdDirectBackend};
+use anyhow::{Context, Result};
+use async_trait::async_trait;
+use std::path::Path;
+
/// Composite backend that tries daemon first, falls back to direct
///
/// This provides the same behavior as the C implementation:
/// 1. Try to use rrdcached daemon for performance
/// 2. If daemon fails or is unavailable, fall back to direct file writes
///
/// Once a daemon operation fails, `disable_daemon` drops the handle and
/// every subsequent operation goes through the direct backend.
pub struct RrdFallbackBackend {
    /// Optional daemon backend (None if daemon is unavailable/failed)
    daemon: Option<RrdCachedBackend>,
    /// Direct backend (always available)
    direct: RrdDirectBackend,
}
+
+impl RrdFallbackBackend {
+ /// Create a new fallback backend
+ ///
+ /// Attempts to connect to rrdcached daemon. If successful, will prefer daemon.
+ /// If daemon is unavailable, will use direct mode only.
+ ///
+ /// # Arguments
+ /// * `daemon_socket` - Path to rrdcached Unix socket
+ pub async fn new(daemon_socket: &str) -> Self {
+ let daemon = match RrdCachedBackend::connect(daemon_socket).await {
+ Ok(backend) => {
+ tracing::info!("RRD fallback backend: daemon available, will prefer daemon mode");
+ Some(backend)
+ }
+ Err(e) => {
+ tracing::warn!(
+ "RRD fallback backend: daemon unavailable ({}), using direct mode only",
+ e
+ );
+ None
+ }
+ };
+
+ let direct = RrdDirectBackend::new();
+
+ Self { daemon, direct }
+ }
+
+ /// Create a fallback backend with explicit daemon and direct backends
+ ///
+ /// Useful for testing or custom configurations
+ #[allow(dead_code)] // Used in tests for custom backend configurations
+ pub fn with_backends(daemon: Option<RrdCachedBackend>, direct: RrdDirectBackend) -> Self {
+ Self { daemon, direct }
+ }
+
+ /// Check if daemon is currently being used
+ #[allow(dead_code)] // Used for debugging/monitoring daemon status
+ pub fn is_using_daemon(&self) -> bool {
+ self.daemon.is_some()
+ }
+
+ /// Disable daemon mode and switch to direct mode only
+ ///
+ /// Called automatically when daemon operations fail
+ fn disable_daemon(&mut self) {
+ if self.daemon.is_some() {
+ tracing::warn!("Disabling daemon mode, switching to direct file writes");
+ self.daemon = None;
+ }
+ }
+}
+
+#[async_trait]
+impl super::super::backend::RrdBackend for RrdFallbackBackend {
+ async fn update(&mut self, file_path: &Path, data: &str) -> Result<()> {
+ // Try daemon first if available
+ if let Some(daemon) = &mut self.daemon {
+ match daemon.update(file_path, data).await {
+ Ok(()) => {
+ tracing::trace!("Updated RRD via daemon (fallback backend)");
+ return Ok(());
+ }
+ Err(e) => {
+ tracing::warn!("Daemon update failed, falling back to direct: {}", e);
+ self.disable_daemon();
+ }
+ }
+ }
+
+ // Fallback to direct
+ self.direct
+ .update(file_path, data)
+ .await
+ .context("Both daemon and direct update failed")
+ }
+
+ async fn create(
+ &mut self,
+ file_path: &Path,
+ schema: &RrdSchema,
+ start_timestamp: i64,
+ ) -> Result<()> {
+ // Try daemon first if available
+ if let Some(daemon) = &mut self.daemon {
+ match daemon.create(file_path, schema, start_timestamp).await {
+ Ok(()) => {
+ tracing::trace!("Created RRD via daemon (fallback backend)");
+ return Ok(());
+ }
+ Err(e) => {
+ tracing::warn!("Daemon create failed, falling back to direct: {}", e);
+ self.disable_daemon();
+ }
+ }
+ }
+
+ // Fallback to direct
+ self.direct
+ .create(file_path, schema, start_timestamp)
+ .await
+ .context("Both daemon and direct create failed")
+ }
+
+ async fn flush(&mut self) -> Result<()> {
+ // Only flush if using daemon
+ if let Some(daemon) = &mut self.daemon {
+ match daemon.flush().await {
+ Ok(()) => return Ok(()),
+ Err(e) => {
+ tracing::warn!("Daemon flush failed: {}", e);
+ self.disable_daemon();
+ }
+ }
+ }
+
+ // Direct backend flush is a no-op
+ self.direct.flush().await
+ }
+
+ async fn is_available(&self) -> bool {
+ // Always available - either daemon or direct will work
+ true
+ }
+
+ fn name(&self) -> &str {
+ if self.daemon.is_some() {
+ "fallback(daemon+direct)"
+ } else {
+ "fallback(direct-only)"
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::RrdBackend;
    use crate::schema::{RrdFormat, RrdSchema};
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Temporary directory that is cleaned up when dropped.
    fn setup_temp_dir() -> TempDir {
        TempDir::new().expect("Failed to create temp directory")
    }

    /// Path for a throwaway RRD file inside `dir`.
    fn test_rrd_path(dir: &TempDir, name: &str) -> PathBuf {
        dir.path().join(format!("{}.rrd", name))
    }

    /// Fallback backend with no daemon attached (direct mode only).
    fn direct_only_backend() -> RrdFallbackBackend {
        RrdFallbackBackend::with_backends(None, RrdDirectBackend::new())
    }

    #[test]
    fn test_fallback_backend_without_daemon() {
        let backend = direct_only_backend();
        assert!(!backend.is_using_daemon());
        assert_eq!(backend.name(), "fallback(direct-only)");
    }

    #[tokio::test]
    async fn test_fallback_backend_direct_mode_operations() {
        let temp_dir = setup_temp_dir();
        let rrd_path = test_rrd_path(&temp_dir, "fallback_test");

        let mut backend = direct_only_backend();
        assert!(!backend.is_using_daemon(), "Should not be using daemon");
        assert_eq!(backend.name(), "fallback(direct-only)");

        // Both create and update must work purely through the direct backend.
        let schema = RrdSchema::storage(RrdFormat::Pve2);
        let created = backend.create(&rrd_path, &schema, 1704067200).await;
        assert!(created.is_ok(), "Create should work in direct mode");

        let updated = backend.update(&rrd_path, "N:1000:500").await;
        assert!(updated.is_ok(), "Update should work in direct mode");
    }

    #[tokio::test]
    async fn test_fallback_backend_is_always_available() {
        // Even with no daemon, the direct backend keeps us available.
        let backend = direct_only_backend();
        assert!(
            backend.is_available().await,
            "Fallback backend should always be available"
        );
    }

    #[tokio::test]
    async fn test_fallback_backend_flush_without_daemon() {
        // Flush is a no-op for the direct backend, so it must still succeed.
        let mut backend = direct_only_backend();
        let result = backend.flush().await;
        assert!(result.is_ok(), "Flush should succeed without daemon");
    }
}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
new file mode 100644
index 00000000..e53b6dad
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
@@ -0,0 +1,140 @@
//! RRDCached Daemon Client (wrapper around rrdcached-client crate)
//!
//! This module provides a thin wrapper around the rrdcached-client crate.
+use anyhow::{Context, Result};
+use std::path::Path;
+
/// Wrapper around rrdcached-client
///
/// Holds the underlying connection behind a tokio Mutex so that the
/// `&self` methods can serialize access to the single daemon socket.
#[allow(dead_code)] // Used in backend_daemon.rs via module-level access
pub struct RrdCachedClient {
    // Connection to rrdcached over a Unix stream; locked once per operation.
    pub(crate) client:
        tokio::sync::Mutex<rrdcached_client::RRDCachedClient<tokio::net::UnixStream>>,
}
+
+impl RrdCachedClient {
+ /// Connect to rrdcached daemon via Unix socket
+ ///
+ /// # Arguments
+ /// * `socket_path` - Path to rrdcached Unix socket (default: /var/run/rrdcached.sock)
+ #[allow(dead_code)] // Used via backend modules
+ pub async fn connect<P: AsRef<Path>>(socket_path: P) -> Result<Self> {
+ let socket_path = socket_path.as_ref().to_string_lossy().to_string();
+
+ tracing::debug!("Connecting to rrdcached at {}", socket_path);
+
+ // Connect to daemon (async operation)
+ let client = rrdcached_client::RRDCachedClient::connect_unix(&socket_path)
+ .await
+ .with_context(|| format!("Failed to connect to rrdcached: {socket_path}"))?;
+
+ tracing::info!("Connected to rrdcached at {}", socket_path);
+
+ Ok(Self {
+ client: tokio::sync::Mutex::new(client),
+ })
+ }
+
+ /// Update RRD file via rrdcached
+ ///
+ /// # Arguments
+ /// * `file_path` - Full path to RRD file
+ /// * `data` - Update data in format "timestamp:value1:value2:..."
+ #[allow(dead_code)] // Used via backend modules
+ pub async fn update<P: AsRef<Path>>(&self, file_path: P, data: &str) -> Result<()> {
+ let file_path = file_path.as_ref();
+
+ // Parse the update data
+ let parts: Vec<&str> = data.split(':').collect();
+ if parts.len() < 2 {
+ anyhow::bail!("Invalid update data format: {data}");
+ }
+
+ let timestamp = if parts[0] == "N" {
+ None
+ } else {
+ Some(
+ parts[0]
+ .parse::<usize>()
+ .with_context(|| format!("Invalid timestamp: {}", parts[0]))?,
+ )
+ };
+
+ let values: Vec<f64> = parts[1..]
+ .iter()
+ .map(|v| {
+ if *v == "U" {
+ Ok(f64::NAN)
+ } else {
+ v.parse::<f64>()
+ .with_context(|| format!("Invalid value: {v}"))
+ }
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // Get file path without .rrd extension (rrdcached-client adds it)
+ let path_str = file_path.to_string_lossy();
+ let path_without_ext = path_str.strip_suffix(".rrd").unwrap_or(&path_str);
+
+ // Send update via rrdcached
+ let mut client = self.client.lock().await;
+ client
+ .update(path_without_ext, timestamp, values)
+ .await
+ .context("Failed to send update to rrdcached")?;
+
+ tracing::trace!("Updated RRD via daemon: {:?} -> {}", file_path, data);
+
+ Ok(())
+ }
+
+ /// Create RRD file via rrdcached
+ #[allow(dead_code)] // Used via backend modules
+ pub async fn create(&self, args: rrdcached_client::create::CreateArguments) -> Result<()> {
+ let mut client = self.client.lock().await;
+ client
+ .create(args)
+ .await
+ .context("Failed to create RRD via rrdcached")?;
+ Ok(())
+ }
+
+ /// Flush all pending updates
+ #[allow(dead_code)] // Used via backend modules
+ pub async fn flush(&self) -> Result<()> {
+ let mut client = self.client.lock().await;
+ client
+ .flush_all()
+ .await
+ .context("Failed to flush rrdcached")?;
+
+ tracing::debug!("Flushed all RRD files");
+
+ Ok(())
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;

    /// Connectivity smoke test against a live rrdcached daemon.
    ///
    /// Requires a running daemon at the default socket path, so it is
    /// ignored by default; run with `cargo test -- --ignored` on a host
    /// that has rrdcached configured.
    #[tokio::test]
    #[ignore] // Only runs if rrdcached daemon is actually running
    async fn test_connect_to_daemon() {
        match RrdCachedClient::connect("/var/run/rrdcached.sock").await {
            Ok(client) => {
                // Exercise a round-trip command. Flush may legitimately fail
                // when no files are cached yet, so only log the outcome.
                // (The previous `assert!(result.is_ok() || result.is_err())`
                // was a tautology that could never fail and was removed.)
                let result = client.flush().await;
                println!("RRDCached flush result: {:?}", result);
            }
            Err(e) => {
                println!("Note: rrdcached not running (expected in test env): {}", e);
            }
        }
    }
}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
new file mode 100644
index 00000000..54021c14
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
@@ -0,0 +1,313 @@
//! RRD Key Type Parsing and Path Resolution
//!
//! This module handles parsing RRD status update keys and mapping them
//! to the appropriate file paths and schemas.
+use anyhow::{Context, Result};
+use std::path::{Path, PathBuf};
+
+use super::schema::{RrdFormat, RrdSchema};
+
/// RRD key types for routing to correct schema and path
///
/// This enum represents the different types of RRD metrics that pmxcfs tracks:
/// - Node metrics (CPU, memory, network for a node)
/// - VM metrics (CPU, memory, disk, network for a VM/CT)
/// - Storage metrics (total/used space for a storage)
///
/// Instances are produced by `parse`. The stored `format` records the
/// *source* format of the key; file paths and schemas always target the
/// current (pve-9.0) format (see `file_path` and `schema`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum RrdKeyType {
    /// Node metrics: pve2-node/{nodename} or pve-node-9.0/{nodename}
    Node { nodename: String, format: RrdFormat },
    /// VM metrics: pve2.3-vm/{vmid} or pve-vm-9.0/{vmid}
    Vm { vmid: String, format: RrdFormat },
    /// Storage metrics: pve2-storage/{node}/{storage} or pve-storage-9.0/{node}/{storage}
    Storage {
        nodename: String,
        storage: String,
        format: RrdFormat,
    },
}
+
+impl RrdKeyType {
+ /// Parse RRD key from status update key
+ ///
+ /// Supported formats:
+ /// - "pve2-node/node1" → Node { nodename: "node1", format: Pve2 }
+ /// - "pve-node-9.0/node1" → Node { nodename: "node1", format: Pve9_0 }
+ /// - "pve2.3-vm/100" → Vm { vmid: "100", format: Pve2 }
+ /// - "pve-storage-9.0/node1/local" → Storage { nodename: "node1", storage: "local", format: Pve9_0 }
+ pub(crate) fn parse(key: &str) -> Result<Self> {
+ let parts: Vec<&str> = key.split('/').collect();
+
+ if parts.is_empty() {
+ anyhow::bail!("Empty RRD key");
+ }
+
+ match parts[0] {
+ "pve2-node" => {
+ let nodename = parts.get(1).context("Missing nodename")?.to_string();
+ Ok(RrdKeyType::Node {
+ nodename,
+ format: RrdFormat::Pve2,
+ })
+ }
+ prefix if prefix.starts_with("pve-node-") => {
+ let nodename = parts.get(1).context("Missing nodename")?.to_string();
+ Ok(RrdKeyType::Node {
+ nodename,
+ format: RrdFormat::Pve9_0,
+ })
+ }
+ "pve2.3-vm" => {
+ let vmid = parts.get(1).context("Missing vmid")?.to_string();
+ Ok(RrdKeyType::Vm {
+ vmid,
+ format: RrdFormat::Pve2,
+ })
+ }
+ prefix if prefix.starts_with("pve-vm-") => {
+ let vmid = parts.get(1).context("Missing vmid")?.to_string();
+ Ok(RrdKeyType::Vm {
+ vmid,
+ format: RrdFormat::Pve9_0,
+ })
+ }
+ "pve2-storage" => {
+ let nodename = parts.get(1).context("Missing nodename")?.to_string();
+ let storage = parts.get(2).context("Missing storage")?.to_string();
+ Ok(RrdKeyType::Storage {
+ nodename,
+ storage,
+ format: RrdFormat::Pve2,
+ })
+ }
+ prefix if prefix.starts_with("pve-storage-") => {
+ let nodename = parts.get(1).context("Missing nodename")?.to_string();
+ let storage = parts.get(2).context("Missing storage")?.to_string();
+ Ok(RrdKeyType::Storage {
+ nodename,
+ storage,
+ format: RrdFormat::Pve9_0,
+ })
+ }
+ _ => anyhow::bail!("Unknown RRD key format: {key}"),
+ }
+ }
+
+ /// Get the RRD file path for this key type
+ ///
+ /// Always returns paths using the current format (9.0), regardless of the input format.
+ /// This enables transparent format migration: old PVE8 nodes can send `pve2-node/` keys,
+ /// and they'll be written to `pve-node-9.0/` files automatically.
+ ///
+ /// # Format Migration Strategy
+ ///
+ /// The C implementation always creates files in the current format directory
+ /// (see status.c:1287). This Rust implementation follows the same approach:
+ /// - Input: `pve2-node/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1`
+ /// - Input: `pve-node-9.0/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1`
+ ///
+ /// This allows rolling upgrades where old and new nodes coexist in the same cluster.
+ pub(crate) fn file_path(&self, base_dir: &Path) -> PathBuf {
+ match self {
+ RrdKeyType::Node { nodename, .. } => {
+ // Always use current format path
+ base_dir.join("pve-node-9.0").join(nodename)
+ }
+ RrdKeyType::Vm { vmid, .. } => {
+ // Always use current format path
+ base_dir.join("pve-vm-9.0").join(vmid)
+ }
+ RrdKeyType::Storage {
+ nodename, storage, ..
+ } => {
+ // Always use current format path
+ base_dir
+ .join("pve-storage-9.0")
+ .join(nodename)
+ .join(storage)
+ }
+ }
+ }
+
+ /// Get the source format from the input key
+ ///
+ /// This is used for data transformation (padding/truncation).
+ pub(crate) fn source_format(&self) -> RrdFormat {
+ match self {
+ RrdKeyType::Node { format, .. }
+ | RrdKeyType::Vm { format, .. }
+ | RrdKeyType::Storage { format, .. } => *format,
+ }
+ }
+
+ /// Get the target RRD schema (always current format)
+ ///
+ /// Files are always created using the current format (Pve9_0),
+ /// regardless of the source format in the key.
+ pub(crate) fn schema(&self) -> RrdSchema {
+ match self {
+ RrdKeyType::Node { .. } => RrdSchema::node(RrdFormat::Pve9_0),
+ RrdKeyType::Vm { .. } => RrdSchema::vm(RrdFormat::Pve9_0),
+ RrdKeyType::Storage { .. } => RrdSchema::storage(RrdFormat::Pve9_0),
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;

    /// Both the legacy and 9.0 node key prefixes parse to `Node`,
    /// preserving the source format.
    #[test]
    fn test_parse_node_keys() {
        let key = RrdKeyType::parse("pve2-node/testnode").unwrap();
        assert_eq!(
            key,
            RrdKeyType::Node {
                nodename: "testnode".to_string(),
                format: RrdFormat::Pve2
            }
        );

        let key = RrdKeyType::parse("pve-node-9.0/testnode").unwrap();
        assert_eq!(
            key,
            RrdKeyType::Node {
                nodename: "testnode".to_string(),
                format: RrdFormat::Pve9_0
            }
        );
    }

    /// VM keys use the "pve2.3-vm" (legacy) and "pve-vm-9.0" prefixes.
    #[test]
    fn test_parse_vm_keys() {
        let key = RrdKeyType::parse("pve2.3-vm/100").unwrap();
        assert_eq!(
            key,
            RrdKeyType::Vm {
                vmid: "100".to_string(),
                format: RrdFormat::Pve2
            }
        );

        let key = RrdKeyType::parse("pve-vm-9.0/100").unwrap();
        assert_eq!(
            key,
            RrdKeyType::Vm {
                vmid: "100".to_string(),
                format: RrdFormat::Pve9_0
            }
        );
    }

    /// Storage keys carry both the node name and the storage id.
    #[test]
    fn test_parse_storage_keys() {
        let key = RrdKeyType::parse("pve2-storage/node1/local").unwrap();
        assert_eq!(
            key,
            RrdKeyType::Storage {
                nodename: "node1".to_string(),
                storage: "local".to_string(),
                format: RrdFormat::Pve2
            }
        );

        let key = RrdKeyType::parse("pve-storage-9.0/node1/local").unwrap();
        assert_eq!(
            key,
            RrdKeyType::Storage {
                nodename: "node1".to_string(),
                storage: "local".to_string(),
                format: RrdFormat::Pve9_0
            }
        );
    }

    /// `file_path` always targets the current (9.0) directory layout,
    /// regardless of the source format stored in the key.
    #[test]
    fn test_file_paths() {
        let base = Path::new("/var/lib/rrdcached/db");

        // New format key → new format path
        let key = RrdKeyType::Node {
            nodename: "node1".to_string(),
            format: RrdFormat::Pve9_0,
        };
        assert_eq!(
            key.file_path(base),
            PathBuf::from("/var/lib/rrdcached/db/pve-node-9.0/node1")
        );

        // Old format key → new format path (auto-upgrade!)
        let key = RrdKeyType::Node {
            nodename: "node1".to_string(),
            format: RrdFormat::Pve2,
        };
        assert_eq!(
            key.file_path(base),
            PathBuf::from("/var/lib/rrdcached/db/pve-node-9.0/node1"),
            "Old format keys should create new format files"
        );

        // VM: Old format → new format
        let key = RrdKeyType::Vm {
            vmid: "100".to_string(),
            format: RrdFormat::Pve2,
        };
        assert_eq!(
            key.file_path(base),
            PathBuf::from("/var/lib/rrdcached/db/pve-vm-9.0/100"),
            "Old VM format should upgrade to new format"
        );

        // Storage: Always uses current format
        let key = RrdKeyType::Storage {
            nodename: "node1".to_string(),
            storage: "local".to_string(),
            format: RrdFormat::Pve2,
        };
        assert_eq!(
            key.file_path(base),
            PathBuf::from("/var/lib/rrdcached/db/pve-storage-9.0/node1/local"),
            "Old storage format should upgrade to new format"
        );
    }

    /// `source_format` reports the format the key was parsed from,
    /// which drives data padding/truncation elsewhere.
    #[test]
    fn test_source_format() {
        let key = RrdKeyType::Node {
            nodename: "node1".to_string(),
            format: RrdFormat::Pve2,
        };
        assert_eq!(key.source_format(), RrdFormat::Pve2);

        let key = RrdKeyType::Vm {
            vmid: "100".to_string(),
            format: RrdFormat::Pve9_0,
        };
        assert_eq!(key.source_format(), RrdFormat::Pve9_0);
    }

    /// `schema` always returns the current-format schema, even for keys
    /// parsed from legacy prefixes.
    #[test]
    fn test_schema_always_current_format() {
        // Even with Pve2 source format, schema should return Pve9_0
        let key = RrdKeyType::Node {
            nodename: "node1".to_string(),
            format: RrdFormat::Pve2,
        };
        let schema = key.schema();
        assert_eq!(
            schema.format,
            RrdFormat::Pve9_0,
            "Schema should always use current format"
        );
        assert_eq!(schema.column_count(), 19, "Should have Pve9_0 column count");

        // Pve9_0 source also gets Pve9_0 schema
        let key = RrdKeyType::Node {
            nodename: "node1".to_string(),
            format: RrdFormat::Pve9_0,
        };
        let schema = key.schema();
        assert_eq!(schema.format, RrdFormat::Pve9_0);
        assert_eq!(schema.column_count(), 19);
    }
}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
new file mode 100644
index 00000000..7a439676
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
@@ -0,0 +1,21 @@
+/// RRD (Round-Robin Database) Persistence Module
+///
+/// This module provides RRD file persistence compatible with the C pmxcfs implementation.
+/// It handles:
+/// - RRD file creation with proper schemas (node, VM, storage)
+/// - RRD file updates (writing metrics to disk)
+/// - Multiple backend strategies:
+/// - Daemon mode: High-performance batched updates via rrdcached
+/// - Direct mode: Reliable fallback using direct file writes
+/// - Fallback mode: Tries daemon first, falls back to direct (matches C behavior)
+/// - Version management (pve2 vs pve-9.0 formats)
+///
+/// The implementation matches the C behavior in status.c where it attempts
+/// daemon updates first, then falls back to direct file operations.
+mod backend;
+mod daemon;
+mod key_type;
+pub(crate) mod schema;
+mod writer;
+
+pub use writer::RrdWriter;
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
new file mode 100644
index 00000000..d449bd6e
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
@@ -0,0 +1,577 @@
//! RRD Schema Definitions
//!
//! Defines RRD database schemas matching the C pmxcfs implementation.
//! Each schema specifies data sources (DS) and round-robin archives (RRA).
+use std::fmt;
+
/// RRD format version
///
/// Selects how many data sources each schema carries; see
/// `RrdSchema::node` / `vm` / `storage` for the per-format layouts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RrdFormat {
    /// Legacy pve2 format (12 columns for node, 10 for VM, 2 for storage)
    Pve2,
    /// New pve-9.0 format (19 columns for node, 17 for VM, 2 for storage)
    Pve9_0,
}
+
/// RRD data source definition
///
/// Mirrors one `DS:...` argument of an RRD create call; `to_arg` renders
/// the exact textual form.
#[derive(Debug, Clone)]
pub struct RrdDataSource {
    /// Data source name
    pub name: &'static str,
    /// Data source type (GAUGE, COUNTER, DERIVE, ABSOLUTE)
    pub ds_type: &'static str,
    /// Heartbeat (seconds before marking as unknown)
    pub heartbeat: u32,
    /// Minimum value ("U" for unbounded)
    pub min: &'static str,
    /// Maximum value ("U" for unbounded)
    pub max: &'static str,
}
+
impl RrdDataSource {
    /// Create GAUGE data source (min 0, no upper limit, 120s heartbeat)
    ///
    /// GAUGE stores instantaneous readings (e.g. CPU load, memory bytes).
    /// Note: the minimum is 0, not unbounded — only the maximum is "U".
    pub(super) const fn gauge(name: &'static str) -> Self {
        Self {
            name,
            ds_type: "GAUGE",
            heartbeat: 120,
            min: "0",
            max: "U",
        }
    }

    /// Create DERIVE data source (for counters that can wrap)
    ///
    /// Same limits as `gauge` (min 0, max "U", 120s heartbeat); only the
    /// type differs.
    pub(super) const fn derive(name: &'static str) -> Self {
        Self {
            name,
            ds_type: "DERIVE",
            heartbeat: 120,
            min: "0",
            max: "U",
        }
    }

    /// Format as RRD command line argument
    ///
    /// Matches C implementation format: "DS:name:TYPE:heartbeat:min:max"
    /// (see rrd_def_node in src/pmxcfs/status.c:1100)
    ///
    /// Currently unused but kept for debugging/testing and C format compatibility.
    #[allow(dead_code)]
    pub(super) fn to_arg(&self) -> String {
        format!(
            "DS:{}:{}:{}:{}:{}",
            self.name, self.ds_type, self.heartbeat, self.min, self.max
        )
    }
}
+
/// RRD schema with data sources and archives
///
/// Constructed via `RrdSchema::node`, `vm`, or `storage`; all three share
/// the same archive (RRA) set from `default_archives`.
#[derive(Debug, Clone)]
pub struct RrdSchema {
    /// RRD format version
    pub format: RrdFormat,
    /// Data sources
    pub data_sources: Vec<RrdDataSource>,
    /// Round-robin archives (RRA definitions)
    pub archives: Vec<String>,
}
+
+impl RrdSchema {
+ /// Create node RRD schema
+ pub fn node(format: RrdFormat) -> Self {
+ let data_sources = match format {
+ RrdFormat::Pve2 => vec![
+ RrdDataSource::gauge("loadavg"),
+ RrdDataSource::gauge("maxcpu"),
+ RrdDataSource::gauge("cpu"),
+ RrdDataSource::gauge("iowait"),
+ RrdDataSource::gauge("memtotal"),
+ RrdDataSource::gauge("memused"),
+ RrdDataSource::gauge("swaptotal"),
+ RrdDataSource::gauge("swapused"),
+ RrdDataSource::gauge("roottotal"),
+ RrdDataSource::gauge("rootused"),
+ RrdDataSource::derive("netin"),
+ RrdDataSource::derive("netout"),
+ ],
+ RrdFormat::Pve9_0 => vec![
+ RrdDataSource::gauge("loadavg"),
+ RrdDataSource::gauge("maxcpu"),
+ RrdDataSource::gauge("cpu"),
+ RrdDataSource::gauge("iowait"),
+ RrdDataSource::gauge("memtotal"),
+ RrdDataSource::gauge("memused"),
+ RrdDataSource::gauge("swaptotal"),
+ RrdDataSource::gauge("swapused"),
+ RrdDataSource::gauge("roottotal"),
+ RrdDataSource::gauge("rootused"),
+ RrdDataSource::derive("netin"),
+ RrdDataSource::derive("netout"),
+ RrdDataSource::gauge("memavailable"),
+ RrdDataSource::gauge("arcsize"),
+ RrdDataSource::gauge("pressurecpusome"),
+ RrdDataSource::gauge("pressureiosome"),
+ RrdDataSource::gauge("pressureiofull"),
+ RrdDataSource::gauge("pressurememorysome"),
+ RrdDataSource::gauge("pressurememoryfull"),
+ ],
+ };
+
+ Self {
+ format,
+ data_sources,
+ archives: Self::default_archives(),
+ }
+ }
+
+ /// Create VM RRD schema
+ pub fn vm(format: RrdFormat) -> Self {
+ let data_sources = match format {
+ RrdFormat::Pve2 => vec![
+ RrdDataSource::gauge("maxcpu"),
+ RrdDataSource::gauge("cpu"),
+ RrdDataSource::gauge("maxmem"),
+ RrdDataSource::gauge("mem"),
+ RrdDataSource::gauge("maxdisk"),
+ RrdDataSource::gauge("disk"),
+ RrdDataSource::derive("netin"),
+ RrdDataSource::derive("netout"),
+ RrdDataSource::derive("diskread"),
+ RrdDataSource::derive("diskwrite"),
+ ],
+ RrdFormat::Pve9_0 => vec![
+ RrdDataSource::gauge("maxcpu"),
+ RrdDataSource::gauge("cpu"),
+ RrdDataSource::gauge("maxmem"),
+ RrdDataSource::gauge("mem"),
+ RrdDataSource::gauge("maxdisk"),
+ RrdDataSource::gauge("disk"),
+ RrdDataSource::derive("netin"),
+ RrdDataSource::derive("netout"),
+ RrdDataSource::derive("diskread"),
+ RrdDataSource::derive("diskwrite"),
+ RrdDataSource::gauge("memhost"),
+ RrdDataSource::gauge("pressurecpusome"),
+ RrdDataSource::gauge("pressurecpufull"),
+ RrdDataSource::gauge("pressureiosome"),
+ RrdDataSource::gauge("pressureiofull"),
+ RrdDataSource::gauge("pressurememorysome"),
+ RrdDataSource::gauge("pressurememoryfull"),
+ ],
+ };
+
+ Self {
+ format,
+ data_sources,
+ archives: Self::default_archives(),
+ }
+ }
+
+ /// Create storage RRD schema
+ pub fn storage(format: RrdFormat) -> Self {
+ let data_sources = vec![RrdDataSource::gauge("total"), RrdDataSource::gauge("used")];
+
+ Self {
+ format,
+ data_sources,
+ archives: Self::default_archives(),
+ }
+ }
+
+ /// Default RRA (Round-Robin Archive) definitions
+ ///
+ /// These match the C implementation's archives for 60-second step size:
+ /// - RRA:AVERAGE:0.5:1:1440 -> 1 min * 1440 => 1 day
+ /// - RRA:AVERAGE:0.5:30:1440 -> 30 min * 1440 => 30 days
+ /// - RRA:AVERAGE:0.5:360:1440 -> 6 hours * 1440 => 360 days (~1 year)
+ /// - RRA:AVERAGE:0.5:10080:570 -> 1 week * 570 => ~10 years
+ /// - RRA:MAX:0.5:1:1440 -> 1 min * 1440 => 1 day
+ /// - RRA:MAX:0.5:30:1440 -> 30 min * 1440 => 30 days
+ /// - RRA:MAX:0.5:360:1440 -> 6 hours * 1440 => 360 days (~1 year)
+ /// - RRA:MAX:0.5:10080:570 -> 1 week * 570 => ~10 years
+ pub(super) fn default_archives() -> Vec<String> {
+ vec![
+ "RRA:AVERAGE:0.5:1:1440".to_string(),
+ "RRA:AVERAGE:0.5:30:1440".to_string(),
+ "RRA:AVERAGE:0.5:360:1440".to_string(),
+ "RRA:AVERAGE:0.5:10080:570".to_string(),
+ "RRA:MAX:0.5:1:1440".to_string(),
+ "RRA:MAX:0.5:30:1440".to_string(),
+ "RRA:MAX:0.5:360:1440".to_string(),
+ "RRA:MAX:0.5:10080:570".to_string(),
+ ]
+ }
+
+ /// Get number of data sources
+ pub fn column_count(&self) -> usize {
+ self.data_sources.len()
+ }
+}
+
+impl fmt::Display for RrdSchema {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "{:?} schema with {} data sources",
+ self.format,
+ self.column_count()
+ )
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
    /// Check all invariant fields of one data source: the expected name and
    /// type, plus the shared defaults (heartbeat 120, min "0", max "U").
    /// `index` is only used to label assertion messages.
    fn assert_ds_properties(
        ds: &RrdDataSource,
        expected_name: &str,
        expected_type: &str,
        index: usize,
    ) {
        assert_eq!(ds.name, expected_name, "DS[{}] name mismatch", index);
        assert_eq!(ds.ds_type, expected_type, "DS[{}] type mismatch", index);
        assert_eq!(ds.heartbeat, 120, "DS[{}] heartbeat should be 120", index);
        assert_eq!(ds.min, "0", "DS[{}] min should be 0", index);
        assert_eq!(ds.max, "U", "DS[{}] max should be U", index);
    }
+
+ #[test]
+ fn test_datasource_construction() {
+ let gauge_ds = RrdDataSource::gauge("cpu");
+ assert_eq!(gauge_ds.name, "cpu");
+ assert_eq!(gauge_ds.ds_type, "GAUGE");
+ assert_eq!(gauge_ds.heartbeat, 120);
+ assert_eq!(gauge_ds.min, "0");
+ assert_eq!(gauge_ds.max, "U");
+ assert_eq!(gauge_ds.to_arg(), "DS:cpu:GAUGE:120:0:U");
+
+ let derive_ds = RrdDataSource::derive("netin");
+ assert_eq!(derive_ds.name, "netin");
+ assert_eq!(derive_ds.ds_type, "DERIVE");
+ assert_eq!(derive_ds.heartbeat, 120);
+ assert_eq!(derive_ds.min, "0");
+ assert_eq!(derive_ds.max, "U");
+ assert_eq!(derive_ds.to_arg(), "DS:netin:DERIVE:120:0:U");
+ }
+
    /// The pve2 node schema must expose exactly these 12 data sources, in
    /// this order, with netin/netout as DERIVE counters.
    #[test]
    fn test_node_schema_pve2() {
        let schema = RrdSchema::node(RrdFormat::Pve2);

        assert_eq!(schema.column_count(), 12);
        assert_eq!(schema.format, RrdFormat::Pve2);

        let expected_ds = vec![
            ("loadavg", "GAUGE"),
            ("maxcpu", "GAUGE"),
            ("cpu", "GAUGE"),
            ("iowait", "GAUGE"),
            ("memtotal", "GAUGE"),
            ("memused", "GAUGE"),
            ("swaptotal", "GAUGE"),
            ("swapused", "GAUGE"),
            ("roottotal", "GAUGE"),
            ("rootused", "GAUGE"),
            ("netin", "DERIVE"),
            ("netout", "DERIVE"),
        ];

        for (i, (name, ds_type)) in expected_ds.iter().enumerate() {
            assert_ds_properties(&schema.data_sources[i], name, ds_type, i);
        }
    }
+
    /// The pve-9.0 node schema is a strict superset of pve2: the first 12
    /// data sources match pve2 exactly, followed by the 9.0-only additions.
    #[test]
    fn test_node_schema_pve9() {
        let schema = RrdSchema::node(RrdFormat::Pve9_0);

        assert_eq!(schema.column_count(), 19);
        assert_eq!(schema.format, RrdFormat::Pve9_0);

        // Prefix compatibility: positions 0..12 must mirror the pve2 layout.
        let pve2_schema = RrdSchema::node(RrdFormat::Pve2);
        for i in 0..12 {
            assert_eq!(
                schema.data_sources[i].name, pve2_schema.data_sources[i].name,
                "First 12 DS should match pve2"
            );
            assert_eq!(
                schema.data_sources[i].ds_type, pve2_schema.data_sources[i].ds_type,
                "First 12 DS types should match pve2"
            );
        }

        // The 9.0-only columns appended after the shared prefix.
        let pve9_additions = vec![
            ("memavailable", "GAUGE"),
            ("arcsize", "GAUGE"),
            ("pressurecpusome", "GAUGE"),
            ("pressureiosome", "GAUGE"),
            ("pressureiofull", "GAUGE"),
            ("pressurememorysome", "GAUGE"),
            ("pressurememoryfull", "GAUGE"),
        ];

        for (i, (name, ds_type)) in pve9_additions.iter().enumerate() {
            assert_ds_properties(&schema.data_sources[12 + i], name, ds_type, 12 + i);
        }
    }
+
    /// The pve2 VM schema must expose exactly these 10 data sources, in
    /// this order, with the I/O counters as DERIVE.
    #[test]
    fn test_vm_schema_pve2() {
        let schema = RrdSchema::vm(RrdFormat::Pve2);

        assert_eq!(schema.column_count(), 10);
        assert_eq!(schema.format, RrdFormat::Pve2);

        let expected_ds = vec![
            ("maxcpu", "GAUGE"),
            ("cpu", "GAUGE"),
            ("maxmem", "GAUGE"),
            ("mem", "GAUGE"),
            ("maxdisk", "GAUGE"),
            ("disk", "GAUGE"),
            ("netin", "DERIVE"),
            ("netout", "DERIVE"),
            ("diskread", "DERIVE"),
            ("diskwrite", "DERIVE"),
        ];

        for (i, (name, ds_type)) in expected_ds.iter().enumerate() {
            assert_ds_properties(&schema.data_sources[i], name, ds_type, i);
        }
    }
+
    /// The pve-9.0 VM schema is a strict superset of pve2: the first 10
    /// data sources match pve2 exactly, followed by the 9.0-only additions.
    #[test]
    fn test_vm_schema_pve9() {
        let schema = RrdSchema::vm(RrdFormat::Pve9_0);

        assert_eq!(schema.column_count(), 17);
        assert_eq!(schema.format, RrdFormat::Pve9_0);

        // Prefix compatibility: positions 0..10 must mirror the pve2 layout.
        let pve2_schema = RrdSchema::vm(RrdFormat::Pve2);
        for i in 0..10 {
            assert_eq!(
                schema.data_sources[i].name, pve2_schema.data_sources[i].name,
                "First 10 DS should match pve2"
            );
            assert_eq!(
                schema.data_sources[i].ds_type, pve2_schema.data_sources[i].ds_type,
                "First 10 DS types should match pve2"
            );
        }

        // The 9.0-only columns appended after the shared prefix.
        let pve9_additions = vec![
            ("memhost", "GAUGE"),
            ("pressurecpusome", "GAUGE"),
            ("pressurecpufull", "GAUGE"),
            ("pressureiosome", "GAUGE"),
            ("pressureiofull", "GAUGE"),
            ("pressurememorysome", "GAUGE"),
            ("pressurememoryfull", "GAUGE"),
        ];

        for (i, (name, ds_type)) in pve9_additions.iter().enumerate() {
            assert_ds_properties(&schema.data_sources[10 + i], name, ds_type, 10 + i);
        }
    }
+
+ #[test]
+ fn test_storage_schema() {
+ for format in [RrdFormat::Pve2, RrdFormat::Pve9_0] {
+ let schema = RrdSchema::storage(format);
+
+ assert_eq!(schema.column_count(), 2);
+ assert_eq!(schema.format, format);
+
+ assert_ds_properties(&schema.data_sources[0], "total", "GAUGE", 0);
+ assert_ds_properties(&schema.data_sources[1], "used", "GAUGE", 1);
+ }
+ }
+
    /// Every schema variant must carry the same 8 RRA definitions, in the
    /// same order (4 AVERAGE + 4 MAX consolidations).
    #[test]
    fn test_rra_archives() {
        let expected_rras = [
            "RRA:AVERAGE:0.5:1:1440",
            "RRA:AVERAGE:0.5:30:1440",
            "RRA:AVERAGE:0.5:360:1440",
            "RRA:AVERAGE:0.5:10080:570",
            "RRA:MAX:0.5:1:1440",
            "RRA:MAX:0.5:30:1440",
            "RRA:MAX:0.5:360:1440",
            "RRA:MAX:0.5:10080:570",
        ];

        let schemas = vec![
            RrdSchema::node(RrdFormat::Pve2),
            RrdSchema::node(RrdFormat::Pve9_0),
            RrdSchema::vm(RrdFormat::Pve2),
            RrdSchema::vm(RrdFormat::Pve9_0),
            RrdSchema::storage(RrdFormat::Pve2),
            RrdSchema::storage(RrdFormat::Pve9_0),
        ];

        for schema in schemas {
            assert_eq!(schema.archives.len(), 8);

            for (i, expected) in expected_rras.iter().enumerate() {
                assert_eq!(
                    &schema.archives[i], expected,
                    "RRA[{}] mismatch in {:?}",
                    i, schema.format
                );
            }
        }
    }
+
+ #[test]
+ fn test_heartbeat_consistency() {
+ let schemas = vec![
+ RrdSchema::node(RrdFormat::Pve2),
+ RrdSchema::node(RrdFormat::Pve9_0),
+ RrdSchema::vm(RrdFormat::Pve2),
+ RrdSchema::vm(RrdFormat::Pve9_0),
+ RrdSchema::storage(RrdFormat::Pve2),
+ RrdSchema::storage(RrdFormat::Pve9_0),
+ ];
+
+ for schema in schemas {
+ for ds in &schema.data_sources {
+ assert_eq!(ds.heartbeat, 120);
+ assert_eq!(ds.min, "0");
+ assert_eq!(ds.max, "U");
+ }
+ }
+ }
+
+ #[test]
+ fn test_gauge_vs_derive_correctness() {
+ // GAUGE: instantaneous values (CPU%, memory bytes)
+ // DERIVE: cumulative counters that can wrap (network/disk bytes)
+
+ let node = RrdSchema::node(RrdFormat::Pve2);
+ let node_derive_indices = [10, 11]; // netin, netout
+ for (i, ds) in node.data_sources.iter().enumerate() {
+ if node_derive_indices.contains(&i) {
+ assert_eq!(
+ ds.ds_type, "DERIVE",
+ "Node DS[{}] ({}) should be DERIVE",
+ i, ds.name
+ );
+ } else {
+ assert_eq!(
+ ds.ds_type, "GAUGE",
+ "Node DS[{}] ({}) should be GAUGE",
+ i, ds.name
+ );
+ }
+ }
+
+ let vm = RrdSchema::vm(RrdFormat::Pve2);
+ let vm_derive_indices = [6, 7, 8, 9]; // netin, netout, diskread, diskwrite
+ for (i, ds) in vm.data_sources.iter().enumerate() {
+ if vm_derive_indices.contains(&i) {
+ assert_eq!(
+ ds.ds_type, "DERIVE",
+ "VM DS[{}] ({}) should be DERIVE",
+ i, ds.name
+ );
+ } else {
+ assert_eq!(
+ ds.ds_type, "GAUGE",
+ "VM DS[{}] ({}) should be GAUGE",
+ i, ds.name
+ );
+ }
+ }
+
+ let storage = RrdSchema::storage(RrdFormat::Pve2);
+ for ds in &storage.data_sources {
+ assert_eq!(
+ ds.ds_type, "GAUGE",
+ "Storage DS ({}) should be GAUGE",
+ ds.name
+ );
+ }
+ }
+
+ #[test]
+ fn test_pve9_backward_compatibility() {
+ let node_pve2 = RrdSchema::node(RrdFormat::Pve2);
+ let node_pve9 = RrdSchema::node(RrdFormat::Pve9_0);
+
+ assert!(node_pve9.column_count() > node_pve2.column_count());
+
+ for i in 0..node_pve2.column_count() {
+ assert_eq!(
+ node_pve2.data_sources[i].name, node_pve9.data_sources[i].name,
+ "Node DS[{}] name must match between pve2 and pve9.0",
+ i
+ );
+ assert_eq!(
+ node_pve2.data_sources[i].ds_type, node_pve9.data_sources[i].ds_type,
+ "Node DS[{}] type must match between pve2 and pve9.0",
+ i
+ );
+ }
+
+ let vm_pve2 = RrdSchema::vm(RrdFormat::Pve2);
+ let vm_pve9 = RrdSchema::vm(RrdFormat::Pve9_0);
+
+ assert!(vm_pve9.column_count() > vm_pve2.column_count());
+
+ for i in 0..vm_pve2.column_count() {
+ assert_eq!(
+ vm_pve2.data_sources[i].name, vm_pve9.data_sources[i].name,
+ "VM DS[{}] name must match between pve2 and pve9.0",
+ i
+ );
+ assert_eq!(
+ vm_pve2.data_sources[i].ds_type, vm_pve9.data_sources[i].ds_type,
+ "VM DS[{}] type must match between pve2 and pve9.0",
+ i
+ );
+ }
+
+ let storage_pve2 = RrdSchema::storage(RrdFormat::Pve2);
+ let storage_pve9 = RrdSchema::storage(RrdFormat::Pve9_0);
+ assert_eq!(storage_pve2.column_count(), storage_pve9.column_count());
+ }
+
+ #[test]
+ fn test_schema_display() {
+ let test_cases = vec![
+ (RrdSchema::node(RrdFormat::Pve2), "Pve2", "12 data sources"),
+ (
+ RrdSchema::node(RrdFormat::Pve9_0),
+ "Pve9_0",
+ "19 data sources",
+ ),
+ (RrdSchema::vm(RrdFormat::Pve2), "Pve2", "10 data sources"),
+ (
+ RrdSchema::vm(RrdFormat::Pve9_0),
+ "Pve9_0",
+ "17 data sources",
+ ),
+ (
+ RrdSchema::storage(RrdFormat::Pve2),
+ "Pve2",
+ "2 data sources",
+ ),
+ ];
+
+ for (schema, expected_format, expected_count) in test_cases {
+ let display = format!("{}", schema);
+ assert!(
+ display.contains(expected_format),
+ "Display should contain format: {}",
+ display
+ );
+ assert!(
+ display.contains(expected_count),
+ "Display should contain count: {}",
+ display
+ );
+ }
+ }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
new file mode 100644
index 00000000..79ed202a
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
@@ -0,0 +1,397 @@
+/// RRD File Writer
+///
+/// Handles creating and updating RRD files via pluggable backends.
+/// Supports daemon-based (rrdcached) and direct file writing modes.
+use super::key_type::RrdKeyType;
+use super::schema::{RrdFormat, RrdSchema};
+use anyhow::{Context, Result};
+use chrono::Utc;
+use std::collections::HashMap;
+use std::fs;
+use std::path::{Path, PathBuf};
+
/// Metric type for determining column skipping rules
///
/// Mirrors the three RRD key categories (node, VM, storage); used by
/// `transform_data` to decide how many leading, non-archivable columns
/// of a legacy (pve2) status line to drop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MetricType {
    Node,
    Vm,
    Storage,
}
+
+impl MetricType {
+ /// Number of non-archivable columns to skip
+ ///
+ /// C implementation (status.c:1300, 1335):
+ /// - Node: skip 2 (uptime, status)
+ /// - VM: skip 4 (uptime, status, template, pid)
+ /// - Storage: skip 0
+ fn skip_columns(self) -> usize {
+ match self {
+ MetricType::Node => 2,
+ MetricType::Vm => 4,
+ MetricType::Storage => 0,
+ }
+ }
+}
+
+impl RrdFormat {
+ /// Get column count for a specific metric type
+ #[allow(dead_code)]
+ fn column_count(self, metric_type: &MetricType) -> usize {
+ match (self, metric_type) {
+ (RrdFormat::Pve2, MetricType::Node) => 12,
+ (RrdFormat::Pve9_0, MetricType::Node) => 19,
+ (RrdFormat::Pve2, MetricType::Vm) => 10,
+ (RrdFormat::Pve9_0, MetricType::Vm) => 17,
+ (_, MetricType::Storage) => 2, // Same for both formats
+ }
+ }
+}
+
+impl RrdKeyType {
+ /// Get the metric type for this key
+ fn metric_type(&self) -> MetricType {
+ match self {
+ RrdKeyType::Node { .. } => MetricType::Node,
+ RrdKeyType::Vm { .. } => MetricType::Vm,
+ RrdKeyType::Storage { .. } => MetricType::Storage,
+ }
+ }
+}
+
/// RRD writer for persistent metric storage
///
/// Uses pluggable backends (daemon, direct, or fallback) for RRD operations.
pub struct RrdWriter {
    /// Base directory for RRD files (default: /var/lib/rrdcached/db)
    base_dir: PathBuf,
    /// Backend for RRD operations (daemon, direct, or fallback)
    backend: Box<dyn super::backend::RrdBackend>,
    /// Track which RRD files we've already created
    /// (keyed by the raw update key; the unit value is unused).
    created_files: HashMap<String, ()>,
}
+
+impl RrdWriter {
+ /// Create new RRD writer with default fallback backend
+ ///
+ /// Uses the fallback backend that tries daemon first, then falls back to direct file writes.
+ /// This matches the C implementation's behavior.
+ ///
+ /// # Arguments
+ /// * `base_dir` - Base directory for RRD files
+ pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
+ let backend = Self::default_backend().await?;
+ Self::with_backend(base_dir, backend).await
+ }
+
+ /// Create new RRD writer with specific backend
+ ///
+ /// # Arguments
+ /// * `base_dir` - Base directory for RRD files
+ /// * `backend` - RRD backend to use (daemon, direct, or fallback)
+ pub(crate) async fn with_backend<P: AsRef<Path>>(
+ base_dir: P,
+ backend: Box<dyn super::backend::RrdBackend>,
+ ) -> Result<Self> {
+ let base_dir = base_dir.as_ref().to_path_buf();
+
+ // Create base directory if it doesn't exist
+ fs::create_dir_all(&base_dir)
+ .with_context(|| format!("Failed to create RRD base directory: {base_dir:?}"))?;
+
+ tracing::info!("RRD writer using backend: {}", backend.name());
+
+ Ok(Self {
+ base_dir,
+ backend,
+ created_files: HashMap::new(),
+ })
+ }
+
+ /// Create default backend (fallback: daemon + direct)
+ ///
+ /// This matches the C implementation's behavior:
+ /// - Tries rrdcached daemon first for performance
+ /// - Falls back to direct file writes if daemon fails
+ async fn default_backend() -> Result<Box<dyn super::backend::RrdBackend>> {
+ let backend = super::backend::RrdFallbackBackend::new("/var/run/rrdcached.sock").await;
+ Ok(Box::new(backend))
+ }
+
+ /// Update RRD file with metric data
+ ///
+ /// This will:
+ /// 1. Transform data from source format to target format (padding/truncation/column skipping)
+ /// 2. Create the RRD file if it doesn't exist
+ /// 3. Update via rrdcached daemon
+ ///
+ /// # Arguments
+ /// * `key` - RRD key (e.g., "pve2-node/node1", "pve-vm-9.0/100")
+ /// * `data` - Metric data string (format: "timestamp:value1:value2:...")
+ pub async fn update(&mut self, key: &str, data: &str) -> Result<()> {
+ // Parse the key to determine file path and schema
+ let key_type = RrdKeyType::parse(key).with_context(|| format!("Invalid RRD key: {key}"))?;
+
+ // Get source format and target schema
+ let source_format = key_type.source_format();
+ let target_schema = key_type.schema();
+ let metric_type = key_type.metric_type();
+
+ // Transform data from source to target format
+ let transformed_data =
+ Self::transform_data(data, source_format, &target_schema, metric_type)
+ .with_context(|| format!("Failed to transform RRD data for key: {key}"))?;
+
+ // Get the file path (always uses current format)
+ let file_path = key_type.file_path(&self.base_dir);
+
+ // Ensure the RRD file exists
+ if !self.created_files.contains_key(key) && !file_path.exists() {
+ self.create_rrd_file(&key_type, &file_path).await?;
+ self.created_files.insert(key.to_string(), ());
+ }
+
+ // Update the RRD file via backend
+ self.backend.update(&file_path, &transformed_data).await?;
+
+ Ok(())
+ }
+
+ /// Create RRD file with appropriate schema via backend
+ async fn create_rrd_file(&mut self, key_type: &RrdKeyType, file_path: &Path) -> Result<()> {
+ // Ensure parent directory exists
+ if let Some(parent) = file_path.parent() {
+ fs::create_dir_all(parent)
+ .with_context(|| format!("Failed to create directory: {parent:?}"))?;
+ }
+
+ // Get schema for this RRD type
+ let schema = key_type.schema();
+
+ // Calculate start time (at day boundary, matching C implementation)
+ let now = Utc::now();
+ let start = now
+ .date_naive()
+ .and_hms_opt(0, 0, 0)
+ .expect("00:00:00 is always a valid time")
+ .and_utc();
+ let start_timestamp = start.timestamp();
+
+ tracing::debug!(
+ "Creating RRD file: {:?} with {} data sources via {}",
+ file_path,
+ schema.column_count(),
+ self.backend.name()
+ );
+
+ // Delegate to backend for creation
+ self.backend
+ .create(file_path, &schema, start_timestamp)
+ .await?;
+
+ tracing::info!("Created RRD file: {:?} ({})", file_path, schema);
+
+ Ok(())
+ }
+
+ /// Transform data from source format to target format
+ ///
+ /// This implements the C behavior from status.c:
+ /// 1. Skip non-archivable columns only for old formats (uptime, status for nodes)
+ /// 2. Pad old format data with `:U` for missing columns
+ /// 3. Truncate future format data to known columns
+ ///
+ /// # Arguments
+ /// * `data` - Raw data string from status update (format: "timestamp:v1:v2:...")
+ /// * `source_format` - Format indicated by the input key
+ /// * `target_schema` - Target RRD schema (always Pve9_0 currently)
+ /// * `metric_type` - Type of metric (Node, VM, Storage) for column skipping
+ ///
+ /// # Returns
+ /// Transformed data string ready for RRD update
+ fn transform_data(
+ data: &str,
+ source_format: RrdFormat,
+ target_schema: &RrdSchema,
+ metric_type: MetricType,
+ ) -> Result<String> {
+ let mut parts = data.split(':');
+
+ let timestamp = parts
+ .next()
+ .ok_or_else(|| anyhow::anyhow!("Empty data string"))?;
+
+ // Skip non-archivable columns for old format only (C: status.c:1300, 1335, 1385)
+ let skip_count = if source_format == RrdFormat::Pve2 {
+ metric_type.skip_columns()
+ } else {
+ 0
+ };
+
+ // Build transformed data: timestamp + values (skipped, padded/truncated to target_cols)
+ let target_cols = target_schema.column_count();
+
+ // Join values with ':' separator, efficiently building the string without Vec allocation
+ let mut iter = parts
+ .skip(skip_count)
+ .chain(std::iter::repeat("U"))
+ .take(target_cols);
+ let values = match iter.next() {
+ Some(first) => {
+ // Start with first value, fold remaining values with separator
+ iter.fold(first.to_string(), |mut acc, value| {
+ acc.push(':');
+ acc.push_str(value);
+ acc
+ })
+ }
+ None => String::new(),
+ };
+
+ Ok(format!("{timestamp}:{values}"))
+ }
+
+ /// Flush all pending updates
+ #[allow(dead_code)] // Used via RRD update cycle
+ pub(crate) async fn flush(&mut self) -> Result<()> {
+ self.backend.flush().await
+ }
+
+ /// Get base directory
+ #[allow(dead_code)] // Used for path resolution in updates
+ pub(crate) fn base_dir(&self) -> &Path {
+ &self.base_dir
+ }
+}
+
+impl Drop for RrdWriter {
+ fn drop(&mut self) {
+ // Note: We can't flush in Drop since it's async
+ // Users should call flush() explicitly before dropping if needed
+ tracing::debug!("RrdWriter dropped");
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::super::schema::{RrdFormat, RrdSchema};
    use super::*;

    #[test]
    fn test_rrd_file_path_generation() {
        let base = std::path::PathBuf::from("/tmp/test");

        let key = RrdKeyType::Node {
            nodename: "testnode".to_string(),
            format: RrdFormat::Pve9_0,
        };
        assert_eq!(
            key.file_path(&base),
            base.join("pve-node-9.0").join("testnode")
        );
    }

    // ===== Format Adaptation Tests =====

    /// Split a transformed data string into its colon-separated fields.
    fn fields(s: &str) -> Vec<&str> {
        s.split(':').collect()
    }

    #[test]
    fn test_transform_data_node_pve2_to_pve9() {
        // Pad a legacy node line (12 cols) up to pve9.0 (19 cols).
        // Input: timestamp:uptime:status:load:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:netin:netout
        let raw = "1234567890:1000:0:1.5:4:2.0:0.5:8000000000:6000000000:0:0:1000000:500000";

        let schema = RrdSchema::node(RrdFormat::Pve9_0);
        let out =
            RrdWriter::transform_data(raw, RrdFormat::Pve2, &schema, MetricType::Node).unwrap();

        // uptime/status are dropped, then seven ":U" pads are appended:
        // timestamp:load:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:netin:netout:U:...:U
        let cols = fields(&out);
        assert_eq!(cols[0], "1234567890", "Timestamp should be preserved");
        assert_eq!(cols.len(), 20, "Should have timestamp + 19 values");
        assert_eq!(cols[1], "1.5", "First value after skip should be load");
        assert_eq!(cols[2], "4", "Second value should be maxcpu");

        // Everything past the 10 real values must be padding.
        for (i, col) in cols.iter().enumerate().skip(12) {
            assert_eq!(col, &"U", "Column {} should be padded with U", i);
        }
    }

    #[test]
    fn test_transform_data_vm_pve2_to_pve9() {
        // VM transformation drops four leading columns.
        // Input: timestamp:uptime:status:template:pid:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite
        let raw = "1234567890:1000:1:0:12345:4:2:4096:2048:100000:50000:1000:500:100:50";

        let schema = RrdSchema::vm(RrdFormat::Pve9_0);
        let out =
            RrdWriter::transform_data(raw, RrdFormat::Pve2, &schema, MetricType::Vm).unwrap();

        let cols = fields(&out);
        assert_eq!(cols[0], "1234567890");
        assert_eq!(cols.len(), 18, "Should have timestamp + 17 values");
        assert_eq!(cols[1], "4", "First value after skip should be maxcpu");

        // The final seven columns are padding.
        for (i, col) in cols.iter().enumerate().skip(11) {
            assert_eq!(col, &"U", "Column {} should be padded", i);
        }
    }

    #[test]
    fn test_transform_data_no_padding_needed() {
        // Source already matches the target column count: no skip, no pad.
        let raw = "1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000:7000000000:0:0:0:0:0:0";

        let schema = RrdSchema::node(RrdFormat::Pve9_0);
        let out =
            RrdWriter::transform_data(raw, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap();

        let cols = fields(&out);
        assert_eq!(cols.len(), 20); // timestamp + 19 values
        assert_eq!(cols[1], "1.5");
    }

    #[test]
    fn test_transform_data_future_format_truncation() {
        // A hypothetical future format with 25 columns gets truncated down
        // to the 19 columns this schema knows about.
        let raw = "1234567890:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15:16:17:18:19:20:21:22:23:24:25";

        let schema = RrdSchema::node(RrdFormat::Pve9_0);
        let out =
            RrdWriter::transform_data(raw, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap();

        let cols = fields(&out);
        assert_eq!(cols.len(), 20, "Should truncate to timestamp + 19 values");
        assert_eq!(cols[19], "19", "Last value should be column 19");
    }

    #[test]
    fn test_transform_data_storage_no_change() {
        // Storage is identical in both formats (2 columns, nothing skipped),
        // so the line passes through untouched.
        let raw = "1234567890:1000000000000:500000000000";

        let schema = RrdSchema::storage(RrdFormat::Pve9_0);
        let out =
            RrdWriter::transform_data(raw, RrdFormat::Pve2, &schema, MetricType::Storage).unwrap();

        assert_eq!(out, raw, "Storage data should not be transformed");
    }

    #[test]
    fn test_metric_type_methods() {
        assert_eq!(MetricType::Node.skip_columns(), 2);
        assert_eq!(MetricType::Vm.skip_columns(), 4);
        assert_eq!(MetricType::Storage.skip_columns(), 0);
    }

    #[test]
    fn test_format_column_counts() {
        for (fmt, metric, want) in [
            (RrdFormat::Pve2, MetricType::Node, 12),
            (RrdFormat::Pve9_0, MetricType::Node, 19),
            (RrdFormat::Pve2, MetricType::Vm, 10),
            (RrdFormat::Pve9_0, MetricType::Vm, 17),
            (RrdFormat::Pve2, MetricType::Storage, 2),
            (RrdFormat::Pve9_0, MetricType::Storage, 2),
        ] {
            assert_eq!(fmt.column_count(&metric), want);
        }
    }
}
--
2.47.3
More information about the pve-devel
mailing list