[pve-devel] [PATCH pve-cluster 04/15] pmxcfs-rs: add pmxcfs-rrd crate

Kefu Chai k.chai at proxmox.com
Tue Jan 6 15:24:28 CET 2026


Add RRD (Round-Robin Database) file persistence system:
- RrdWriter: Main API for RRD operations
- Schema definitions for CPU, memory, network metrics
- Format migration support (v1/v2/v3)
- rrdcached integration for batched writes
- Data transformation for legacy formats

This is an independent crate with no internal dependencies,
only requiring external RRD libraries (rrd, rrdcached-client)
and tokio for async operations. It handles time-series data
storage compatible with the C implementation.

Includes comprehensive unit tests for data transformation,
schema generation, and multi-source data processing.

Signed-off-by: Kefu Chai <k.chai at proxmox.com>
---
 src/pmxcfs-rs/Cargo.toml                      |   1 +
 src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml           |  18 +
 src/pmxcfs-rs/pmxcfs-rrd/README.md            |  51 ++
 src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs       |  67 ++
 .../pmxcfs-rrd/src/backend/backend_daemon.rs  | 214 +++++++
 .../pmxcfs-rrd/src/backend/backend_direct.rs  | 606 ++++++++++++++++++
 .../src/backend/backend_fallback.rs           | 229 +++++++
 src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs        | 140 ++++
 src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs      | 313 +++++++++
 src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs           |  21 +
 src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs        | 577 +++++++++++++++++
 src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs        | 397 ++++++++++++
 12 files changed, 2634 insertions(+)
 create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
 create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/README.md
 create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs

diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index 4d17e87e..dd36c81f 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -4,6 +4,7 @@ members = [
     "pmxcfs-api-types",  # Shared types and error definitions
     "pmxcfs-config",     # Configuration management
     "pmxcfs-logger",     # Cluster log with ring buffer and deduplication
+    "pmxcfs-rrd",        # RRD (Round-Robin Database) persistence
 ]
 resolver = "2"
 
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
new file mode 100644
index 00000000..bab71423
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "pmxcfs-rrd"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+async-trait = "0.1"
+chrono = { version = "0.4", default-features = false, features = ["clock"] }
+rrd = "0.2"
+rrdcached-client = "0.1.5"
+tokio.workspace = true
+tracing.workspace = true
+
+[dev-dependencies]
+tempfile.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/README.md b/src/pmxcfs-rs/pmxcfs-rrd/README.md
new file mode 100644
index 00000000..800d78cf
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/README.md
@@ -0,0 +1,52 @@
+# pmxcfs-rrd
+
+RRD (Round-Robin Database) persistence for pmxcfs performance metrics.
+
+## Overview
+
+This crate provides RRD file management for storing time-series performance data from Proxmox nodes and VMs. It handles file creation, updates, and integration with the rrdcached daemon for efficient writes.
+
+### Key Features
+
+- RRD file creation with schema-based initialization
+- RRD updates (write metrics to disk)
+- rrdcached integration for batched writes
+- Support for both legacy and current schema versions
+- Type-safe key parsing and validation
+- Compatible with existing C-created RRD files
+
+## Module Structure
+
+| Module | Purpose |
+|--------|---------|
+| `writer.rs` | Main RrdWriter API |
+| `schema.rs` | RRD schema definitions (DS, RRA) |
+| `key_type.rs` | RRD key parsing and validation |
+| `backend.rs` | `RrdBackend` trait with rrdcached, direct (librrd) and fallback backends |
+| `daemon.rs` | rrdcached daemon client |
+
+## External Dependencies
+
+- **librrd**: RRDtool library (via FFI bindings)
+- **rrdcached**: Optional daemon for batched writes and improved performance
+
+## Testing
+
+Unit tests verify:
+- Schema generation and validation
+- Key parsing for different RRD types (node, VM, storage)
+- RRD file creation and update operations
+- rrdcached client connection and fallback behavior
+
+Run tests with:
+```bash
+cargo test -p pmxcfs-rrd
+```
+
+## References
+
+- **C Implementation**: `src/pmxcfs/status.c` (RRD code embedded)
+- **Related Crates**:
+  - `pmxcfs-status` - Uses RrdWriter for metrics persistence
+  - `pmxcfs` - FUSE `.rrd` plugin reads RRD files
+- **RRDtool Documentation**: https://oss.oetiker.ch/rrdtool/
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
new file mode 100644
index 00000000..58652831
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend.rs
@@ -0,0 +1,67 @@
+//! RRD Backend Trait and Implementations
+//!
+//! This module provides an abstraction over different RRD writing mechanisms:
+//! - Daemon-based (via rrdcached) for performance and batching
+//! - Direct file writing for reliability and fallback scenarios
+//! - Fallback composite that tries daemon first, then falls back to direct
+//!
+//! This design matches the C implementation's behavior in status.c where
+//! it attempts daemon update first, then falls back to direct file writes.
+use super::schema::RrdSchema;
+use anyhow::Result;
+use async_trait::async_trait;
+use std::path::Path;
+
+/// Trait for RRD backend implementations
+///
+/// Provides abstraction over different RRD writing mechanisms.
+/// All I/O methods are async so daemon and direct-file backends share one interface; `name` is sync.
+#[async_trait]
+pub trait RrdBackend: Send + Sync {
+    /// Update RRD file with new data
+    ///
+    /// # Arguments
+    /// * `file_path` - Full path to the RRD file
+    /// * `data` - Update data "timestamp:value1:value2:..." (`N` = now, `U` = unknown value)
+    async fn update(&mut self, file_path: &Path, data: &str) -> Result<()>;
+
+    /// Create new RRD file with schema
+    ///
+    /// # Arguments
+    /// * `file_path` - Full path where RRD file should be created
+    /// * `schema` - RRD schema defining data sources and archives
+    /// * `start_timestamp` - Start time for the RRD file (Unix timestamp)
+    async fn create(
+        &mut self,
+        file_path: &Path,
+        schema: &RrdSchema,
+        start_timestamp: i64,
+    ) -> Result<()>;
+
+    /// Flush pending updates to disk
+    ///
+    /// For the rrdcached backend, this asks the daemon to flush all pending updates.
+    /// For direct backends, this is a no-op (writes are immediate).
+    #[allow(dead_code)] // Used in backend implementations via trait dispatch
+    async fn flush(&mut self) -> Result<()>;
+
+    /// Check if backend is available and healthy
+    ///
+    /// Returns true if the backend can be used for operations.
+    /// The rrdcached backend currently always returns true (no liveness probe yet).
+    /// For direct backends, this always returns true.
+    #[allow(dead_code)] // Used in fallback backend via trait dispatch
+    async fn is_available(&self) -> bool;
+
+    /// Get a human-readable name for this backend
+    fn name(&self) -> &str;
+}
+
+// Backend implementations
+mod backend_daemon;
+mod backend_direct;
+mod backend_fallback;
+
+pub use backend_daemon::RrdCachedBackend;
+pub use backend_direct::RrdDirectBackend;
+pub use backend_fallback::RrdFallbackBackend;
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
new file mode 100644
index 00000000..28c1a99a
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_daemon.rs
@@ -0,0 +1,214 @@
+//! RRD Backend: rrdcached daemon
+//!
+//! Uses rrdcached for batched, high-performance RRD updates.
+//! This is the preferred backend when the daemon is available.
+use super::super::schema::RrdSchema;
+use anyhow::{Context, Result};
+use async_trait::async_trait;
+use rrdcached_client::RRDCachedClient;
+use rrdcached_client::consolidation_function::ConsolidationFunction;
+use rrdcached_client::create::{
+    CreateArguments, CreateDataSource, CreateDataSourceType, CreateRoundRobinArchive,
+};
+use std::path::Path;
+
+/// RRD backend that sends updates and creates to the rrdcached daemon over a Unix socket
+pub struct RrdCachedBackend {
+    client: RRDCachedClient<tokio::net::UnixStream>, // set up by `connect()`
+}
+
+impl RrdCachedBackend {
+    /// Connect to the rrdcached daemon over its Unix socket and log the connection
+    ///
+    /// `socket_path` is the path of the rrdcached Unix socket (e.g. /var/run/rrdcached.sock).
+    /// Returns an error, with the socket path as context, if the connection cannot be made.
+    pub async fn connect(socket_path: &str) -> Result<Self> {
+        let client = RRDCachedClient::connect_unix(socket_path)
+            .await
+            .with_context(|| format!("Failed to connect to rrdcached at {socket_path}"))?;
+
+        tracing::info!("Connected to rrdcached at {}", socket_path);
+
+        Ok(Self { client })
+    }
+}
+
+#[async_trait]
+impl super::super::backend::RrdBackend for RrdCachedBackend {
+    async fn update(&mut self, file_path: &Path, data: &str) -> Result<()> {
+        // Split "timestamp:value1:..." into fields; a timestamp plus at least one value is required
+        let parts: Vec<&str> = data.split(':').collect();
+        if parts.len() < 2 {
+            anyhow::bail!("Invalid update data format: {data}");
+        }
+
+        let timestamp = if parts[0] == "N" {
+            None
+        } else {
+            Some(
+                parts[0]
+                    .parse::<usize>()
+                    .with_context(|| format!("Invalid timestamp: {}", parts[0]))?,
+            )
+        };
+
+        let values: Vec<f64> = parts[1..]
+            .iter()
+            .map(|v| {
+                if *v == "U" {
+                    Ok(f64::NAN)
+                } else {
+                    v.parse::<f64>()
+                        .with_context(|| format!("Invalid value: {v}"))
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Get file path without .rrd extension (rrdcached-client adds it)
+        let path_str = file_path.to_string_lossy();
+        let path_without_ext = path_str.strip_suffix(".rrd").unwrap_or(&path_str);
+
+        // Send update via rrdcached
+        self.client
+            .update(path_without_ext, timestamp, values)
+            .await
+            .with_context(|| format!("rrdcached update failed for {:?}", file_path))?;
+
+        tracing::trace!("Updated RRD via daemon: {:?} -> {}", file_path, data);
+
+        Ok(())
+    }
+
+    async fn create(
+        &mut self,
+        file_path: &Path,
+        schema: &RrdSchema,
+        start_timestamp: i64,
+    ) -> Result<()> {
+        tracing::debug!(
+            "Creating RRD file via daemon: {:?} with {} data sources",
+            file_path,
+            schema.column_count()
+        );
+
+        // Convert our data sources to rrdcached-client CreateDataSource objects
+        let mut data_sources = Vec::new();
+        for ds in &schema.data_sources {
+            let serie_type = match ds.ds_type {
+                "GAUGE" => CreateDataSourceType::Gauge,
+                "DERIVE" => CreateDataSourceType::Derive,
+                "COUNTER" => CreateDataSourceType::Counter,
+                "ABSOLUTE" => CreateDataSourceType::Absolute,
+                _ => anyhow::bail!("Unsupported data source type: {}", ds.ds_type),
+            };
+
+            // Parse min/max ("U" = no bound); a malformed bound is an error, as in the direct backend
+            let minimum = if ds.min == "U" {
+                None
+            } else {
+                Some(ds.min.parse().context("Invalid min value")?)
+            };
+            let maximum = if ds.max == "U" {
+                None
+            } else {
+                Some(ds.max.parse().context("Invalid max value")?)
+            };
+
+            let data_source = CreateDataSource {
+                name: ds.name.to_string(),
+                minimum,
+                maximum,
+                heartbeat: ds.heartbeat as i64,
+                serie_type,
+            };
+
+            data_sources.push(data_source);
+        }
+
+        // Convert our RRA definitions to rrdcached-client CreateRoundRobinArchive objects
+        let mut archives = Vec::new();
+        for rra in &schema.archives {
+            // Parse RRA string: "RRA:AVERAGE:0.5:1:70"
+            let parts: Vec<&str> = rra.split(':').collect();
+            if parts.len() != 5 || parts[0] != "RRA" {
+                anyhow::bail!("Invalid RRA format: {rra}");
+            }
+
+            let consolidation_function = match parts[1] {
+                "AVERAGE" => ConsolidationFunction::Average,
+                "MIN" => ConsolidationFunction::Min,
+                "MAX" => ConsolidationFunction::Max,
+                "LAST" => ConsolidationFunction::Last,
+                _ => anyhow::bail!("Unsupported consolidation function: {}", parts[1]),
+            };
+
+            let xfiles_factor: f64 = parts[2]
+                .parse()
+                .with_context(|| format!("Invalid xff in RRA: {rra}"))?;
+            let steps: i64 = parts[3]
+                .parse()
+                .with_context(|| format!("Invalid steps in RRA: {rra}"))?;
+            let rows: i64 = parts[4]
+                .parse()
+                .with_context(|| format!("Invalid rows in RRA: {rra}"))?;
+
+            let archive = CreateRoundRobinArchive {
+                consolidation_function,
+                xfiles_factor,
+                steps,
+                rows,
+            };
+            archives.push(archive);
+        }
+
+        // Get path without .rrd extension (rrdcached-client adds it)
+        let path_str = file_path.to_string_lossy();
+        let path_without_ext = path_str
+            .strip_suffix(".rrd")
+            .unwrap_or(&path_str)
+            .to_string();
+
+        // Build the CREATE arguments; a negative start time is rejected instead of wrapping
+        let create_args = CreateArguments {
+            path: path_without_ext,
+            data_sources,
+            round_robin_archives: archives,
+            start_timestamp: u64::try_from(start_timestamp).context("Negative start timestamp")?,
+            step_seconds: 60, // 60-second step (1 minute resolution)
+        };
+
+        // Validate before sending
+        create_args.validate().context("Invalid CREATE arguments")?;
+
+        // Send CREATE command via rrdcached
+        self.client
+            .create(create_args)
+            .await
+            .with_context(|| format!("Failed to create RRD file via daemon: {file_path:?}"))?;
+
+        tracing::info!("Created RRD file via daemon: {:?} ({})", file_path, schema);
+
+        Ok(())
+    }
+
+    async fn flush(&mut self) -> Result<()> {
+        self.client
+            .flush_all()
+            .await
+            .context("Failed to flush rrdcached")?;
+
+        tracing::debug!("Flushed all pending RRD updates");
+
+        Ok(())
+    }
+
+    async fn is_available(&self) -> bool {
+        // For now, assume we're available if we have a client
+        // Could add a PING command in the future
+        true
+    }
+
+    fn name(&self) -> &str {
+        "rrdcached"
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
new file mode 100644
index 00000000..6be3eb5d
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_direct.rs
@@ -0,0 +1,606 @@
+//! RRD Backend: Direct file writing
+//!
+//! Uses the `rrd` crate (librrd bindings) for direct RRD file operations.
+//! This backend is used as a fallback when rrdcached is unavailable.
+//!
+//! This matches the C implementation's behavior in status.c:1416-1420 where
+//! it falls back to rrd_update_r() and rrd_create_r() for direct file access.
+use super::super::schema::RrdSchema;
+use anyhow::{Context, Result};
+use async_trait::async_trait;
+use std::path::Path;
+use std::time::Duration;
+
+/// RRD backend that creates and updates RRD files directly through the `rrd` crate (librrd)
+pub struct RrdDirectBackend {
+    // Currently stateless, but kept as struct for future enhancements
+}
+
+impl RrdDirectBackend {
+    /// Create a new direct file backend and log the backend choice at info level
+    pub fn new() -> Self {
+        tracing::info!("Using direct RRD file backend (via librrd)");
+        Self {}
+    }
+}
+
+impl Default for RrdDirectBackend {
+    fn default() -> Self {
+        Self::new() // same as `new()`, including its info-level log line
+    }
+}
+
+#[async_trait]
+impl super::super::backend::RrdBackend for RrdDirectBackend {
+    async fn update(&mut self, file_path: &Path, data: &str) -> Result<()> {
+        let path = file_path.to_path_buf();
+        let data_str = data.to_string();
+
+        // Use tokio::task::spawn_blocking for sync rrd operations
+        // This prevents blocking the async runtime
+        tokio::task::spawn_blocking(move || {
+            // Parse the update data to extract timestamp and values
+            // Format: "timestamp:value1:value2:..." (timestamp plus at least one value)
+            let parts: Vec<&str> = data_str.split(':').collect();
+            if parts.len() < 2 {
+                anyhow::bail!("Invalid update data format: {data_str}");
+            }
+
+            // The update goes through rrd::ops::update::update_all with an explicit
+            // BatchTime::Timestamp, so "N" is resolved to the current time here
+            let timestamp_str = parts[0];
+            let timestamp: i64 = if timestamp_str == "N" {
+                // "N" means "now" in RRD terminology
+                chrono::Utc::now().timestamp()
+            } else {
+                timestamp_str
+                    .parse()
+                    .with_context(|| format!("Invalid timestamp: {}", timestamp_str))?
+            };
+
+            let timestamp = chrono::DateTime::from_timestamp(timestamp, 0)
+                .ok_or_else(|| anyhow::anyhow!("Invalid timestamp value: {}", timestamp))?;
+
+            // Convert values to Datum
+            let values: Vec<rrd::ops::update::Datum> = parts[1..]
+                .iter()
+                .map(|v| {
+                    if *v == "U" {
+                        // Unknown/unspecified value
+                        rrd::ops::update::Datum::Unspecified
+                    } else if let Ok(int_val) = v.parse::<u64>() {
+                        rrd::ops::update::Datum::Int(int_val)
+                    } else if let Ok(float_val) = v.parse::<f64>() {
+                        rrd::ops::update::Datum::Float(float_val)
+                    } else {
+                        rrd::ops::update::Datum::Unspecified
+                    }
+                })
+                .collect();
+
+            // Perform the update
+            rrd::ops::update::update_all(
+                &path,
+                rrd::ops::update::ExtraFlags::empty(),
+                &[(
+                    rrd::ops::update::BatchTime::Timestamp(timestamp),
+                    values.as_slice(),
+                )],
+            )
+            .with_context(|| format!("Direct RRD update failed for {:?}", path))?;
+
+            tracing::trace!("Updated RRD via direct file: {:?} -> {}", path, data_str);
+
+            Ok::<(), anyhow::Error>(())
+        })
+        .await
+        .context("Failed to spawn blocking task for RRD update")??;
+
+        Ok(())
+    }
+
+    async fn create(
+        &mut self,
+        file_path: &Path,
+        schema: &RrdSchema,
+        start_timestamp: i64,
+    ) -> Result<()> {
+        tracing::debug!(
+            "Creating RRD file via direct: {:?} with {} data sources",
+            file_path,
+            schema.column_count()
+        );
+
+        let path = file_path.to_path_buf();
+        let schema = schema.clone();
+
+        // Run all blocking filesystem and librrd work off the async runtime
+        tokio::task::spawn_blocking(move || {
+            // Ensure parent directory exists
+            if let Some(parent) = path.parent() {
+                std::fs::create_dir_all(parent)
+                    .with_context(|| format!("Failed to create directory: {parent:?}"))?;
+            }
+
+            // Convert timestamp
+            let start = chrono::DateTime::from_timestamp(start_timestamp, 0)
+                .ok_or_else(|| anyhow::anyhow!("Invalid start timestamp: {}", start_timestamp))?;
+
+            // Convert data sources
+            let data_sources: Vec<rrd::ops::create::DataSource> = schema
+                .data_sources
+                .iter()
+                .map(|ds| {
+                    let name = rrd::ops::create::DataSourceName::new(ds.name);
+
+                    match ds.ds_type {
+                        "GAUGE" => {
+                            let min = if ds.min == "U" {
+                                None
+                            } else {
+                                Some(ds.min.parse().context("Invalid min value")?)
+                            };
+                            let max = if ds.max == "U" {
+                                None
+                            } else {
+                                Some(ds.max.parse().context("Invalid max value")?)
+                            };
+                            Ok(rrd::ops::create::DataSource::gauge(
+                                name,
+                                ds.heartbeat,
+                                min,
+                                max,
+                            ))
+                        }
+                        "DERIVE" => {
+                            let min = if ds.min == "U" {
+                                None
+                            } else {
+                                Some(ds.min.parse().context("Invalid min value")?)
+                            };
+                            let max = if ds.max == "U" {
+                                None
+                            } else {
+                                Some(ds.max.parse().context("Invalid max value")?)
+                            };
+                            Ok(rrd::ops::create::DataSource::derive(
+                                name,
+                                ds.heartbeat,
+                                min,
+                                max,
+                            ))
+                        }
+                        "COUNTER" => {
+                            let min = if ds.min == "U" {
+                                None
+                            } else {
+                                Some(ds.min.parse().context("Invalid min value")?)
+                            };
+                            let max = if ds.max == "U" {
+                                None
+                            } else {
+                                Some(ds.max.parse().context("Invalid max value")?)
+                            };
+                            Ok(rrd::ops::create::DataSource::counter(
+                                name,
+                                ds.heartbeat,
+                                min,
+                                max,
+                            ))
+                        }
+                        "ABSOLUTE" => {
+                            let min = if ds.min == "U" {
+                                None
+                            } else {
+                                Some(ds.min.parse().context("Invalid min value")?)
+                            };
+                            let max = if ds.max == "U" {
+                                None
+                            } else {
+                                Some(ds.max.parse().context("Invalid max value")?)
+                            };
+                            Ok(rrd::ops::create::DataSource::absolute(
+                                name,
+                                ds.heartbeat,
+                                min,
+                                max,
+                            ))
+                        }
+                        _ => anyhow::bail!("Unsupported data source type: {}", ds.ds_type),
+                    }
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Convert RRAs
+            let archives: Result<Vec<rrd::ops::create::Archive>> = schema
+                .archives
+                .iter()
+                .map(|rra| {
+                    // Parse RRA string: "RRA:AVERAGE:0.5:1:1440"
+                    let parts: Vec<&str> = rra.split(':').collect();
+                    if parts.len() != 5 || parts[0] != "RRA" {
+                        anyhow::bail!("Invalid RRA format: {}", rra);
+                    }
+
+                    let cf = match parts[1] {
+                        "AVERAGE" => rrd::ConsolidationFn::Avg,
+                        "MIN" => rrd::ConsolidationFn::Min,
+                        "MAX" => rrd::ConsolidationFn::Max,
+                        "LAST" => rrd::ConsolidationFn::Last,
+                        _ => anyhow::bail!("Unsupported consolidation function: {}", parts[1]),
+                    };
+
+                    let xff: f64 = parts[2]
+                        .parse()
+                        .with_context(|| format!("Invalid xff in RRA: {}", rra))?;
+                    let steps: u32 = parts[3]
+                        .parse()
+                        .with_context(|| format!("Invalid steps in RRA: {}", rra))?;
+                    let rows: u32 = parts[4]
+                        .parse()
+                        .with_context(|| format!("Invalid rows in RRA: {}", rra))?;
+
+                    rrd::ops::create::Archive::new(cf, xff, steps, rows)
+                        .map_err(|e| anyhow::anyhow!("Failed to create archive: {}", e))
+                })
+                .collect();
+
+            let archives = archives?;
+
+            // Call rrd::ops::create::create
+            rrd::ops::create::create(
+                &path,
+                start,
+                Duration::from_secs(60), // 60-second step
+                false,                   // no_overwrite = false
+                None,                    // template
+                &[],                     // sources
+                data_sources.iter(),
+                archives.iter(),
+            )
+            .with_context(|| format!("Direct RRD create failed for {:?}", path))?;
+
+            tracing::info!("Created RRD file via direct: {:?} ({})", path, schema);
+
+            Ok::<(), anyhow::Error>(())
+        })
+        .await
+        .context("Failed to spawn blocking task for RRD create")??;
+
+        Ok(())
+    }
+
+    async fn flush(&mut self) -> Result<()> {
+        // No-op for direct backend - writes are immediate
+        tracing::trace!("Flush called on direct backend (no-op)");
+        Ok(())
+    }
+
+    async fn is_available(&self) -> bool {
+        // Direct backend is always available (no external dependencies)
+        true
+    }
+
+    fn name(&self) -> &str {
+        "direct"
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::backend::RrdBackend;
+    use crate::schema::{RrdFormat, RrdSchema};
+    use std::path::PathBuf;
+    use tempfile::TempDir;
+
+    // ===== Test Helpers =====
+
+    /// Create a fresh temporary directory to hold a test's RRD files (panics on failure)
+    fn setup_temp_dir() -> TempDir {
+        TempDir::new().expect("Failed to create temp directory")
+    }
+
+    /// Build the path of `name` with an `.rrd` extension inside the given temp directory
+    fn test_rrd_path(dir: &TempDir, name: &str) -> PathBuf {
+        dir.path().join(format!("{name}.rrd"))
+    }
+
+    // ===== RrdDirectBackend Tests =====
+
+    #[tokio::test]
+    async fn test_direct_backend_create_node_rrd() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "node_test");
+
+        let mut backend = RrdDirectBackend::new();
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let start_time = 1704067200; // 2024-01-01 00:00:00 UTC
+
+        // Create the node-schema RRD file in the temporary directory
+        let result = backend.create(&rrd_path, &schema, start_time).await;
+        assert!(
+            result.is_ok(),
+            "Failed to create node RRD: {:?}",
+            result.err()
+        );
+
+        // Verify file was created
+        assert!(rrd_path.exists(), "RRD file should exist after create");
+
+        // Verify backend name
+        assert_eq!(backend.name(), "direct");
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_create_vm_rrd() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "vm_test");
+
+        let mut backend = RrdDirectBackend::new();
+        let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+        let start_time = 1704067200; // 2024-01-01 00:00:00 UTC
+
+        let result = backend.create(&rrd_path, &schema, start_time).await;
+        assert!(
+            result.is_ok(),
+            "Failed to create VM RRD: {:?}",
+            result.err()
+        );
+        assert!(rrd_path.exists()); // the VM-schema file must be on disk after create
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_create_storage_rrd() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "storage_test");
+
+        let mut backend = RrdDirectBackend::new();
+        let schema = RrdSchema::storage(RrdFormat::Pve2);
+        let start_time = 1704067200; // 2024-01-01 00:00:00 UTC
+
+        let result = backend.create(&rrd_path, &schema, start_time).await;
+        assert!(
+            result.is_ok(),
+            "Failed to create storage RRD: {:?}",
+            result.err()
+        );
+        assert!(rrd_path.exists()); // the storage-schema file must be on disk after create
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_update_with_timestamp() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "update_test");
+
+        let mut backend = RrdDirectBackend::new();
+        let schema = RrdSchema::storage(RrdFormat::Pve2);
+        let start_time = 1704067200; // 2024-01-01 00:00:00 UTC
+
+        // Create RRD file
+        backend
+            .create(&rrd_path, &schema, start_time)
+            .await
+            .expect("Failed to create RRD");
+
+        // Update with an explicit timestamp one 60-second step after start_time
+        // Format: "timestamp:value1:value2"
+        let update_data = "1704067260:1000000:500000"; // total=1MB, used=500KB
+        let result = backend.update(&rrd_path, update_data).await;
+
+        assert!(result.is_ok(), "Failed to update RRD: {:?}", result.err());
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_update_with_n_timestamp() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "update_n_test");
+
+        let mut backend = RrdDirectBackend::new();
+        let schema = RrdSchema::storage(RrdFormat::Pve2);
+        let start_time = 1704067200; // 2024-01-01 00:00:00 UTC
+
+        backend
+            .create(&rrd_path, &schema, start_time)
+            .await
+            .expect("Failed to create RRD");
+
+        // Update with "N", which the direct backend resolves to the current wall-clock time
+        let update_data = "N:2000000:750000";
+        let result = backend.update(&rrd_path, update_data).await;
+
+        assert!(
+            result.is_ok(),
+            "Failed to update RRD with N timestamp: {:?}",
+            result.err()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_update_with_unknown_values() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "update_u_test");
+
+        let mut backend = RrdDirectBackend::new();
+        let schema = RrdSchema::storage(RrdFormat::Pve2);
+        let start_time = 1704067200; // 2024-01-01 00:00:00 UTC
+
+        backend
+            .create(&rrd_path, &schema, start_time)
+            .await
+            .expect("Failed to create RRD");
+
+        // Update with a "U" value, which the backend maps to Datum::Unspecified
+        let update_data = "N:U:1000000"; // total unknown, used known
+        let result = backend.update(&rrd_path, update_data).await;
+
+        assert!(
+            result.is_ok(),
+            "Failed to update RRD with U values: {:?}",
+            result.err()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_update_invalid_data() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "invalid_test");
+
+        let mut backend = RrdDirectBackend::new();
+        let schema = RrdSchema::storage(RrdFormat::Pve2);
+        let start_time = 1704067200; // 2024-01-01 00:00:00 UTC
+
+        backend
+            .create(&rrd_path, &schema, start_time)
+            .await
+            .expect("Failed to create RRD");
+
+        // Test truly invalid data formats that MUST fail
+        // Note: Invalid values like "abc" are converted to Unspecified (U), which is valid RRD behavior
+        let invalid_cases = [
+            "",            // Empty string
+            ":",           // Only separator
+            "timestamp",   // Missing values
+            "N",           // No colon separator
+            "abc:123:456", // Invalid timestamp (not N or integer)
+        ];
+
+        for invalid_data in invalid_cases {
+            let result = backend.update(&rrd_path, invalid_data).await;
+            assert!(
+                result.is_err(),
+                "Update should fail for invalid data: '{}', but got Ok",
+                invalid_data
+            );
+        }
+
+        // Test lenient data formats that succeed (invalid values become Unspecified)
+        // Use explicit timestamps to avoid "same timestamp" errors
+        let mut timestamp = start_time + 60;
+        let lenient_cases = [
+            "abc:456", // Invalid first value -> becomes U
+            "123:def", // Invalid second value -> becomes U
+            "U:U",     // All unknown
+        ];
+
+        for lenient_data in lenient_cases {
+            let update_data = format!("{}:{}", timestamp, lenient_data);
+            let result = backend.update(&rrd_path, &update_data).await;
+            assert!(
+                result.is_ok(),
+                "Update should succeed for lenient data: '{}', but got Err: {:?}",
+                update_data,
+                result.err()
+            );
+            timestamp += 60; // Increment timestamp for next update
+        }
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_update_nonexistent_file() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "nonexistent");
+
+        let mut backend = RrdDirectBackend::new();
+
+        // No create() is issued first, so update() targets a path that was never written
+        let result = backend.update(&rrd_path, "N:100:200").await;
+
+        assert!(result.is_err(), "Update should fail for nonexistent file");
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_flush() {
+        let mut backend = RrdDirectBackend::new();
+
+        // Flush on a fresh backend (nothing created or updated) is expected to be an Ok no-op
+        let result = backend.flush().await;
+        assert!(
+            result.is_ok(),
+            "Flush should always succeed for direct backend"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_is_available() {
+        let backend = RrdDirectBackend::new();
+
+        // is_available() is queried on a fresh, non-mutable backend and must report true
+        assert!(
+            backend.is_available().await,
+            "Direct backend should always be available"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_multiple_updates() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "multi_update_test");
+
+        let mut backend = RrdDirectBackend::new();
+        let schema = RrdSchema::storage(RrdFormat::Pve2);
+        let start_time = 1704067200; // 2024-01-01 00:00:00 UTC
+
+        backend
+            .create(&rrd_path, &schema, start_time)
+            .await
+            .expect("Failed to create RRD");
+
+        // Ten updates with strictly increasing timestamps and growing total/used values
+        for i in 0..10 {
+            let timestamp = start_time + 60 * (i + 1); // 1 minute intervals
+            let total = 1000000 + (i * 100000);
+            let used = 500000 + (i * 50000);
+            let update_data = format!("{}:{}:{}", timestamp, total, used);
+
+            let result = backend.update(&rrd_path, &update_data).await;
+            assert!(result.is_ok(), "Update {} failed: {:?}", i, result.err());
+        }
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_overwrite_file() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "overwrite_test");
+
+        let mut backend = RrdDirectBackend::new();
+        let schema = RrdSchema::storage(RrdFormat::Pve2);
+        let start_time = 1704067200; // 2024-01-01 00:00:00 UTC
+
+        // Create file first time
+        backend
+            .create(&rrd_path, &schema, start_time)
+            .await
+            .expect("First create failed");
+
+        // Second create with the same path, schema and start time - should succeed (overwrites)
+        // Note: librrd create() with no_overwrite=false allows overwriting
+        let result = backend.create(&rrd_path, &schema, start_time).await;
+        assert!(
+            result.is_ok(),
+            "Creating file again should succeed (overwrite mode): {:?}",
+            result.err()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_direct_backend_large_schema() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "large_schema_test");
+
+        let mut backend = RrdDirectBackend::new();
+        let schema = RrdSchema::node(RrdFormat::Pve9_0); // 19 data sources
+        let start_time = 1704067200; // 2024-01-01 00:00:00 UTC
+
+        // Create RRD with large schema; expect() prints the underlying error on failure
+        let result = backend.create(&rrd_path, &schema, start_time).await;
+        result.expect("Failed to create RRD with large schema");
+
+        // Update with all values (one per data source, 19 in total)
+        let values = "100:200:50.5:10.2:8000000:4000000:2000000:500000:50000000:25000000:1000000:2000000:6000000:1000000:0.5:1.2:0.8:0.3:0.1";
+        let update_data = format!("N:{}", values);
+
+        let result = backend.update(&rrd_path, &update_data).await;
+        result.expect("Failed to update RRD with large schema");
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
new file mode 100644
index 00000000..7d574e5b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/backend/backend_fallback.rs
@@ -0,0 +1,229 @@
+//! RRD Backend: Fallback (Daemon + Direct)
+//!
+//! Composite backend that tries daemon first, falls back to direct file writing.
+//! This matches the C implementation's behavior in status.c:1405-1420 where
+//! it attempts rrdc_update() first, then falls back to rrd_update_r().
+use super::super::schema::RrdSchema;
+use super::{RrdCachedBackend, RrdDirectBackend};
+use anyhow::{Context, Result};
+use async_trait::async_trait;
+use std::path::Path;
+
+/// Composite backend that tries the rrdcached daemon first and falls back to direct writes
+///
+/// Intended to provide the same behavior as the C implementation:
+/// 1. Use the rrdcached daemon for as long as a daemon backend is attached
+/// 2. If a daemon call fails or no daemon was attached, write the RRD files directly
+pub struct RrdFallbackBackend {
+    /// Daemon backend; `None` if connecting failed or after `disable_daemon()` dropped it
+    daemon: Option<RrdCachedBackend>,
+    /// Direct backend, used for every operation the daemon did not complete
+    direct: RrdDirectBackend,
+}
+
+impl RrdFallbackBackend {
+    /// Build a fallback backend, probing the rrdcached daemon once
+    ///
+    /// A successful connect makes the daemon the preferred write path. A failed
+    /// connect is logged as a warning and the backend starts in direct-only mode,
+    /// so this constructor itself never fails.
+    ///
+    /// # Arguments
+    /// * `daemon_socket` - Path to rrdcached Unix socket
+    pub async fn new(daemon_socket: &str) -> Self {
+        let connected = RrdCachedBackend::connect(daemon_socket).await;
+        match &connected {
+            Ok(_) => {
+                tracing::info!("RRD fallback backend: daemon available, will prefer daemon mode")
+            }
+            Err(e) => tracing::warn!(
+                "RRD fallback backend: daemon unavailable ({}), using direct mode only",
+                e
+            ),
+        }
+
+        // `ok()` keeps the connection on success and discards the already logged error
+        Self {
+            daemon: connected.ok(),
+            direct: RrdDirectBackend::new(),
+        }
+    }
+
+    /// Assemble a fallback backend from already constructed parts
+    ///
+    /// Passing `None` as `daemon` yields a direct-only backend
+    #[allow(dead_code)] // Used in tests for custom backend configurations
+    pub fn with_backends(daemon: Option<RrdCachedBackend>, direct: RrdDirectBackend) -> Self {
+        Self { daemon, direct }
+    }
+
+    /// Report whether a daemon backend is still attached
+    #[allow(dead_code)] // Used for debugging/monitoring daemon status
+    pub fn is_using_daemon(&self) -> bool {
+        self.daemon.is_some()
+    }
+
+    /// Detach the daemon backend so that all later operations go direct
+    ///
+    /// Logs a warning only if a daemon was actually attached; otherwise does nothing
+    fn disable_daemon(&mut self) {
+        if let Some(stale) = self.daemon.take() {
+            tracing::warn!("Disabling daemon mode, switching to direct file writes");
+            drop(stale);
+        }
+    }
+}
+
+#[async_trait]
+impl super::super::backend::RrdBackend for RrdFallbackBackend {
+    /// Update via the daemon if attached; on daemon error detach it and write directly
+    async fn update(&mut self, file_path: &Path, data: &str) -> Result<()> {
+        let daemon_tried = self.daemon.is_some(); // selects the final error wording
+        if let Some(daemon) = &mut self.daemon {
+            match daemon.update(file_path, data).await {
+                Ok(()) => {
+                    tracing::trace!("Updated RRD via daemon (fallback backend)");
+                    return Ok(());
+                }
+                Err(e) => {
+                    tracing::warn!("Daemon update failed, falling back to direct: {}", e);
+                    self.disable_daemon();
+                }
+            }
+        }
+        let res = self.direct.update(file_path, data).await;
+        if daemon_tried {
+            res.context("Both daemon and direct update failed")
+        } else {
+            res.context("Direct update failed")
+        }
+    }
+
+    /// Create via the daemon if attached; on daemon error detach it and create directly
+    async fn create(
+        &mut self,
+        file_path: &Path,
+        schema: &RrdSchema,
+        start_timestamp: i64,
+    ) -> Result<()> {
+        let daemon_tried = self.daemon.is_some(); // selects the final error wording
+        if let Some(daemon) = &mut self.daemon {
+            match daemon.create(file_path, schema, start_timestamp).await {
+                Ok(()) => {
+                    tracing::trace!("Created RRD via daemon (fallback backend)");
+                    return Ok(());
+                }
+                Err(e) => {
+                    tracing::warn!("Daemon create failed, falling back to direct: {}", e);
+                    self.disable_daemon();
+                }
+            }
+        }
+        let res = self.direct.create(file_path, schema, start_timestamp).await;
+        if daemon_tried {
+            res.context("Both daemon and direct create failed")
+        } else {
+            res.context("Direct create failed")
+        }
+    }
+
+    /// Flush the daemon if attached; a failed daemon flush is only logged and detaches it
+    async fn flush(&mut self) -> Result<()> {
+        if let Some(daemon) = &mut self.daemon {
+            match daemon.flush().await {
+                Ok(()) => return Ok(()),
+                Err(e) => {
+                    tracing::warn!("Daemon flush failed: {}", e);
+                    self.disable_daemon();
+                }
+            }
+        }
+        // Direct backend flush is a no-op
+        self.direct.flush().await
+    }
+
+    async fn is_available(&self) -> bool {
+        // Always available - either daemon or direct will work
+        true
+    }
+
+    fn name(&self) -> &str {
+        match self.daemon {
+            Some(_) => "fallback(daemon+direct)",
+            None => "fallback(direct-only)",
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::backend::RrdBackend;
+    use crate::schema::{RrdFormat, RrdSchema};
+    use std::path::PathBuf;
+    use tempfile::TempDir;
+
+    /// Create a temporary directory for RRD files
+    fn setup_temp_dir() -> TempDir {
+        TempDir::new().expect("Failed to create temp directory")
+    }
+
+    /// Create a test RRD file path
+    fn test_rrd_path(dir: &TempDir, name: &str) -> PathBuf {
+        dir.path().join(format!("{}.rrd", name))
+    }
+
+    #[test]
+    fn test_fallback_backend_without_daemon() {
+        let direct = RrdDirectBackend::new();
+        let backend = RrdFallbackBackend::with_backends(None, direct);
+
+        assert!(!backend.is_using_daemon());
+        assert_eq!(backend.name(), "fallback(direct-only)");
+    }
+
+    #[tokio::test]
+    async fn test_fallback_backend_direct_mode_operations() {
+        let temp_dir = setup_temp_dir();
+        let rrd_path = test_rrd_path(&temp_dir, "fallback_test");
+
+        // Create fallback backend without daemon (direct mode only)
+        let direct = RrdDirectBackend::new();
+        let mut backend = RrdFallbackBackend::with_backends(None, direct);
+
+        assert!(!backend.is_using_daemon(), "Should not be using daemon");
+        assert_eq!(backend.name(), "fallback(direct-only)");
+
+        // Test create and update operations work in direct mode
+        let schema = RrdSchema::storage(RrdFormat::Pve2);
+        let start_time = 1704067200; // 2024-01-01 00:00:00 UTC
+
+        let result = backend.create(&rrd_path, &schema, start_time).await;
+        result.expect("Create should work in direct mode");
+
+        let result = backend.update(&rrd_path, "N:1000:500").await;
+        result.expect("Update should work in direct mode");
+    }
+
+    #[tokio::test]
+    async fn test_fallback_backend_is_always_available() {
+        let direct = RrdDirectBackend::new();
+        let backend = RrdFallbackBackend::with_backends(None, direct);
+
+        // Fallback backend should always be available (even without daemon)
+        assert!(
+            backend.is_available().await,
+            "Fallback backend should always be available"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_fallback_backend_flush_without_daemon() {
+        let direct = RrdDirectBackend::new();
+        let mut backend = RrdFallbackBackend::with_backends(None, direct);
+
+        // Flush should succeed even without daemon (no-op for direct)
+        let result = backend.flush().await;
+        result.expect("Flush should succeed without daemon");
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
new file mode 100644
index 00000000..e53b6dad
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/daemon.rs
@@ -0,0 +1,140 @@
+//! RRDCached Daemon Client (wrapper around rrdcached-client crate)
+//!
+//! This module provides a thin wrapper around the rrdcached-client crate.
+use anyhow::{Context, Result};
+use std::path::Path;
+
+/// One rrdcached Unix-socket client behind an async mutex, so `&self` methods can use it
+#[allow(dead_code)] // Used in backend_daemon.rs via module-level access
+pub struct RrdCachedClient {
+    pub(crate) client:
+        tokio::sync::Mutex<rrdcached_client::RRDCachedClient<tokio::net::UnixStream>>,
+}
+
+impl RrdCachedClient {
+    /// Open a connection to the rrdcached daemon over its Unix socket
+    ///
+    /// # Arguments
+    /// * `socket_path` - Path to rrdcached Unix socket (e.g. /var/run/rrdcached.sock)
+    ///
+    /// # Errors
+    /// Returns an error, annotated with the socket path, if the connection cannot be
+    /// established.
+    #[allow(dead_code)] // Used via backend modules
+    pub async fn connect<P: AsRef<Path>>(socket_path: P) -> Result<Self> {
+        let socket_path = socket_path.as_ref().to_string_lossy().into_owned();
+        tracing::debug!("Connecting to rrdcached at {}", socket_path);
+
+        let attempt = rrdcached_client::RRDCachedClient::connect_unix(&socket_path).await;
+        let client = attempt
+            .with_context(|| format!("Failed to connect to rrdcached: {socket_path}"))?;
+        tracing::info!("Connected to rrdcached at {}", socket_path);
+
+        let client = tokio::sync::Mutex::new(client);
+        Ok(Self { client })
+    }
+
+    /// Send one update for an RRD file through rrdcached
+    ///
+    /// # Arguments
+    /// * `file_path` - Full path to RRD file (a trailing ".rrd" is stripped before sending)
+    /// * `data` - "timestamp:value1:value2:..." ("N" or integer timestamp; "U" or float values)
+    #[allow(dead_code)] // Used via backend modules
+    pub async fn update<P: AsRef<Path>>(&self, file_path: P, data: &str) -> Result<()> {
+        let file_path = file_path.as_ref();
+
+        // Everything before the first ':' is the timestamp, the rest are the values
+        let (stamp, raw_values) = match data.split_once(':') {
+            Some(pair) => pair,
+            None => anyhow::bail!("Invalid update data format: {data}"),
+        };
+
+        // "N" is forwarded as `None`; anything else must parse as an unsigned integer
+        let timestamp = match stamp {
+            "N" => None,
+            explicit => {
+                let secs = explicit
+                    .parse::<usize>()
+                    .with_context(|| format!("Invalid timestamp: {}", explicit))?;
+                Some(secs)
+            }
+        };
+
+        // "U" (unknown) is represented as NaN; the first unparsable value aborts the update
+        let mut values = Vec::new();
+        for raw in raw_values.split(':') {
+            let value = match raw {
+                "U" => f64::NAN,
+                number => number
+                    .parse::<f64>()
+                    .with_context(|| format!("Invalid value: {number}"))?,
+            };
+            values.push(value);
+        }
+
+        // Get file path without .rrd extension (rrdcached-client adds it)
+        let path_str = file_path.to_string_lossy();
+        let path_without_ext = path_str.strip_suffix(".rrd").unwrap_or(&path_str);
+
+        let mut guard = self.client.lock().await;
+        guard
+            .update(path_without_ext, timestamp, values)
+            .await
+            .context("Failed to send update to rrdcached")?;
+
+        tracing::trace!("Updated RRD via daemon: {:?} -> {}", file_path, data);
+
+        Ok(())
+    }
+
+    /// Create RRD file via rrdcached
+    ///
+    /// `args` is handed to the client unchanged; any client error is wrapped with context.
+    #[allow(dead_code)] // Used via backend modules
+    pub async fn create(&self, args: rrdcached_client::create::CreateArguments) -> Result<()> {
+        let mut guard = self.client.lock().await;
+        let outcome = guard.create(args).await;
+        outcome.context("Failed to create RRD via rrdcached")?;
+        Ok(())
+    }
+
+    /// Ask rrdcached to flush all pending updates (client call `flush_all`)
+    #[allow(dead_code)] // Used via backend modules
+    pub async fn flush(&self) -> Result<()> {
+        let mut guard = self.client.lock().await;
+        guard
+            .flush_all()
+            .await
+            .context("Failed to flush rrdcached")?;
+
+        tracing::debug!("Flushed all RRD files");
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    #[ignore] // Only runs if rrdcached daemon is actually running
+    async fn test_connect_to_daemon() {
+        // This test requires a running rrdcached daemon
+        let result = RrdCachedClient::connect("/var/run/rrdcached.sock").await;
+
+        match result {
+            Ok(client) => {
+                // Try to flush (basic connectivity test)
+                let result = client.flush().await;
+                println!("RRDCached flush result: {:?}", result);
+
+                // Connection successful (flush may fail if no files, that's OK), so the
+                // outcome is only printed; asserting `is_ok() || is_err()` checked nothing.
+            }
+            Err(e) => {
+                println!("Note: rrdcached not running (expected in test env): {}", e);
+            }
+        }
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
new file mode 100644
index 00000000..54021c14
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/key_type.rs
@@ -0,0 +1,313 @@
+//! RRD Key Type Parsing and Path Resolution
+//!
+//! This module handles parsing RRD status update keys and mapping them
+//! to the appropriate file paths and schemas.
+use anyhow::{Context, Result};
+use std::path::{Path, PathBuf};
+
+use super::schema::{RrdFormat, RrdSchema};
+
+/// Parsed RRD status-update key, used for routing to the correct schema and path
+///
+/// One variant per kind of RRD metric that pmxcfs tracks:
+/// - Node metrics (CPU, memory, network for a node)
+/// - VM metrics (CPU, memory, disk, network for a VM/CT)
+/// - Storage metrics (total/used space for a storage)
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(crate) enum RrdKeyType {
+    /// Node metrics: pve2-node/{nodename} or pve-node-9.0/{nodename}
+    Node { nodename: String, format: RrdFormat },
+    /// VM metrics: pve2.3-vm/{vmid} or pve-vm-9.0/{vmid}; `vmid` is kept as text
+    Vm { vmid: String, format: RrdFormat },
+    /// Storage metrics: pve2-storage/{node}/{storage} or pve-storage-9.0/{node}/{storage}
+    Storage {
+        nodename: String,
+        storage: String,
+        format: RrdFormat, // format of the incoming key, as returned by `source_format()`
+    },
+}
+
+impl RrdKeyType {
+    /// Parse RRD key from status update key
+    ///
+    /// Supported formats:
+    /// - "pve2-node/node1" → Node { nodename: "node1", format: Pve2 }
+    /// - "pve-node-9.0/node1" → Node { nodename: "node1", format: Pve9_0 }
+    /// - "pve2.3-vm/100" → Vm { vmid: "100", format: Pve2 }
+    /// - "pve-storage-9.0/node1/local" → Storage { nodename: "node1", storage: "local", format: Pve9_0 }
+    ///
+    /// Any prefix starting with `pve-node-`, `pve-vm-` or `pve-storage-` maps to `Pve9_0`.
+    /// Components after the ones listed above are ignored.
+    ///
+    /// # Errors
+    /// Fails for an unknown prefix, a missing component, or a component that is empty,
+    /// "." or ".." (such a name would make `file_path` leave its target directory).
+    pub(crate) fn parse(key: &str) -> Result<Self> {
+        let parts: Vec<&str> = key.split('/').collect();
+
+        // One builder per key kind; each validates the components it needs
+        let node_key = |format: RrdFormat| -> Result<Self> {
+            let nodename = Self::component(&parts, 1, "nodename")?;
+            Ok(RrdKeyType::Node { nodename, format })
+        };
+        let vm_key = |format: RrdFormat| -> Result<Self> {
+            let vmid = Self::component(&parts, 1, "vmid")?;
+            Ok(RrdKeyType::Vm { vmid, format })
+        };
+        let storage_key = |format: RrdFormat| -> Result<Self> {
+            let nodename = Self::component(&parts, 1, "nodename")?;
+            let storage = Self::component(&parts, 2, "storage")?;
+            Ok(RrdKeyType::Storage {
+                nodename,
+                storage,
+                format,
+            })
+        };
+
+        // `split` always yields at least one item, so the prefix is never absent
+        match parts.first().copied().unwrap_or_default() {
+            "pve2-node" => node_key(RrdFormat::Pve2),
+            prefix if prefix.starts_with("pve-node-") => node_key(RrdFormat::Pve9_0),
+            "pve2.3-vm" => vm_key(RrdFormat::Pve2),
+            prefix if prefix.starts_with("pve-vm-") => vm_key(RrdFormat::Pve9_0),
+            "pve2-storage" => storage_key(RrdFormat::Pve2),
+            prefix if prefix.starts_with("pve-storage-") => storage_key(RrdFormat::Pve9_0),
+            _ => anyhow::bail!("Unknown RRD key format: {key}"),
+        }
+    }
+
+    /// Return component `idx` of a split key as an owned, validated string
+    ///
+    /// `what` names the component in error messages (e.g. "Missing nodename").
+    /// Fails if the component is absent, empty, "." or "..".
+    fn component(parts: &[&str], idx: usize, what: &str) -> Result<String> {
+        let value = *parts.get(idx).with_context(|| format!("Missing {what}"))?;
+        // The value is later joined onto a directory path: "" would denote the directory
+        // itself and "."/".." would resolve to it or its parent, so refuse all three.
+        if matches!(value, "" | "." | "..") {
+            anyhow::bail!("Invalid {what} in RRD key: {value:?}");
+        }
+        Ok(value.to_string())
+    }
+
+    /// Get the RRD file path for this key type
+    ///
+    /// Always returns paths using the current format (9.0), regardless of the input format.
+    /// This enables transparent format migration: old PVE8 nodes can send `pve2-node/` keys,
+    /// and they'll be written to `pve-node-9.0/` files automatically.
+    ///
+    /// # Format Migration Strategy
+    ///
+    /// The C implementation always creates files in the current format directory
+    /// (see status.c:1287). This Rust implementation follows the same approach:
+    /// - Input: `pve2-node/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1`
+    /// - Input: `pve-node-9.0/node1` → Output: `/var/lib/rrdcached/db/pve-node-9.0/node1`
+    ///
+    /// This allows rolling upgrades where old and new nodes coexist in the same cluster.
+    ///
+    /// This only joins path segments onto `base_dir`; the file system is not touched.
+    pub(crate) fn file_path(&self, base_dir: &Path) -> PathBuf {
+        match self {
+            RrdKeyType::Node { nodename, .. } => {
+                // Always use current format path
+                base_dir.join("pve-node-9.0").join(nodename)
+            }
+            RrdKeyType::Vm { vmid, .. } => {
+                // Always use current format path
+                base_dir.join("pve-vm-9.0").join(vmid)
+            }
+            RrdKeyType::Storage {
+                nodename, storage, ..
+            } => {
+                // Always use current format path
+                base_dir
+                    .join("pve-storage-9.0")
+                    .join(nodename)
+                    .join(storage)
+            }
+        }
+    }
+
+    /// Get the source format from the input key
+    ///
+    /// This is used for data transformation (padding/truncation).
+    pub(crate) fn source_format(&self) -> RrdFormat {
+        match self {
+            RrdKeyType::Node { format, .. }
+            | RrdKeyType::Vm { format, .. }
+            | RrdKeyType::Storage { format, .. } => *format,
+        }
+    }
+
+    /// Get the target RRD schema (always current format)
+    ///
+    /// Files are always created using the current format (Pve9_0),
+    /// regardless of the source format in the key.
+    pub(crate) fn schema(&self) -> RrdSchema {
+        // The source format carried by the key is deliberately ignored here
+        match self {
+            RrdKeyType::Node { .. } => RrdSchema::node(RrdFormat::Pve9_0),
+            RrdKeyType::Vm { .. } => RrdSchema::vm(RrdFormat::Pve9_0),
+            RrdKeyType::Storage { .. } => RrdSchema::storage(RrdFormat::Pve9_0),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_node_keys() {
+        let key = RrdKeyType::parse("pve2-node/testnode").unwrap();
+        assert_eq!(
+            key,
+            RrdKeyType::Node {
+                nodename: "testnode".to_string(),
+                format: RrdFormat::Pve2
+            }
+        );
+
+        let key = RrdKeyType::parse("pve-node-9.0/testnode").unwrap();
+        assert_eq!(
+            key,
+            RrdKeyType::Node {
+                nodename: "testnode".to_string(),
+                format: RrdFormat::Pve9_0
+            }
+        );
+    }
+
+    #[test]
+    fn test_parse_vm_keys() {
+        let key = RrdKeyType::parse("pve2.3-vm/100").unwrap(); // legacy VM prefix is "pve2.3-vm"
+        assert_eq!(
+            key,
+            RrdKeyType::Vm {
+                vmid: "100".to_string(),
+                format: RrdFormat::Pve2
+            }
+        );
+
+        let key = RrdKeyType::parse("pve-vm-9.0/100").unwrap();
+        assert_eq!(
+            key,
+            RrdKeyType::Vm {
+                vmid: "100".to_string(),
+                format: RrdFormat::Pve9_0
+            }
+        );
+    }
+
+    #[test]
+    fn test_parse_storage_keys() {
+        let key = RrdKeyType::parse("pve2-storage/node1/local").unwrap(); // two components
+        assert_eq!(
+            key,
+            RrdKeyType::Storage {
+                nodename: "node1".to_string(),
+                storage: "local".to_string(),
+                format: RrdFormat::Pve2
+            }
+        );
+
+        let key = RrdKeyType::parse("pve-storage-9.0/node1/local").unwrap();
+        assert_eq!(
+            key,
+            RrdKeyType::Storage {
+                nodename: "node1".to_string(),
+                storage: "local".to_string(),
+                format: RrdFormat::Pve9_0
+            }
+        );
+    }
+
+    #[test]
+    fn test_file_paths() {
+        let base = Path::new("/var/lib/rrdcached/db");
+
+        // New format key → new format path
+        let key = RrdKeyType::Node {
+            nodename: "node1".to_string(),
+            format: RrdFormat::Pve9_0,
+        };
+        assert_eq!(
+            key.file_path(base),
+            PathBuf::from("/var/lib/rrdcached/db/pve-node-9.0/node1")
+        );
+
+        // Old format key → new format path (auto-upgrade!)
+        let key = RrdKeyType::Node {
+            nodename: "node1".to_string(),
+            format: RrdFormat::Pve2,
+        };
+        assert_eq!(
+            key.file_path(base),
+            PathBuf::from("/var/lib/rrdcached/db/pve-node-9.0/node1"),
+            "Old format keys should create new format files"
+        );
+
+        // VM: Old format → new format
+        let key = RrdKeyType::Vm {
+            vmid: "100".to_string(),
+            format: RrdFormat::Pve2,
+        };
+        assert_eq!(
+            key.file_path(base),
+            PathBuf::from("/var/lib/rrdcached/db/pve-vm-9.0/100"),
+            "Old VM format should upgrade to new format"
+        );
+
+        // Storage: old format key → new format path, keeping the node/storage nesting
+        let key = RrdKeyType::Storage {
+            nodename: "node1".to_string(),
+            storage: "local".to_string(),
+            format: RrdFormat::Pve2,
+        };
+        assert_eq!(
+            key.file_path(base),
+            PathBuf::from("/var/lib/rrdcached/db/pve-storage-9.0/node1/local"),
+            "Old storage format should upgrade to new format"
+        );
+    }
+
+    #[test]
+    fn test_source_format() {
+        let key = RrdKeyType::Node {
+            nodename: "node1".to_string(),
+            format: RrdFormat::Pve2,
+        };
+        assert_eq!(key.source_format(), RrdFormat::Pve2);
+
+        let key = RrdKeyType::Vm {
+            vmid: "100".to_string(),
+            format: RrdFormat::Pve9_0,
+        };
+        assert_eq!(key.source_format(), RrdFormat::Pve9_0);
+    }
+
+    #[test]
+    fn test_schema_always_current_format() {
+        // Even with Pve2 source format, schema should return Pve9_0
+        let key = RrdKeyType::Node {
+            nodename: "node1".to_string(),
+            format: RrdFormat::Pve2,
+        };
+        let schema = key.schema();
+        assert_eq!(
+            schema.format,
+            RrdFormat::Pve9_0,
+            "Schema should always use current format"
+        );
+        assert_eq!(schema.column_count(), 19, "Should have Pve9_0 column count");
+
+        // Pve9_0 source also gets Pve9_0 schema
+        let key = RrdKeyType::Node {
+            nodename: "node1".to_string(),
+            format: RrdFormat::Pve9_0,
+        };
+        let schema = key.schema();
+        assert_eq!(schema.format, RrdFormat::Pve9_0);
+        assert_eq!(schema.column_count(), 19);
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
new file mode 100644
index 00000000..7a439676
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/lib.rs
@@ -0,0 +1,21 @@
+//! RRD (Round-Robin Database) Persistence Module
+//!
+//! This module provides RRD file persistence compatible with the C pmxcfs implementation.
+//! It handles:
+//! - RRD file creation with proper schemas (node, VM, storage)
+//! - RRD file updates (writing metrics to disk)
+//! - Multiple backend strategies:
+//!   - Daemon mode: High-performance batched updates via rrdcached
+//!   - Direct mode: Reliable fallback using direct file writes
+//!   - Fallback mode: Tries daemon first, falls back to direct (matches C behavior)
+//! - Version management (pve2 vs pve-9.0 formats)
+//!
+//! The implementation matches the C behavior in status.c where it attempts
+//! daemon updates first, then falls back to direct file operations.
+mod backend;
+mod daemon;
+mod key_type;
+pub(crate) mod schema;
+mod writer;
+
+pub use writer::RrdWriter;
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
new file mode 100644
index 00000000..d449bd6e
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/schema.rs
@@ -0,0 +1,577 @@
+//! RRD Schema Definitions
+//!
+//! Defines RRD database schemas matching the C pmxcfs implementation.
+//! Each schema specifies data sources (DS) and round-robin archives (RRA).
+use std::fmt;
+
+/// RRD format version; selects which data-source list `RrdSchema::node`/`vm` build
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum RrdFormat {
+    /// Legacy pve2 format (12 data sources for node, 10 for VM, 2 for storage)
+    Pve2,
+    /// Current pve-9.0 format (19 data sources for node, 17 for VM, 2 for storage)
+    Pve9_0,
+}
+
+/// One RRD data source (a `DS` column); `to_arg` renders it as a `DS:...` argument
+#[derive(Debug, Clone)]
+pub struct RrdDataSource {
+    /// Data source name (the `name` field of the `DS:` argument)
+    pub name: &'static str,
+    /// Data source type keyword; the constructors in this file only emit GAUGE and DERIVE
+    pub ds_type: &'static str,
+    /// Heartbeat (seconds without an update before the value is marked as unknown)
+    pub heartbeat: u32,
+    /// Minimum value as RRD text ("U" for unknown, i.e. no lower bound)
+    pub min: &'static str,
+    /// Maximum value as RRD text ("U" for unknown, i.e. no upper bound)
+    pub max: &'static str,
+}
+
+impl RrdDataSource {
+    /// Create GAUGE data source: heartbeat 120 s, minimum 0, no maximum ("U")
+    pub(super) const fn gauge(name: &'static str) -> Self {
+        Self {
+            name,
+            ds_type: "GAUGE",
+            heartbeat: 120,
+            min: "0",
+            max: "U",
+        }
+    }
+
+    /// Create DERIVE data source (for counters that can wrap); same heartbeat/min/max as `gauge`
+    pub(super) const fn derive(name: &'static str) -> Self {
+        Self {
+            name,
+            ds_type: "DERIVE",
+            heartbeat: 120,
+            min: "0",
+            max: "U",
+        }
+    }
+
+    /// Format as RRD command line argument
+    ///
+    /// Matches C implementation format: "DS:name:TYPE:heartbeat:min:max"
+    /// (see rrd_def_node in src/pmxcfs/status.c:1100)
+    ///
+    /// Currently only exercised by the unit tests; kept for debugging and C format compatibility.
+    #[allow(dead_code)]
+    pub(super) fn to_arg(&self) -> String {
+        format!(
+            "DS:{}:{}:{}:{}:{}",
+            self.name, self.ds_type, self.heartbeat, self.min, self.max
+        )
+    }
+}
+
+/// RRD schema: the format tag plus the DS and RRA definitions used to create an RRD file
+#[derive(Debug, Clone)]
+pub struct RrdSchema {
+    /// RRD format version this schema was built for
+    pub format: RrdFormat,
+    /// Data sources, in column order
+    pub data_sources: Vec<RrdDataSource>,
+    /// Round-robin archives, stored as ready-made `RRA:...` argument strings
+    pub archives: Vec<String>,
+}
+
+impl RrdSchema {
+    /// Create node RRD schema: 12 data sources for Pve2, those 12 plus 7 more for Pve9_0
+    pub fn node(format: RrdFormat) -> Self {
+        let data_sources = match format {
+            RrdFormat::Pve2 => vec![
+                RrdDataSource::gauge("loadavg"),
+                RrdDataSource::gauge("maxcpu"),
+                RrdDataSource::gauge("cpu"),
+                RrdDataSource::gauge("iowait"),
+                RrdDataSource::gauge("memtotal"),
+                RrdDataSource::gauge("memused"),
+                RrdDataSource::gauge("swaptotal"),
+                RrdDataSource::gauge("swapused"),
+                RrdDataSource::gauge("roottotal"),
+                RrdDataSource::gauge("rootused"),
+                RrdDataSource::derive("netin"),
+                RrdDataSource::derive("netout"),
+            ],
+            RrdFormat::Pve9_0 => vec![
+                RrdDataSource::gauge("loadavg"), // entries 0..12 mirror Pve2, same order
+                RrdDataSource::gauge("maxcpu"),
+                RrdDataSource::gauge("cpu"),
+                RrdDataSource::gauge("iowait"),
+                RrdDataSource::gauge("memtotal"),
+                RrdDataSource::gauge("memused"),
+                RrdDataSource::gauge("swaptotal"),
+                RrdDataSource::gauge("swapused"),
+                RrdDataSource::gauge("roottotal"),
+                RrdDataSource::gauge("rootused"),
+                RrdDataSource::derive("netin"),
+                RrdDataSource::derive("netout"),
+                RrdDataSource::gauge("memavailable"), // pve-9.0 additions start here
+                RrdDataSource::gauge("arcsize"),
+                RrdDataSource::gauge("pressurecpusome"),
+                RrdDataSource::gauge("pressureiosome"),
+                RrdDataSource::gauge("pressureiofull"),
+                RrdDataSource::gauge("pressurememorysome"),
+                RrdDataSource::gauge("pressurememoryfull"),
+            ],
+        };
+
+        Self {
+            format,
+            data_sources,
+            archives: Self::default_archives(), // same RRA set as the VM and storage schemas
+        }
+    }
+
+    /// Create VM RRD schema: 10 data sources for Pve2, those 10 plus 7 more for Pve9_0
+    pub fn vm(format: RrdFormat) -> Self {
+        let data_sources = match format {
+            RrdFormat::Pve2 => vec![
+                RrdDataSource::gauge("maxcpu"),
+                RrdDataSource::gauge("cpu"),
+                RrdDataSource::gauge("maxmem"),
+                RrdDataSource::gauge("mem"),
+                RrdDataSource::gauge("maxdisk"),
+                RrdDataSource::gauge("disk"),
+                RrdDataSource::derive("netin"),
+                RrdDataSource::derive("netout"),
+                RrdDataSource::derive("diskread"),
+                RrdDataSource::derive("diskwrite"),
+            ],
+            RrdFormat::Pve9_0 => vec![
+                RrdDataSource::gauge("maxcpu"), // entries 0..10 mirror Pve2, same order
+                RrdDataSource::gauge("cpu"),
+                RrdDataSource::gauge("maxmem"),
+                RrdDataSource::gauge("mem"),
+                RrdDataSource::gauge("maxdisk"),
+                RrdDataSource::gauge("disk"),
+                RrdDataSource::derive("netin"),
+                RrdDataSource::derive("netout"),
+                RrdDataSource::derive("diskread"),
+                RrdDataSource::derive("diskwrite"),
+                RrdDataSource::gauge("memhost"), // pve-9.0 additions start here
+                RrdDataSource::gauge("pressurecpusome"),
+                RrdDataSource::gauge("pressurecpufull"),
+                RrdDataSource::gauge("pressureiosome"),
+                RrdDataSource::gauge("pressureiofull"),
+                RrdDataSource::gauge("pressurememorysome"),
+                RrdDataSource::gauge("pressurememoryfull"),
+            ],
+        };
+
+        Self {
+            format,
+            data_sources,
+            archives: Self::default_archives(), // same RRA set as the node and storage schemas
+        }
+    }
+
+    /// Create storage RRD schema; `format` is only recorded, both formats get the same two DS
+    pub fn storage(format: RrdFormat) -> Self {
+        let data_sources = vec![RrdDataSource::gauge("total"), RrdDataSource::gauge("used")];
+
+        Self {
+            format,
+            data_sources,
+            archives: Self::default_archives(),
+        }
+    }
+
+    /// Default RRA (Round-Robin Archive) definitions, shared by the node, VM and storage schemas
+    ///
+    /// These match the C implementation's archives for 60-second step size:
+    /// - RRA:AVERAGE:0.5:1:1440      -> 1 min * 1440 => 1 day
+    /// - RRA:AVERAGE:0.5:30:1440     -> 30 min * 1440 => 30 days
+    /// - RRA:AVERAGE:0.5:360:1440    -> 6 hours * 1440 => 360 days (~1 year)
+    /// - RRA:AVERAGE:0.5:10080:570   -> 1 week * 570 => ~11 years
+    /// - RRA:MAX:0.5:1:1440          -> 1 min * 1440 => 1 day
+    /// - RRA:MAX:0.5:30:1440         -> 30 min * 1440 => 30 days
+    /// - RRA:MAX:0.5:360:1440        -> 6 hours * 1440 => 360 days (~1 year)
+    /// - RRA:MAX:0.5:10080:570       -> 1 week * 570 => ~11 years
+    pub(super) fn default_archives() -> Vec<String> {
+        vec![
+            "RRA:AVERAGE:0.5:1:1440".to_string(),
+            "RRA:AVERAGE:0.5:30:1440".to_string(),
+            "RRA:AVERAGE:0.5:360:1440".to_string(),
+            "RRA:AVERAGE:0.5:10080:570".to_string(),
+            "RRA:MAX:0.5:1:1440".to_string(),
+            "RRA:MAX:0.5:30:1440".to_string(),
+            "RRA:MAX:0.5:360:1440".to_string(),
+            "RRA:MAX:0.5:10080:570".to_string(),
+        ]
+    }
+
+    /// Get number of data sources, i.e. the number of value columns an update must carry
+    pub fn column_count(&self) -> usize {
+        self.data_sources.len()
+    }
+}
+
+impl fmt::Display for RrdSchema {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "{:?} schema with {} data sources",
+            self.format, // Debug output is the variant name, e.g. "Pve9_0"
+            self.column_count()
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn assert_ds_properties(
+        ds: &RrdDataSource,
+        expected_name: &str,
+        expected_type: &str,
+        index: usize, // position in the schema; only used to label assertion failures
+    ) { // heartbeat/min/max below are the constants hard-coded in gauge()/derive()
+        assert_eq!(ds.name, expected_name, "DS[{}] name mismatch", index);
+        assert_eq!(ds.ds_type, expected_type, "DS[{}] type mismatch", index);
+        assert_eq!(ds.heartbeat, 120, "DS[{}] heartbeat should be 120", index);
+        assert_eq!(ds.min, "0", "DS[{}] min should be 0", index);
+        assert_eq!(ds.max, "U", "DS[{}] max should be U", index);
+    }
+
+    #[test]
+    fn test_datasource_construction() {
+        let gauge_ds = RrdDataSource::gauge("cpu");
+        assert_eq!(gauge_ds.name, "cpu");
+        assert_eq!(gauge_ds.ds_type, "GAUGE");
+        assert_eq!(gauge_ds.heartbeat, 120);
+        assert_eq!(gauge_ds.min, "0");
+        assert_eq!(gauge_ds.max, "U");
+        assert_eq!(gauge_ds.to_arg(), "DS:cpu:GAUGE:120:0:U"); // DS:name:TYPE:heartbeat:min:max
+
+        let derive_ds = RrdDataSource::derive("netin");
+        assert_eq!(derive_ds.name, "netin");
+        assert_eq!(derive_ds.ds_type, "DERIVE");
+        assert_eq!(derive_ds.heartbeat, 120);
+        assert_eq!(derive_ds.min, "0");
+        assert_eq!(derive_ds.max, "U");
+        assert_eq!(derive_ds.to_arg(), "DS:netin:DERIVE:120:0:U"); // only the type differs
+    }
+
+    #[test]
+    fn test_node_schema_pve2() {
+        let schema = RrdSchema::node(RrdFormat::Pve2);
+
+        assert_eq!(schema.column_count(), 12);
+        assert_eq!(schema.format, RrdFormat::Pve2);
+
+        let expected_ds = vec![ // expected column order; compared index by index below
+            ("loadavg", "GAUGE"),
+            ("maxcpu", "GAUGE"),
+            ("cpu", "GAUGE"),
+            ("iowait", "GAUGE"),
+            ("memtotal", "GAUGE"),
+            ("memused", "GAUGE"),
+            ("swaptotal", "GAUGE"),
+            ("swapused", "GAUGE"),
+            ("roottotal", "GAUGE"),
+            ("rootused", "GAUGE"),
+            ("netin", "DERIVE"),
+            ("netout", "DERIVE"),
+        ];
+
+        for (i, (name, ds_type)) in expected_ds.iter().enumerate() {
+            assert_ds_properties(&schema.data_sources[i], name, ds_type, i);
+        }
+    }
+
+    #[test]
+    fn test_node_schema_pve9() {
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+
+        assert_eq!(schema.column_count(), 19);
+        assert_eq!(schema.format, RrdFormat::Pve9_0);
+
+        let pve2_schema = RrdSchema::node(RrdFormat::Pve2);
+        for i in 0..12 { // the pve-9.0 list must start with the 12 pve2 columns
+            assert_eq!(
+                schema.data_sources[i].name, pve2_schema.data_sources[i].name,
+                "First 12 DS should match pve2"
+            );
+            assert_eq!(
+                schema.data_sources[i].ds_type, pve2_schema.data_sources[i].ds_type,
+                "First 12 DS types should match pve2"
+            );
+        }
+
+        let pve9_additions = vec![ // columns 12..19, appended after the pve2 set
+            ("memavailable", "GAUGE"),
+            ("arcsize", "GAUGE"),
+            ("pressurecpusome", "GAUGE"),
+            ("pressureiosome", "GAUGE"),
+            ("pressureiofull", "GAUGE"),
+            ("pressurememorysome", "GAUGE"),
+            ("pressurememoryfull", "GAUGE"),
+        ];
+
+        for (i, (name, ds_type)) in pve9_additions.iter().enumerate() {
+            assert_ds_properties(&schema.data_sources[12 + i], name, ds_type, 12 + i);
+        }
+    }
+
+    #[test]
+    fn test_vm_schema_pve2() {
+        let schema = RrdSchema::vm(RrdFormat::Pve2);
+
+        assert_eq!(schema.column_count(), 10);
+        assert_eq!(schema.format, RrdFormat::Pve2);
+
+        let expected_ds = vec![ // expected column order; compared index by index below
+            ("maxcpu", "GAUGE"),
+            ("cpu", "GAUGE"),
+            ("maxmem", "GAUGE"),
+            ("mem", "GAUGE"),
+            ("maxdisk", "GAUGE"),
+            ("disk", "GAUGE"),
+            ("netin", "DERIVE"),
+            ("netout", "DERIVE"),
+            ("diskread", "DERIVE"),
+            ("diskwrite", "DERIVE"),
+        ];
+
+        for (i, (name, ds_type)) in expected_ds.iter().enumerate() {
+            assert_ds_properties(&schema.data_sources[i], name, ds_type, i);
+        }
+    }
+
+    #[test]
+    fn test_vm_schema_pve9() {
+        let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+
+        assert_eq!(schema.column_count(), 17);
+        assert_eq!(schema.format, RrdFormat::Pve9_0);
+
+        let pve2_schema = RrdSchema::vm(RrdFormat::Pve2);
+        for i in 0..10 { // the pve-9.0 list must start with the 10 pve2 columns
+            assert_eq!(
+                schema.data_sources[i].name, pve2_schema.data_sources[i].name,
+                "First 10 DS should match pve2"
+            );
+            assert_eq!(
+                schema.data_sources[i].ds_type, pve2_schema.data_sources[i].ds_type,
+                "First 10 DS types should match pve2"
+            );
+        }
+
+        let pve9_additions = vec![ // columns 10..17, appended after the pve2 set
+            ("memhost", "GAUGE"),
+            ("pressurecpusome", "GAUGE"),
+            ("pressurecpufull", "GAUGE"),
+            ("pressureiosome", "GAUGE"),
+            ("pressureiofull", "GAUGE"),
+            ("pressurememorysome", "GAUGE"),
+            ("pressurememoryfull", "GAUGE"),
+        ];
+
+        for (i, (name, ds_type)) in pve9_additions.iter().enumerate() {
+            assert_ds_properties(&schema.data_sources[10 + i], name, ds_type, 10 + i);
+        }
+    }
+
+    #[test]
+    fn test_storage_schema() {
+        for format in [RrdFormat::Pve2, RrdFormat::Pve9_0] { // both formats share one DS list
+            let schema = RrdSchema::storage(format);
+
+            assert_eq!(schema.column_count(), 2);
+            assert_eq!(schema.format, format); // the requested format is still recorded
+
+            assert_ds_properties(&schema.data_sources[0], "total", "GAUGE", 0);
+            assert_ds_properties(&schema.data_sources[1], "used", "GAUGE", 1);
+        }
+    }
+
+    #[test]
+    fn test_rra_archives() {
+        let expected_rras = [ // same strings and order as RrdSchema::default_archives()
+            "RRA:AVERAGE:0.5:1:1440",
+            "RRA:AVERAGE:0.5:30:1440",
+            "RRA:AVERAGE:0.5:360:1440",
+            "RRA:AVERAGE:0.5:10080:570",
+            "RRA:MAX:0.5:1:1440",
+            "RRA:MAX:0.5:30:1440",
+            "RRA:MAX:0.5:360:1440",
+            "RRA:MAX:0.5:10080:570",
+        ];
+
+        let schemas = vec![ // every (kind, format) combination the constructors can produce
+            RrdSchema::node(RrdFormat::Pve2),
+            RrdSchema::node(RrdFormat::Pve9_0),
+            RrdSchema::vm(RrdFormat::Pve2),
+            RrdSchema::vm(RrdFormat::Pve9_0),
+            RrdSchema::storage(RrdFormat::Pve2),
+            RrdSchema::storage(RrdFormat::Pve9_0),
+        ];
+
+        for schema in schemas {
+            assert_eq!(schema.archives.len(), 8);
+
+            for (i, expected) in expected_rras.iter().enumerate() {
+                assert_eq!(
+                    &schema.archives[i], expected,
+                    "RRA[{}] mismatch in {:?}",
+                    i, schema.format
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn test_heartbeat_consistency() {
+        let schemas = vec![ // every (kind, format) combination the constructors can produce
+            RrdSchema::node(RrdFormat::Pve2),
+            RrdSchema::node(RrdFormat::Pve9_0),
+            RrdSchema::vm(RrdFormat::Pve2),
+            RrdSchema::vm(RrdFormat::Pve9_0),
+            RrdSchema::storage(RrdFormat::Pve2),
+            RrdSchema::storage(RrdFormat::Pve9_0),
+        ];
+
+        for schema in schemas {
+            for ds in &schema.data_sources { // despite the name, min and max are pinned too
+                assert_eq!(ds.heartbeat, 120);
+                assert_eq!(ds.min, "0");
+                assert_eq!(ds.max, "U");
+            }
+        }
+    }
+
+    #[test]
+    fn test_gauge_vs_derive_correctness() {
+        // GAUGE: instantaneous values (CPU%, memory bytes)
+        // DERIVE: cumulative counters that can wrap (network/disk bytes)
+        // NOTE(review): only the Pve2 schemas are walked here, not the Pve9_0 ones
+        let node = RrdSchema::node(RrdFormat::Pve2);
+        let node_derive_indices = [10, 11]; // netin, netout
+        for (i, ds) in node.data_sources.iter().enumerate() {
+            if node_derive_indices.contains(&i) {
+                assert_eq!(
+                    ds.ds_type, "DERIVE",
+                    "Node DS[{}] ({}) should be DERIVE",
+                    i, ds.name
+                );
+            } else {
+                assert_eq!(
+                    ds.ds_type, "GAUGE",
+                    "Node DS[{}] ({}) should be GAUGE",
+                    i, ds.name
+                );
+            }
+        }
+
+        let vm = RrdSchema::vm(RrdFormat::Pve2);
+        let vm_derive_indices = [6, 7, 8, 9]; // netin, netout, diskread, diskwrite
+        for (i, ds) in vm.data_sources.iter().enumerate() {
+            if vm_derive_indices.contains(&i) {
+                assert_eq!(
+                    ds.ds_type, "DERIVE",
+                    "VM DS[{}] ({}) should be DERIVE",
+                    i, ds.name
+                );
+            } else {
+                assert_eq!(
+                    ds.ds_type, "GAUGE",
+                    "VM DS[{}] ({}) should be GAUGE",
+                    i, ds.name
+                );
+            }
+        }
+
+        let storage = RrdSchema::storage(RrdFormat::Pve2);
+        for ds in &storage.data_sources { // storage has no DERIVE columns at all
+            assert_eq!(
+                ds.ds_type, "GAUGE",
+                "Storage DS ({}) should be GAUGE",
+                ds.name
+            );
+        }
+    }
+
+    #[test]
+    fn test_pve9_backward_compatibility() {
+        let node_pve2 = RrdSchema::node(RrdFormat::Pve2);
+        let node_pve9 = RrdSchema::node(RrdFormat::Pve9_0);
+
+        assert!(node_pve9.column_count() > node_pve2.column_count());
+
+        for i in 0..node_pve2.column_count() { // pve2 must be a strict prefix of pve-9.0
+            assert_eq!(
+                node_pve2.data_sources[i].name, node_pve9.data_sources[i].name,
+                "Node DS[{}] name must match between pve2 and pve9.0",
+                i
+            );
+            assert_eq!(
+                node_pve2.data_sources[i].ds_type, node_pve9.data_sources[i].ds_type,
+                "Node DS[{}] type must match between pve2 and pve9.0",
+                i
+            );
+        }
+
+        let vm_pve2 = RrdSchema::vm(RrdFormat::Pve2);
+        let vm_pve9 = RrdSchema::vm(RrdFormat::Pve9_0);
+
+        assert!(vm_pve9.column_count() > vm_pve2.column_count());
+
+        for i in 0..vm_pve2.column_count() { // same prefix rule for the VM schema
+            assert_eq!(
+                vm_pve2.data_sources[i].name, vm_pve9.data_sources[i].name,
+                "VM DS[{}] name must match between pve2 and pve9.0",
+                i
+            );
+            assert_eq!(
+                vm_pve2.data_sources[i].ds_type, vm_pve9.data_sources[i].ds_type,
+                "VM DS[{}] type must match between pve2 and pve9.0",
+                i
+            );
+        }
+
+        let storage_pve2 = RrdSchema::storage(RrdFormat::Pve2);
+        let storage_pve9 = RrdSchema::storage(RrdFormat::Pve9_0);
+        assert_eq!(storage_pve2.column_count(), storage_pve9.column_count()); // unchanged in 9.0
+    }
+
+    #[test]
+    fn test_schema_display() {
+        let test_cases = vec![ // (schema, expected Debug name of the format, expected count text)
+            (RrdSchema::node(RrdFormat::Pve2), "Pve2", "12 data sources"),
+            (
+                RrdSchema::node(RrdFormat::Pve9_0),
+                "Pve9_0",
+                "19 data sources",
+            ),
+            (RrdSchema::vm(RrdFormat::Pve2), "Pve2", "10 data sources"),
+            (
+                RrdSchema::vm(RrdFormat::Pve9_0),
+                "Pve9_0",
+                "17 data sources",
+            ),
+            (
+                RrdSchema::storage(RrdFormat::Pve2),
+                "Pve2",
+                "2 data sources",
+            ), // NOTE(review): storage with Pve9_0 is not covered by this table
+        ];
+
+        for (schema, expected_format, expected_count) in test_cases {
+            let display = format!("{}", schema);
+            assert!(
+                display.contains(expected_format),
+                "Display should contain format: {}",
+                display
+            );
+            assert!(
+                display.contains(expected_count),
+                "Display should contain count: {}",
+                display
+            );
+        }
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
new file mode 100644
index 00000000..79ed202a
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-rrd/src/writer.rs
@@ -0,0 +1,397 @@
+//! RRD File Writer
+//!
+//! Handles creating and updating RRD files via pluggable backends.
+//! Supports daemon-based (rrdcached) and direct file writing modes.
+use super::key_type::RrdKeyType;
+use super::schema::{RrdFormat, RrdSchema};
+use anyhow::{Context, Result};
+use chrono::Utc;
+use std::collections::HashMap;
+use std::fs;
+use std::path::{Path, PathBuf};
+
+/// Metric type for determining column skipping rules (derived from the `RrdKeyType` variant)
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum MetricType {
+    Node, // keys that parse to RrdKeyType::Node
+    Vm, // keys that parse to RrdKeyType::Vm
+    Storage, // keys that parse to RrdKeyType::Storage
+}
+
+impl MetricType {
+    /// Number of leading value columns (after the timestamp) that `transform_data` drops
+    ///
+    /// C implementation (status.c:1300, 1335):
+    /// - Node: skip 2 (uptime, status)
+    /// - VM: skip 4 (uptime, status, template, pid)
+    /// - Storage: skip 0
+    fn skip_columns(self) -> usize {
+        match self {
+            MetricType::Node => 2,
+            MetricType::Vm => 4,
+            MetricType::Storage => 0,
+        }
+    }
+}
+
+impl RrdFormat {
+    /// Get column count for a metric type; numbers duplicate the `RrdSchema` DS list lengths
+    #[allow(dead_code)]
+    fn column_count(self, metric_type: &MetricType) -> usize {
+        match (self, metric_type) {
+            (RrdFormat::Pve2, MetricType::Node) => 12,
+            (RrdFormat::Pve9_0, MetricType::Node) => 19,
+            (RrdFormat::Pve2, MetricType::Vm) => 10,
+            (RrdFormat::Pve9_0, MetricType::Vm) => 17,
+            (_, MetricType::Storage) => 2, // Same for both formats
+        }
+    }
+}
+
+impl RrdKeyType {
+    /// Get the metric type for this key (one-to-one with the key variant; fields are ignored)
+    fn metric_type(&self) -> MetricType {
+        match self {
+            RrdKeyType::Node { .. } => MetricType::Node,
+            RrdKeyType::Vm { .. } => MetricType::Vm,
+            RrdKeyType::Storage { .. } => MetricType::Storage,
+        }
+    }
+}
+
+/// RRD writer for persistent metric storage
+///
+/// Uses pluggable backends (daemon, direct, or fallback) for RRD operations.
+pub struct RrdWriter {
+    /// Base directory for RRD files, supplied by the caller (e.g. /var/lib/rrdcached/db)
+    base_dir: PathBuf,
+    /// Backend for RRD operations (daemon, direct, or fallback)
+    backend: Box<dyn super::backend::RrdBackend>,
+    /// Keys whose RRD file this writer created (map used as a set; values are always `()`)
+    created_files: HashMap<String, ()>,
+}
+
+impl RrdWriter {
+    /// Create new RRD writer with default fallback backend
+    ///
+    /// Uses the fallback backend that tries daemon first, then falls back to direct file writes.
+    /// This matches the C implementation's behavior.
+    ///
+    /// # Arguments
+    /// * `base_dir` - Base directory for RRD files (created by `with_backend` if missing)
+    pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
+        let backend = Self::default_backend().await?;
+        Self::with_backend(base_dir, backend).await
+    }
+
+    /// Create new RRD writer with specific backend; fails only if `base_dir` cannot be created
+    ///
+    /// # Arguments
+    /// * `base_dir` - Base directory for RRD files (created, including parents, if missing)
+    /// * `backend` - RRD backend to use (daemon, direct, or fallback)
+    pub(crate) async fn with_backend<P: AsRef<Path>>(
+        base_dir: P,
+        backend: Box<dyn super::backend::RrdBackend>,
+    ) -> Result<Self> {
+        let base_dir = base_dir.as_ref().to_path_buf();
+
+        // Create base directory if it doesn't exist (synchronous std::fs call)
+        fs::create_dir_all(&base_dir)
+            .with_context(|| format!("Failed to create RRD base directory: {base_dir:?}"))?;
+
+        tracing::info!("RRD writer using backend: {}", backend.name());
+
+        Ok(Self {
+            base_dir,
+            backend,
+            created_files: HashMap::new(), // starts empty; filled by `update` on file creation
+        })
+    }
+
+    /// Create default backend (fallback: daemon + direct) on the fixed rrdcached socket path
+    ///
+    /// This matches the C implementation's behavior:
+    /// - Tries rrdcached daemon first for performance
+    /// - Falls back to direct file writes if daemon fails
+    async fn default_backend() -> Result<Box<dyn super::backend::RrdBackend>> {
+        let backend = super::backend::RrdFallbackBackend::new("/var/run/rrdcached.sock").await;
+        Ok(Box::new(backend)) // no fallible step above, so this currently never returns Err
+    }
+
+    /// Update RRD file with metric data
+    ///
+    /// This will:
+    /// 1. Transform data from source format to target format (padding/truncation/column skipping)
+    /// 2. Create the RRD file if it doesn't exist
+    /// 3. Update via the configured backend (daemon, direct, or fallback)
+    ///
+    /// # Arguments
+    /// * `key` - RRD key (e.g., "pve2-node/node1", "pve-vm-9.0/100")
+    /// * `data` - Metric data string (format: "timestamp:value1:value2:...")
+    pub async fn update(&mut self, key: &str, data: &str) -> Result<()> {
+        // Parse the key to determine file path and schema
+        let key_type = RrdKeyType::parse(key).with_context(|| format!("Invalid RRD key: {key}"))?;
+
+        // Get source format and target schema
+        let source_format = key_type.source_format();
+        let target_schema = key_type.schema();
+        let metric_type = key_type.metric_type();
+
+        // Transform data from source to target format
+        let transformed_data =
+            Self::transform_data(data, source_format, &target_schema, metric_type)
+                .with_context(|| format!("Failed to transform RRD data for key: {key}"))?;
+
+        // Get the file path (always uses current format)
+        let file_path = key_type.file_path(&self.base_dir);
+
+        // Ensure the RRD file exists; files already on disk are not cached and are re-checked
+        if !self.created_files.contains_key(key) && !file_path.exists() {
+            self.create_rrd_file(&key_type, &file_path).await?;
+            self.created_files.insert(key.to_string(), ()); // remembered only when created here
+        }
+
+        // Update the RRD file via backend
+        self.backend.update(&file_path, &transformed_data).await?;
+
+        Ok(())
+    }
+
+    /// Create RRD file with appropriate schema via backend (parent directory is created first)
+    async fn create_rrd_file(&mut self, key_type: &RrdKeyType, file_path: &Path) -> Result<()> {
+        // Ensure parent directory exists
+        if let Some(parent) = file_path.parent() {
+            fs::create_dir_all(parent)
+                .with_context(|| format!("Failed to create directory: {parent:?}"))?;
+        }
+
+        // Get schema for this RRD type
+        let schema = key_type.schema();
+
+        // Calculate start time: 00:00:00 UTC of the current day (matching C implementation)
+        let now = Utc::now();
+        let start = now
+            .date_naive()
+            .and_hms_opt(0, 0, 0)
+            .expect("00:00:00 is always a valid time")
+            .and_utc();
+        let start_timestamp = start.timestamp();
+
+        tracing::debug!(
+            "Creating RRD file: {:?} with {} data sources via {}",
+            file_path,
+            schema.column_count(),
+            self.backend.name()
+        );
+
+        // Delegate to backend for creation
+        self.backend
+            .create(file_path, &schema, start_timestamp)
+            .await?;
+
+        tracing::info!("Created RRD file: {:?} ({})", file_path, schema);
+
+        Ok(())
+    }
+
+    /// Transform data from source format to target format
+    ///
+    /// This implements the C behavior from status.c:
+    /// 1. Skip non-archivable columns only for old formats (uptime, status for nodes)
+    /// 2. Pad old format data with `:U` for missing columns
+    /// 3. Truncate future format data to known columns
+    ///
+    /// # Arguments
+    /// * `data` - Raw data string from status update (format: "timestamp:v1:v2:...")
+    /// * `source_format` - Format indicated by the input key
+    /// * `target_schema` - Target RRD schema (always Pve9_0 currently)
+    /// * `metric_type` - Type of metric (Node, VM, Storage) for column skipping
+    ///
+    /// # Returns
+    /// "timestamp:" followed by exactly `target_schema.column_count()` ':'-separated values
+    fn transform_data(
+        data: &str,
+        source_format: RrdFormat,
+        target_schema: &RrdSchema,
+        metric_type: MetricType,
+    ) -> Result<String> {
+        let mut parts = data.split(':');
+
+        let timestamp = parts
+            .next() // NOTE(review): split() always yields an item, so "" passes; error is dead
+            .ok_or_else(|| anyhow::anyhow!("Empty data string"))?;
+
+        // Skip non-archivable columns for old format only (C: status.c:1300, 1335, 1385)
+        let skip_count = if source_format == RrdFormat::Pve2 {
+            metric_type.skip_columns()
+        } else {
+            0
+        };
+
+        // Build transformed data: timestamp + values (skipped, padded/truncated to target_cols)
+        let target_cols = target_schema.column_count();
+
+        // Join values with ':' separator, efficiently building the string without Vec allocation
+        let mut iter = parts
+            .skip(skip_count)
+            .chain(std::iter::repeat("U")) // endless "U" padding; `take` below bounds it
+            .take(target_cols);
+        let values = match iter.next() {
+            Some(first) => {
+                // Start with first value, fold remaining values with separator
+                iter.fold(first.to_string(), |mut acc, value| {
+                    acc.push(':');
+                    acc.push_str(value);
+                    acc
+                })
+            }
+            None => String::new(), // only reachable when target_cols == 0
+        };
+
+        Ok(format!("{timestamp}:{values}"))
+    }
+
+    /// Flush all pending updates by delegating to the backend's `flush`
+    #[allow(dead_code)] // Used via RRD update cycle
+    pub(crate) async fn flush(&mut self) -> Result<()> {
+        self.backend.flush().await
+    }
+
+    /// Get base directory this writer was constructed with
+    #[allow(dead_code)] // Used for path resolution in updates
+    pub(crate) fn base_dir(&self) -> &Path {
+        &self.base_dir
+    }
+}
+
+impl Drop for RrdWriter {
+    fn drop(&mut self) {
+        // Only logs: pending updates are NOT flushed here because `flush` is async.
+        // Users should call flush() explicitly before dropping if needed
+        tracing::debug!("RrdWriter dropped");
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::super::schema::{RrdFormat, RrdSchema};
+    use super::*;
+
+    #[test]
+    fn test_rrd_file_path_generation() {
+        let temp_dir = std::path::PathBuf::from("/tmp/test");
+
+        let key_node = RrdKeyType::Node {
+            nodename: "testnode".to_string(),
+            format: RrdFormat::Pve9_0,
+        };
+        let path = key_node.file_path(&temp_dir);
+        assert_eq!(path, temp_dir.join("pve-node-9.0").join("testnode")); // <base>/<dir>/<node>
+    }
+
+    // ===== Format Adaptation Tests =====
+
+    #[test]
+    fn test_transform_data_node_pve2_to_pve9() {
+        // Test padding old format to new format (19 cols); sample omits roottotal/rootused
+        // Input: timestamp:uptime:status:load:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:netin:netout
+        let data = "1234567890:1000:0:1.5:4:2.0:0.5:8000000000:6000000000:0:0:1000000:500000";
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Node).unwrap();
+
+        // After skipping 2 cols (uptime, status), 10 values remain and 9 U's are appended:
+        // timestamp:load:maxcpu:cpu:iowait:memtotal:memused:swap_t:swap_u:netin:netout:U:U:U:U:U:U:U:U:U
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1234567890", "Timestamp should be preserved");
+        assert_eq!(parts.len(), 20, "Should have timestamp + 19 values"); // 1 + 19
+        assert_eq!(parts[1], "1.5", "First value after skip should be load");
+        assert_eq!(parts[2], "4", "Second value should be maxcpu");
+
+        // Check padding (loop covers indices 12..=19; index 11 is also "U" for this sample)
+        for (i, item) in parts.iter().enumerate().take(20).skip(12) {
+            assert_eq!(item, &"U", "Column {} should be padded with U", i);
+        }
+    }
+
+    #[test]
+    fn test_transform_data_vm_pve2_to_pve9() {
+        // Test VM transformation with 4 columns skipped (14 input values -> 10 kept + 7 padded)
+        // Input: timestamp:uptime:status:template:pid:maxcpu:cpu:maxmem:mem:maxdisk:disk:netin:netout:diskread:diskwrite
+        let data = "1234567890:1000:1:0:12345:4:2:4096:2048:100000:50000:1000:500:100:50";
+
+        let schema = RrdSchema::vm(RrdFormat::Pve9_0);
+        let result =
+            RrdWriter::transform_data(data, RrdFormat::Pve2, &schema, MetricType::Vm).unwrap();
+
+        let parts: Vec<&str> = result.split(':').collect();
+        assert_eq!(parts[0], "1234567890");
+        assert_eq!(parts.len(), 18, "Should have timestamp + 17 values");
+        assert_eq!(parts[1], "4", "First value after skip should be maxcpu");
+
+        // Check padding (last 7 columns, indices 11..=17)
+        for (i, item) in parts.iter().enumerate().take(18).skip(11) {
+            assert_eq!(item, &"U", "Column {} should be padded", i);
+        }
+    }
+
+    #[test]
+    fn test_transform_data_no_padding_needed() {
+        // Source and target are both Pve9_0 and the sample already carries 19 values,
+        // so the row is expected to come back with the same shape
+        let raw = "1234567890:1.5:4:2.0:0.5:8000000000:6000000000:0:0:0:0:1000000:500000:7000000000:0:0:0:0:0:0";
+        let node_schema = RrdSchema::node(RrdFormat::Pve9_0);
+
+        let output = RrdWriter::transform_data(
+            raw,
+            RrdFormat::Pve9_0,
+            &node_schema,
+            MetricType::Node,
+        )
+        .unwrap();
+
+        let fields: Vec<&str> = output.split(':').collect();
+        assert_eq!(fields.len(), 20); // timestamp + 19 values
+        assert_eq!(fields[1], "1.5");
+    }
+
+    #[test]
+    fn test_transform_data_future_format_truncation() {
+        // Simulates a future format: the timestamp followed by 25 values ("1" through "25"),
+        // which is more than the schema built below holds
+        let data = std::iter::once("1234567890".to_string())
+            .chain((1..=25).map(|value| value.to_string()))
+            .collect::<Vec<_>>()
+            .join(":");
+
+        let schema = RrdSchema::node(RrdFormat::Pve9_0);
+        let truncated =
+            RrdWriter::transform_data(&data, RrdFormat::Pve9_0, &schema, MetricType::Node).unwrap();
+
+        // Only the first 19 values may survive
+        let fields: Vec<&str> = truncated.split(':').collect();
+        assert_eq!(fields.len(), 20, "Should truncate to timestamp + 19 values");
+        assert_eq!(fields[19], "19", "Last value should be column 19");
+    }
+
+    #[test]
+    fn test_transform_data_storage_no_change() {
+        // Storage rows have 2 columns in both Pve2 and Pve9_0 and nothing is skipped,
+        // so the row must pass through unchanged
+        let original = "1234567890:1000000000000:500000000000";
+        let storage_schema = RrdSchema::storage(RrdFormat::Pve9_0);
+
+        let transformed = RrdWriter::transform_data(
+            original,
+            RrdFormat::Pve2,
+            &storage_schema,
+            MetricType::Storage,
+        )
+        .unwrap();
+
+        assert_eq!(transformed, original, "Storage data should not be transformed");
+    }
+
+    #[test]
+    fn test_metric_type_methods() {
+        // Expected number of leading columns skipped for each metric type
+        let expected_skips = [
+            (MetricType::Node, 2),
+            (MetricType::Vm, 4),
+            (MetricType::Storage, 0),
+        ];
+
+        for (metric, skip) in expected_skips {
+            assert_eq!(metric.skip_columns(), skip);
+        }
+    }
+
+    #[test]
+    fn test_format_column_counts() {
+        // (format, metric type, expected column count)
+        let cases = [
+            (RrdFormat::Pve2, MetricType::Node, 12),
+            (RrdFormat::Pve9_0, MetricType::Node, 19),
+            (RrdFormat::Pve2, MetricType::Vm, 10),
+            (RrdFormat::Pve9_0, MetricType::Vm, 17),
+            (RrdFormat::Pve2, MetricType::Storage, 2),
+            (RrdFormat::Pve9_0, MetricType::Storage, 2),
+        ];
+
+        for (format, metric, columns) in cases {
+            assert_eq!(format.column_count(&metric), columns);
+        }
+    }
+}
-- 
2.47.3





More information about the pve-devel mailing list