[pve-devel] [PATCH pve-cluster 10/15] pmxcfs-rs: add pmxcfs-dfsm crate

Kefu Chai k.chai at proxmox.com
Tue Jan 6 15:24:34 CET 2026


Add Distributed Finite State Machine for cluster synchronization:
- Dfsm: Core state machine implementation
- ClusterDatabaseService: MemDb sync (pmxcfs_v1 CPG group)
- StatusSyncService: Status sync (pve_kvstore_v1 CPG group)
- Protocol: SyncStart, State, Update, UpdateComplete, Verify
- Leader election based on version and mtime
- Incremental updates for efficiency

This integrates pmxcfs-memdb, pmxcfs-services, and rust-corosync
to provide cluster-wide database synchronization. It implements
the wire-compatible protocol used by the C version.

Includes unit tests for:
- Index serialization and comparison
- Leader election logic
- Tree entry serialization
- Diff computation between indices

Signed-off-by: Kefu Chai <k.chai at proxmox.com>
---
 src/pmxcfs-rs/Cargo.toml                      |    1 +
 src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml          |   45 +
 src/pmxcfs-rs/pmxcfs-dfsm/README.md           |  340 ++++++
 src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs    |   52 +
 .../src/cluster_database_service.rs           |  116 ++
 src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs  |  163 +++
 src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs |  728 ++++++++++++
 src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs |  185 +++
 .../pmxcfs-dfsm/src/kv_store_message.rs       |  329 ++++++
 src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs          |   32 +
 src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs      |   21 +
 .../pmxcfs-dfsm/src/state_machine.rs          | 1013 +++++++++++++++++
 .../pmxcfs-dfsm/src/status_sync_service.rs    |  118 ++
 src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs        |  107 ++
 src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs  |  220 ++++
 .../tests/multi_node_sync_tests.rs            |  565 +++++++++
 16 files changed, 4035 insertions(+)
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/README.md
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs

diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index f4497d58..4d18aa93 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -10,6 +10,7 @@ members = [
     "pmxcfs-test-utils", # Test utilities and helpers (dev-only)
     "pmxcfs-services",   # Service framework for automatic retry and lifecycle management
     "pmxcfs-ipc",        # libqb-compatible IPC server
+    "pmxcfs-dfsm",       # Distributed Finite State Machine (owns CPG)
 ]
 resolver = "2"
 
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
new file mode 100644
index 00000000..12a8e7f6
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/Cargo.toml
@@ -0,0 +1,45 @@
+[package]
+name = "pmxcfs-dfsm"
+description = "Distributed Finite State Machine for cluster state synchronization"
+
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[lints]
+workspace = true
+
+[dependencies]
+# Internal dependencies
+pmxcfs-api-types.workspace = true
+pmxcfs-memdb.workspace = true
+pmxcfs-services.workspace = true
+
+# Corosync integration
+rust-corosync.workspace = true
+
+# Error handling
+anyhow.workspace = true
+thiserror.workspace = true
+
+# Async and concurrency
+parking_lot.workspace = true
+async-trait.workspace = true
+tokio.workspace = true
+
+# Serialization
+serde.workspace = true
+bincode.workspace = true
+bytemuck.workspace = true
+
+# Logging
+tracing.workspace = true
+
+# Utilities
+num_enum.workspace = true
+
+[dev-dependencies]
+tempfile.workspace = true
+libc.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/README.md b/src/pmxcfs-rs/pmxcfs-dfsm/README.md
new file mode 100644
index 00000000..560827a7
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/README.md
@@ -0,0 +1,340 @@
+# pmxcfs-dfsm
+
+**Distributed Finite State Machine** for cluster-wide state synchronization in pmxcfs.
+
+This crate implements the DFSM protocol used to replicate configuration changes and status updates across all nodes in a Proxmox cluster via Corosync CPG (Closed Process Group).
+
+## Overview
+
+The DFSM is the core mechanism for maintaining consistency across cluster nodes. It ensures that:
+
+- All nodes see filesystem operations (writes, creates, deletes) in the same order
+- Database state remains synchronized even after network partitions
+- Status information (VM states, RRD data) is broadcast to all nodes
+- State verification catches inconsistencies
+
+## Architecture
+
+### Key Components
+
+### Module Structure
+
+| Module | Purpose | C Equivalent |
+|--------|---------|--------------|
+| `state_machine.rs` | Core DFSM logic, state transitions | `dfsm.c` |
+| `cluster_database_service.rs` | MemDb sync service | `dcdb.c`, `loop.c:service_dcdb` |
+| `status_sync_service.rs` | Status/kvstore sync service | `loop.c:service_status` |
+| `cpg_service.rs` | Corosync CPG integration | `dfsm.c:cpg_callbacks` |
+| `dfsm_message.rs` | Protocol message types | `dfsm.c:dfsm_message_*_header_t` |
+| `message.rs` | Message trait and serialization | (inline in C) |
+| `wire_format.rs` | C-compatible wire format | `dcdb.c:c_fuse_message_header_t` |
+| `broadcast.rs` | Cluster-wide message broadcast (module not created by this patch — TODO confirm location) | `dcdb.c:dcdb_send_fuse_message` |
+| `types.rs` | Type definitions (modes, epochs) | `dfsm.c:dfsm_mode_t` |
+
+## C to Rust Mapping
+
+### Data Structures
+
+| C Type | Rust Type | Notes |
+|--------|-----------|-------|
+| `dfsm_t` | `Dfsm` | Main state machine |
+| `dfsm_mode_t` | `DfsmMode` | Enum with type safety |
+| `dfsm_node_info_t` | (internal) | Node state tracking |
+| `dfsm_sync_info_t` | (internal) | Sync session info |
+| `dfsm_callbacks_t` | Trait-based callbacks | Type-safe callbacks via traits |
+| `dfsm_message_*_header_t` | `DfsmMessage` | Type-safe enum variants |
+
+### Functions
+
+#### Core DFSM Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `dfsm_new()` | `Dfsm::new()` | state_machine.rs |
+| `dfsm_initialize()` | `Dfsm::init_cpg()` | state_machine.rs |
+| `dfsm_join()` | (part of init_cpg) | state_machine.rs |
+| `dfsm_dispatch()` | `Dfsm::dispatch_events()` | state_machine.rs |
+| `dfsm_send_message()` | `Dfsm::send_message()` | state_machine.rs |
+| `dfsm_send_update()` | `Dfsm::send_update()` | state_machine.rs |
+| `dfsm_verify_request()` | `Dfsm::verify_request()` | state_machine.rs |
+| `dfsm_finalize()` | `Dfsm::stop_services()` | state_machine.rs |
+
+#### DCDB (Cluster Database) Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `dcdb_new()` | `ClusterDatabaseService::new()` | cluster_database_service.rs |
+| `dcdb_send_fuse_message()` | `broadcast()` | broadcast.rs |
+| `dcdb_send_unlock()` | `FuseMessage::Unlock` + broadcast | broadcast.rs |
+| `service_dcdb()` | `ClusterDatabaseService` | cluster_database_service.rs |
+
+#### Status Sync Operations
+
+| C Function | Rust Equivalent | Location |
+|-----------|-----------------|----------|
+| `service_status()` | `StatusSyncService` | status_sync_service.rs |
+| (kvstore CPG group) | `StatusSyncService` | Uses separate CPG group |
+
+### Callback System
+
+**C Implementation:**
+
+**Rust Implementation:**
+- Uses trait-based callbacks instead of function pointers
+- Callbacks are implemented by `MemDbCallbacks` (memdb integration)
+- Defined in external crates (pmxcfs-memdb)
+
+## Synchronization Protocol
+
+The DFSM ensures all nodes maintain consistent database state through a multi-phase synchronization protocol:
+
+### Protocol Phases
+
+#### Phase 1: Membership Change
+
+When nodes join or leave the cluster:
+
+1. **Corosync CPG** delivers membership change notification
+2. **DFSM invalidates** cached checksums
+3. **Message queues** are cleared
+4. **Epoch counter** is incremented
+
+**CPG Leader** (lowest node ID):
+- Initiates sync by sending `SyncStart` message
+- Sends its own `State` (CPG doesn't loop back messages)
+
+**All Followers**:
+- Respond to `SyncStart` by sending their `State`
+- Wait for other nodes' states
+
+#### Phase 2: State Exchange
+
+Each node collects `State` messages containing serialized **MemDbIndex** (compact state summary using C-compatible wire format).
+
+State digests are computed using SHA-256 hashing to detect differences between nodes.
+
+#### Phase 3: Leader Election
+
+When all states are collected, `process_state_update()` is called:
+
+1. **Parse indices** from all node states
+2. **Elect data leader** (may differ from CPG leader):
+   - Highest `version` wins
+   - If tied, highest `mtime` wins
+3. **Identify synced nodes**: Nodes whose index matches leader exactly
+4. **Determine own status**:
+   - If we're the data leader → send updates to followers
+   - If we're synced with leader → mark as Synced
+   - Otherwise → enter Update mode and wait
+
+**Leader Election Algorithm**:
+
+#### Phase 4: Incremental Updates
+
+**Data Leader** (node with highest version):
+
+1. **Compare indices** using `find_differences()` for each follower
+2. **Serialize differing entries** to C-compatible TreeEntry format
+3. **Send Update messages** via CPG
+4. **Send UpdateComplete** when all updates sent
+
+**Followers** (out-of-sync nodes):
+
+1. **Receive Update messages**
+2. **Deserialize TreeEntry** via `TreeEntry::deserialize_from_update()`
+3. **Apply to database** via `MemDb::apply_tree_entry()`:
+   - INSERT OR REPLACE in SQLite
+   - Update in-memory structures
+   - Handle entry moves (parent/name changes)
+4. **On UpdateComplete**: Transition to Synced mode
+
+#### Phase 5: Normal Operations
+
+When in **Synced** mode:
+
+- FUSE operations are broadcast via `send_fuse_message()`
+- Messages are delivered immediately via `deliver_message()`
+- Leader periodically sends `VerifyRequest` for checksum comparison
+- Nodes respond with `Verify` containing SHA-256 of entire database
+- Mismatches trigger cluster resync
+
+---
+
+## Protocol Details
+
+### State Machine Transitions
+
+Based on analysis of C implementation (`dfsm.c` lines 795-1209):
+
+#### Critical Protocol Rules
+
+1. **Epoch Management**:
+   - Each node creates local epoch during confchg: `(counter++, time, own_nodeid, own_pid)`
+   - **Leader sends SYNC_START with its epoch**
+   - **Followers MUST adopt leader's epoch from SYNC_START** (`dfsm->sync_epoch = header->epoch`)
+   - All STATE messages in sync round use adopted epoch
+   - Epoch mismatch → message discarded (may lead to LEAVE)
+
+2. **Member List Validation**:
+   - Built from `member_list` in confchg callback
+   - Stored in `dfsm->sync_info->nodes[]`
+   - STATE sender MUST be in this list
+   - Non-member STATE → immediate LEAVE
+
+3. **Duplicate Detection**:
+   - Each node sends STATE exactly once per sync round
+   - Tracked via `ni->state` pointer (NULL = not received, non-NULL = received)
+   - Duplicate STATE from same nodeid/pid → immediate LEAVE
+   - Violating this rule was the root cause of an earlier Rust/C interop sync failure
+
+4. **Message Ordering** (one sync round):
+   SYNC_START (leader) → exactly one STATE per member → zero or more UPDATE (data leader) → UPDATE_COMPLETE
+5. **Leader Selection**:
+   - Determined by `lowest_nodeid` from member list
+   - Set in confchg callback before any messages sent
+   - Used to validate SYNC_START sender (logged but not enforced)
+   - Re-elected during state processing based on DB versions
+
+### DFSM States (DfsmMode)
+
+| State | Value | Description | C Equivalent |
+|-------|-------|-------------|--------------|
+| `Start` | 0 | Initial connection | `DFSM_MODE_START` |
+| `StartSync` | 1 | Beginning sync | `DFSM_MODE_START_SYNC` |
+| `Synced` | 2 | Fully synchronized | `DFSM_MODE_SYNCED` |
+| `Update` | 3 | Receiving updates | `DFSM_MODE_UPDATE` |
+| `Leave` | 253 | Leaving group | `DFSM_MODE_LEAVE` |
+| `VersionError` | 254 | Protocol mismatch | `DFSM_MODE_VERSION_ERROR` |
+| `Error` | 255 | Error state | `DFSM_MODE_ERROR` |
+
+### Message Types (DfsmMessageType)
+
+| Type | Value | Purpose |
+|------|-------|---------|
+| `Normal` | 0 | Application messages (with header + payload) |
+| `SyncStart` | 1 | Start sync (from leader) |
+| `State` | 2 | Full state data |
+| `Update` | 3 | Incremental update |
+| `UpdateComplete` | 4 | End of updates |
+| `VerifyRequest` | 5 | Request state verification |
+| `Verify` | 6 | State checksum response |
+
+All messages use C-compatible wire format with headers and payloads.
+
+### Application Message Types
+
+The DFSM can carry two types of application messages:
+
+1. **Fuse Messages** (Filesystem operations)
+   - CPG Group: `pmxcfs_v1` (DCDB)
+   - Message types: `Write`, `Create`, `Delete`, `Mkdir`, `Rename`, `SetMtime`, `Unlock`
+   - Defined in: `pmxcfs-api-types::FuseMessage`
+
+2. **KvStore Messages** (Status/RRD sync)
+   - CPG Group: `pve_kvstore_v1`
+   - Message types: `Data` (key-value pairs for status sync)
+   - Defined in: `pmxcfs-api-types::KvStoreMessage`
+
+### Wire Format Compatibility
+
+All wire formats are **byte-compatible** with the C implementation. Messages include appropriate headers and payloads as defined in the C protocol.
+
+## Synchronization Flow
+
+### 1. Node Join
+
+### 2. Normal Operation
+
+### 3. State Verification (Periodic)
+
+## Key Differences from C Implementation
+
+### Event Loop Architecture
+
+**C Version:**
+- Uses libqb's `qb_loop` for event loop
+- CPG fd registered with `qb_loop_poll_add()`
+- Dispatch called from qb_loop when fd is readable
+
+**Rust Version:**
+- Uses tokio async runtime
+- Service trait provides `dispatch()` method
+- ServiceManager polls fd using tokio's async I/O
+- No qb_loop dependency
+
+### CPG Instance Management
+
+**C Version:**
+- Single DFSM struct with callbacks
+- Two different CPG groups created separately
+
+**Rust Version:**
+- Each CPG group gets its own `Dfsm` instance
+- `ClusterDatabaseService` - manages `pmxcfs_v1` CPG group (MemDb)
+- `StatusSyncService` - manages `pve_kvstore_v1` CPG group (Status/RRD)
+- Both use same DFSM protocol but different callbacks
+
+## Error Handling
+
+### Split-Brain Prevention
+
+- Checksum verification detects divergence
+- Automatic resync on mismatch
+- Version monotonicity ensures forward progress
+
+### Network Partition Recovery
+
+- Membership changes trigger sync
+- Highest version always wins
+- Stale data is safely replaced
+
+### Consistency Guarantees
+
+- SQLite transactions ensure atomic updates
+- In-memory structures updated atomically
+- Version increments are monotonic
+- All nodes converge to same state
+
+## Compatibility Matrix
+
+| Feature | C Version | Rust Version | Compatible |
+|---------|-----------|--------------|------------|
+| Wire format | `dfsm_message_*_header_t` | `DfsmMessage::serialize()` | Yes |
+| CPG protocol | libcorosync | rust-corosync | Yes |
+| Message types | 0-6 | `DfsmMessageType` | Yes |
+| State machine | `dfsm_mode_t` | `DfsmMode` | Yes |
+| Protocol version | 1 | 1 | Yes |
+| Group names | `pmxcfs_v1`, `pve_kvstore_v1` | Same | Yes |
+
+## Known Issues / TODOs
+
+### Missing Features
+- [ ] **Sync message batching**: C version can batch updates, Rust sends individually
+- [ ] **Message queue limits**: C has MAX_QUEUE_LEN, Rust unbounded (potential memory issue)
+- [ ] **Detailed error codes**: C returns specific CS_ERR_* codes, Rust uses anyhow errors
+
+### Behavioral Differences (Benign)
+- **Logging**: Rust uses `tracing` instead of `qb_log` (compatible with journald)
+- **Threading**: Rust uses tokio tasks, C uses qb_loop single-threaded model
+- **Timers**: Rust uses tokio timers, C uses qb_loop timers (same timeout values)
+
+### Incompatibilities (None Known)
+No incompatibilities have been identified. The Rust implementation is fully wire-compatible and can operate in a mixed C/Rust cluster.
+
+## References
+
+### C Implementation
+- `src/pmxcfs/dfsm.c` / `dfsm.h` - Core DFSM implementation
+- `src/pmxcfs/dcdb.c` / `dcdb.h` - Distributed database coordination
+- `src/pmxcfs/loop.c` / `loop.h` - Service loop and management
+
+### Related Crates
+- **pmxcfs-memdb**: Database callbacks for DFSM
+- **pmxcfs-status**: Status tracking and kvstore
+- **pmxcfs-api-types**: Message type definitions
+- **pmxcfs-services**: Service framework for lifecycle management
+- **rust-corosync**: CPG bindings (external dependency)
+
+### Corosync Documentation
+- CPG (Closed Process Group) API: https://github.com/corosync/corosync
+- Group communication semantics: Total order, virtual synchrony
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
new file mode 100644
index 00000000..7e35b8d4
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/callbacks.rs
@@ -0,0 +1,52 @@
+/// DFSM application callbacks
+///
+/// This module defines the callback trait that application layers implement
+/// to integrate with the DFSM state machine.
+use crate::NodeSyncInfo;
+
+/// Callback trait for DFSM operations
+///
+/// The application layer implements this to receive DFSM events.
+/// The generic parameter `M` specifies the message type this callback handles:
+/// - `Callbacks<FuseMessage>` for main database operations
+/// - `Callbacks<KvStoreMessage>` for status synchronization
+///
+/// This provides type safety by ensuring each DFSM instance only delivers
+/// the correct message type to its callbacks.
+pub trait Callbacks<M>: Send + Sync {
+    /// Deliver an application message
+    ///
+    /// The message type is determined by the generic parameter:
+    /// - FuseMessage for main database operations
+    /// - KvStoreMessage for status synchronization
+    fn deliver_message(
+        &self,
+        nodeid: u32,
+        pid: u32,
+        message: M,
+        timestamp: u64,
+    ) -> anyhow::Result<(i32, bool)>;
+
+    /// Compute state checksum for verification
+    fn compute_checksum(&self, output: &mut [u8; 32]) -> anyhow::Result<()>;
+
+    /// Get current state for synchronization
+    ///
+    /// Called when we need to send our state to other nodes during sync.
+    fn get_state(&self) -> anyhow::Result<Vec<u8>>;
+
+    /// Process state update during synchronization
+    fn process_state_update(&self, states: &[NodeSyncInfo]) -> anyhow::Result<bool>;
+
+    /// Process incremental update from leader
+    ///
+    /// The leader sends individual TreeEntry updates during synchronization.
+    /// The data is serialized TreeEntry in C-compatible wire format.
+    fn process_update(&self, nodeid: u32, pid: u32, data: &[u8]) -> anyhow::Result<()>;
+
+    /// Commit synchronized state
+    fn commit_state(&self) -> anyhow::Result<()>;
+
+    /// Called when cluster becomes synced
+    fn on_synced(&self);
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
new file mode 100644
index 00000000..dc85a392
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cluster_database_service.rs
@@ -0,0 +1,116 @@
+//! Cluster Database Service
+//!
+//! This service synchronizes the distributed cluster database (pmxcfs-memdb) across
+//! all cluster nodes using DFSM (Distributed Finite State Machine).
+//!
+//! Equivalent to C implementation's service_dcdb (Distributed Cluster DataBase).
+//! Provides automatic retry, event-driven CPG dispatching, and periodic state verification.
+
+use async_trait::async_trait;
+use pmxcfs_services::{DispatchAction, InitResult, Service, ServiceError};
+use rust_corosync::CsError;
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{debug, error, info, warn};
+
+use crate::Dfsm;
+use crate::message::Message as MessageTrait;
+
+/// Cluster Database Service
+///
+/// Synchronizes the distributed cluster database (pmxcfs-memdb) across all nodes.
+/// Implements the Service trait to provide:
+/// - Automatic retry if CPG initialization fails
+/// - Event-driven CPG dispatching for database replication
+/// - Periodic state verification via timer callback
+///
+/// This is equivalent to C implementation's service_dcdb (Distributed Cluster DataBase).
+///
+/// The generic parameter `M` specifies the message type this service handles.
+pub struct ClusterDatabaseService<M> {
+    dfsm: Arc<Dfsm<M>>,
+    fd: Option<i32>,
+}
+
+impl<M: MessageTrait + Clone + Send + Sync + 'static> ClusterDatabaseService<M> {
+    /// Create a new cluster database service
+    pub fn new(dfsm: Arc<Dfsm<M>>) -> Self {
+        Self { dfsm, fd: None }
+    }
+}
+
+#[async_trait]
+impl<M: MessageTrait + Clone + Send + Sync + 'static> Service for ClusterDatabaseService<M> {
+    fn name(&self) -> &str {
+        "cluster-database"
+    }
+
+    async fn initialize(&mut self) -> pmxcfs_services::Result<InitResult> {
+        info!("Initializing cluster database service (dcdb)");
+
+        // Initialize CPG connection (this also joins the group)
+        self.dfsm.init_cpg().map_err(|e| {
+            ServiceError::InitializationFailed(format!("DFSM CPG initialization failed: {e}"))
+        })?;
+
+        // Get file descriptor for event monitoring
+        let fd = self.dfsm.fd_get().map_err(|e| {
+            self.dfsm.stop_services().ok();
+            ServiceError::InitializationFailed(format!("Failed to get DFSM fd: {e}"))
+        })?;
+
+        self.fd = Some(fd);
+
+        info!(
+            "Cluster database service initialized successfully with fd {}",
+            fd
+        );
+        Ok(InitResult::WithFileDescriptor(fd))
+    }
+
+    async fn dispatch(&mut self) -> pmxcfs_services::Result<DispatchAction> {
+        match self.dfsm.dispatch_events() {
+            Ok(_) => Ok(DispatchAction::Continue),
+            Err(CsError::CsErrLibrary) | Err(CsError::CsErrBadHandle) => {
+                warn!("DFSM connection lost, requesting reinitialization");
+                Ok(DispatchAction::Reinitialize)
+            }
+            Err(e) => {
+                error!("DFSM dispatch failed: {}", e);
+                Err(ServiceError::DispatchFailed(format!(
+                    "DFSM dispatch failed: {e}"
+                )))
+            }
+        }
+    }
+
+    async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+        info!("Finalizing cluster database service");
+
+        self.fd = None;
+
+        if let Err(e) = self.dfsm.stop_services() {
+            warn!("Error stopping cluster database services: {}", e);
+        }
+
+        info!("Cluster database service finalized");
+        Ok(())
+    }
+
+    async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+        debug!("Cluster database timer callback: initiating state verification");
+
+        // Request state verification
+        if let Err(e) = self.dfsm.verify_request() {
+            warn!("DFSM state verification request failed: {}", e);
+        }
+
+        Ok(())
+    }
+
+    fn timer_period(&self) -> Option<Duration> {
+        // Match C implementation's DCDB_VERIFY_TIME (60 * 60 seconds)
+        // Periodic state verification happens once per hour
+        Some(Duration::from_secs(3600))
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
new file mode 100644
index 00000000..d7964259
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/cpg_service.rs
@@ -0,0 +1,163 @@
+//! Safe, idiomatic wrapper for Corosync CPG (Closed Process Group)
+//!
+//! This module provides a trait-based abstraction over the Corosync CPG C API,
+//! handling the unsafe FFI boundary and callback lifecycle management internally.
+
+use anyhow::Result;
+use rust_corosync::{NodeId, cpg};
+use std::sync::Arc;
+
+/// Helper to extract CpgHandler from CPG context
+///
+/// # Safety
+/// Assumes context was set to a valid *const Arc<dyn CpgHandler> pointer
+unsafe fn handler_from_context(handle: cpg::Handle) -> &'static dyn CpgHandler {
+    let context = cpg::context_get(handle).expect("BUG: Failed to get CPG context");
+
+    assert_ne!(
+        context, 0,
+        "BUG: CPG context is null - CpgService not properly initialized"
+    );
+
+    // Context points to a leaked Arc<dyn CpgHandler>
+    // We borrow the Arc to get a reference to the handler
+    let arc_ptr = context as *const Arc<dyn CpgHandler>;
+    let arc_ref: &Arc<dyn CpgHandler> = unsafe { &*arc_ptr };
+    arc_ref.as_ref()
+}
+
+/// Trait for handling CPG events in a safe, idiomatic way
+///
+/// Implementors receive callbacks when CPG events occur. The trait handles
+/// all unsafe pointer conversion and context management internally.
+pub trait CpgHandler: Send + Sync + 'static {
+    fn on_deliver(&self, group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]);
+
+    fn on_confchg(
+        &self,
+        group_name: &str,
+        member_list: &[cpg::Address],
+        left_list: &[cpg::Address],
+        joined_list: &[cpg::Address],
+    );
+}
+
+/// Safe wrapper for CPG handle that manages callback lifecycle
+///
+/// This service registers callbacks with the CPG handle and ensures proper
+/// cleanup when dropped. It uses Arc reference counting to safely manage
+/// the handler lifetime across the FFI boundary.
+pub struct CpgService {
+    handle: cpg::Handle,
+    handler: Arc<dyn CpgHandler>,
+}
+
+impl CpgService {
+    pub fn new<T: CpgHandler>(handler: Arc<T>) -> Result<Self> {
+        fn cpg_deliver_callback(
+            handle: &cpg::Handle,
+            group_name: String,
+            nodeid: NodeId,
+            pid: u32,
+            msg: &[u8],
+            _msg_len: usize,
+        ) {
+            unsafe {
+                let handler = handler_from_context(*handle);
+                handler.on_deliver(&group_name, nodeid, pid, msg);
+            }
+        }
+
+        fn cpg_confchg_callback(
+            handle: &cpg::Handle,
+            group_name: &str,
+            member_list: Vec<cpg::Address>,
+            left_list: Vec<cpg::Address>,
+            joined_list: Vec<cpg::Address>,
+        ) {
+            unsafe {
+                let handler = handler_from_context(*handle);
+                handler.on_confchg(group_name, &member_list, &left_list, &joined_list);
+            }
+        }
+
+        let model_data = cpg::ModelData::ModelV1(cpg::Model1Data {
+            flags: cpg::Model1Flags::None,
+            deliver_fn: Some(cpg_deliver_callback),
+            confchg_fn: Some(cpg_confchg_callback),
+            totem_confchg_fn: None,
+        });
+
+        let handle = cpg::initialize(&model_data, 0)?;
+
+        let handler_dyn: Arc<dyn CpgHandler> = handler;
+        let leaked_arc = Box::new(Arc::clone(&handler_dyn));
+        let arc_ptr = Box::into_raw(leaked_arc) as *const _ as u64;
+        cpg::context_set(handle, arc_ptr)?;
+
+        Ok(Self {
+            handle,
+            handler: handler_dyn,
+        })
+    }
+
+    pub fn join(&self, group_name: &str) -> Result<()> {
+        // IMPORTANT: C implementation uses strlen(name) + 1 for CPG name length,
+        // which includes the trailing nul. To ensure compatibility with C nodes,
+        // we must add \0 to the group name.
+        // See src/pmxcfs/dfsm.c: dfsm->cpg_group_name.length = strlen(group_name) + 1;
+        let group_string = format!("{}\0", group_name);
+        tracing::warn!(
+            "CPG JOIN: Joining group '{}' (C nodes use 'pmxcfs_v1' / 'pve_kvstore_v1')",
+            group_name
+        );
+        cpg::join(self.handle, &group_string)?;
+        tracing::info!("CPG JOIN: Successfully joined group '{}'", group_name);
+        Ok(())
+    }
+
+    pub fn leave(&self, group_name: &str) -> Result<()> {
+        // Include trailing nul to match C's behavior (see join() comment)
+        let group_string = format!("{}\0", group_name);
+        cpg::leave(self.handle, &group_string)?;
+        Ok(())
+    }
+
+    pub fn mcast(&self, guarantee: cpg::Guarantee, msg: &[u8]) -> Result<()> {
+        cpg::mcast_joined(self.handle, guarantee, msg)?;
+        Ok(())
+    }
+
+    pub fn dispatch(&self) -> Result<(), rust_corosync::CsError> {
+        cpg::dispatch(self.handle, rust_corosync::DispatchFlags::All)
+    }
+
+    pub fn fd(&self) -> Result<i32> {
+        Ok(cpg::fd_get(self.handle)?)
+    }
+
+    pub fn handler(&self) -> &Arc<dyn CpgHandler> {
+        &self.handler
+    }
+
+    pub fn handle(&self) -> cpg::Handle {
+        self.handle
+    }
+}
+
+impl Drop for CpgService {
+    fn drop(&mut self) {
+        if let Ok(context) = cpg::context_get(self.handle)
+            && context != 0
+        {
+            unsafe {
+                let _boxed = Box::from_raw(context as *mut Arc<dyn CpgHandler>);
+            }
+        }
+
+        let _ = cpg::finalize(self.handle);
+    }
+}
+
+unsafe impl Send for CpgService {}
+unsafe impl Sync for CpgService {}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
new file mode 100644
index 00000000..054f06b8
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/dfsm_message.rs
@@ -0,0 +1,728 @@
+//! DFSM Protocol Message Types
+//!
+//! This module defines the DfsmMessage enum which encapsulates all DFSM protocol messages
+//! with their associated data, providing type-safe serialization and deserialization.
+//!
+//! Wire format matches C implementation's dfsm_message_*_header_t structures for compatibility.
+use anyhow::Result;
+use pmxcfs_memdb::TreeEntry;
+
+use super::message::Message as MessageTrait;
+use super::types::{DfsmMessageType, SyncEpoch};
+
+/// DFSM protocol message with typed variants
+///
+/// Each variant corresponds to a message type in the DFSM protocol and carries
+/// the appropriate payload data. The wire format matches the C implementation:
+///
+/// For Normal messages: dfsm_message_normal_header_t (24 bytes) + fuse_data
+/// ```text
+/// [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][fuse_data...]
+/// ```
+///
+/// The generic parameter `M` specifies the application message type and must implement
+/// the `Message` trait for serialization/deserialization:
+/// - `DfsmMessage<FuseMessage>` for database operations
+/// - `DfsmMessage<KvStoreMessage>` for status synchronization
+#[derive(Debug, Clone)]
+pub enum DfsmMessage<M>
+where
+    M: MessageTrait,
+{
+    /// Regular application message
+    ///
+    /// Contains a typed application message (FuseMessage or KvStoreMessage).
+    /// C wire format: dfsm_message_normal_header_t + application_message data
+    Normal {
+        msg_count: u64,
+        timestamp: u32,        // Unix timestamp (matches C's u32)
+        protocol_version: u32, // Protocol version
+        message: M,            // Typed message (FuseMessage or KvStoreMessage)
+    },
+
+    /// Start synchronization signal from leader (no payload)
+    /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch)
+    SyncStart { sync_epoch: SyncEpoch },
+
+    /// State data from another node during sync
+    ///
+    /// Wire format: dfsm_message_state_header_t (32 bytes) + [state_data: raw bytes]
+    State {
+        sync_epoch: SyncEpoch,
+        // Opaque state blob; interpretation is left to the application layer.
+        data: Vec<u8>,
+    },
+
+    /// State update from leader
+    ///
+    /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + TreeEntry fields
+    /// This is sent by the leader during synchronization to update followers
+    /// with individual database entries that differ from their state.
+    Update {
+        sync_epoch: SyncEpoch,
+        tree_entry: TreeEntry,
+    },
+
+    /// Update complete signal from leader (no payload)
+    /// C wire format: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch)
+    UpdateComplete { sync_epoch: SyncEpoch },
+
+    /// Verification request from leader
+    ///
+    /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64]
+    VerifyRequest { sync_epoch: SyncEpoch, csum_id: u64 },
+
+    /// Verification response with checksum
+    ///
+    /// Wire format: dfsm_message_state_header_t (32 bytes) + [csum_id: u64][checksum: [u8; 32]]
+    Verify {
+        sync_epoch: SyncEpoch,
+        csum_id: u64,
+        // 32-byte digest; hash algorithm is not visible here (presumably
+        // SHA-256) -- TODO(review): confirm against the C implementation.
+        checksum: [u8; 32],
+    },
+}
+
+impl<M> DfsmMessage<M>
+where
+    M: MessageTrait,
+{
+    /// Protocol version (should match cluster-wide)
+    pub const DEFAULT_PROTOCOL_VERSION: u32 = 1;
+
+    /// Get the message type discriminant
+    pub fn message_type(&self) -> DfsmMessageType {
+        match self {
+            DfsmMessage::Normal { .. } => DfsmMessageType::Normal,
+            DfsmMessage::SyncStart { .. } => DfsmMessageType::SyncStart,
+            DfsmMessage::State { .. } => DfsmMessageType::State,
+            DfsmMessage::Update { .. } => DfsmMessageType::Update,
+            DfsmMessage::UpdateComplete { .. } => DfsmMessageType::UpdateComplete,
+            DfsmMessage::VerifyRequest { .. } => DfsmMessageType::VerifyRequest,
+            DfsmMessage::Verify { .. } => DfsmMessageType::Verify,
+        }
+    }
+
+    /// Serialize message to C-compatible wire format
+    ///
+    /// For Normal/Update: dfsm_message_normal_header_t (24 bytes) + application_data
+    /// Format: [type: u16][subtype: u16][protocol: u32][time: u32][reserved: u32][count: u64][data...]
+    pub fn serialize(&self) -> Vec<u8> {
+        // Normal messages carry a count field instead of a sync epoch, so they
+        // use a different header layout than all other (state) messages.
+        match self {
+            DfsmMessage::Normal {
+                msg_count,
+                timestamp,
+                protocol_version,
+                message,
+            } => self.serialize_normal_message(*msg_count, *timestamp, *protocol_version, message),
+            _ => self.serialize_state_message(),
+        }
+    }
+
+    /// Serialize a Normal message with C-compatible header
+    fn serialize_normal_message(
+        &self,
+        msg_count: u64,
+        timestamp: u32,
+        protocol_version: u32,
+        message: &M,
+    ) -> Vec<u8> {
+        let msg_type = self.message_type() as u16;
+        // The application-level type id is carried in the header's subtype.
+        let subtype = message.message_type();
+        let app_data = message.serialize();
+
+        // C header: type (u16) + subtype (u16) + protocol (u32) + time (u32) + reserved (u32) + count (u64) = 24 bytes
+        // (note: this `message` buffer shadows the `message: &M` parameter above)
+        let mut message = Vec::with_capacity(24 + app_data.len());
+
+        // dfsm_message_header_t fields
+        message.extend_from_slice(&msg_type.to_le_bytes());
+        message.extend_from_slice(&subtype.to_le_bytes());
+        message.extend_from_slice(&protocol_version.to_le_bytes());
+        message.extend_from_slice(&timestamp.to_le_bytes());
+        message.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+        // count field
+        message.extend_from_slice(&msg_count.to_le_bytes());
+
+        // application message data
+        message.extend_from_slice(&app_data);
+
+        message
+    }
+
+    /// Serialize state messages (non-Normal) with C-compatible header
+    /// C wire format: dfsm_message_state_header_t (32 bytes) + payload
+    /// Header breakdown: base (16 bytes) + epoch (16 bytes)
+    fn serialize_state_message(&self) -> Vec<u8> {
+        let msg_type = self.message_type() as u16;
+        let (sync_epoch, payload) = self.extract_epoch_and_payload();
+
+        // For state messages: dfsm_message_state_header_t (32 bytes: 16 base + 16 epoch) + payload
+        let mut message = Vec::with_capacity(32 + payload.len());
+
+        // Base header (16 bytes): type, subtype, protocol, time, reserved
+        message.extend_from_slice(&msg_type.to_le_bytes());
+        message.extend_from_slice(&0u16.to_le_bytes()); // subtype (unused)
+        message.extend_from_slice(&Self::DEFAULT_PROTOCOL_VERSION.to_le_bytes());
+
+        // Timestamp is taken at serialization time; the receiver re-reads it
+        // from the wire, so it is not round-tripped from the enum variant.
+        let timestamp = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs() as u32;
+        message.extend_from_slice(&timestamp.to_le_bytes());
+        message.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+        // Epoch header (16 bytes): epoch, time, nodeid, pid
+        message.extend_from_slice(&sync_epoch.serialize());
+
+        // Payload
+        message.extend_from_slice(&payload);
+
+        message
+    }
+
+    /// Extract sync_epoch and payload from state messages
+    fn extract_epoch_and_payload(&self) -> (SyncEpoch, Vec<u8>) {
+        match self {
+            DfsmMessage::Normal { .. } => {
+                unreachable!("Normal messages use serialize_normal_message")
+            }
+            DfsmMessage::SyncStart { sync_epoch } => (*sync_epoch, Vec::new()),
+            DfsmMessage::State { sync_epoch, data } => (*sync_epoch, data.clone()),
+            DfsmMessage::Update {
+                sync_epoch,
+                tree_entry,
+            } => (*sync_epoch, tree_entry.serialize_for_update()),
+            DfsmMessage::UpdateComplete { sync_epoch } => (*sync_epoch, Vec::new()),
+            DfsmMessage::VerifyRequest {
+                sync_epoch,
+                csum_id,
+            } => (*sync_epoch, csum_id.to_le_bytes().to_vec()),
+            DfsmMessage::Verify {
+                sync_epoch,
+                csum_id,
+                checksum,
+            } => {
+                let mut data = Vec::with_capacity(8 + 32);
+                data.extend_from_slice(&csum_id.to_le_bytes());
+                data.extend_from_slice(checksum);
+                (*sync_epoch, data)
+            }
+        }
+    }
+
+    /// Deserialize message from C-compatible wire format
+    ///
+    /// Normal messages: [base header: 16 bytes][count: u64][app data]
+    /// State messages:  [base header: 16 bytes][epoch: 16 bytes][payload]
+    ///
+    /// # Arguments
+    /// * `data` - Raw message bytes from CPG
+    ///
+    /// # Errors
+    /// Fails on truncated input or an unknown message-type discriminant.
+    pub fn deserialize(data: &[u8]) -> Result<Self> {
+        if data.len() < 16 {
+            anyhow::bail!(
+                "Message too short: {} bytes (need at least 16 for header)",
+                data.len()
+            );
+        }
+
+        // Parse dfsm_message_header_t (16 bytes)
+        let msg_type = u16::from_le_bytes([data[0], data[1]]);
+        // subtype is only meaningful for Normal messages (application type id).
+        let subtype = u16::from_le_bytes([data[2], data[3]]);
+        let protocol_version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
+        let timestamp = u32::from_le_bytes([data[8], data[9], data[10], data[11]]);
+        let _reserved = u32::from_le_bytes([data[12], data[13], data[14], data[15]]);
+
+        let dfsm_type = DfsmMessageType::try_from(msg_type)?;
+
+        // Normal messages have different structure than state messages
+        if dfsm_type == DfsmMessageType::Normal {
+            // Normal: [base: 16][count: 8][app_data: ...]
+            let payload = &data[16..];
+            Self::deserialize_normal_message(subtype, protocol_version, timestamp, payload)
+        } else {
+            // State messages: [base: 16][epoch: 16][payload: ...]
+            if data.len() < 32 {
+                anyhow::bail!(
+                    "State message too short: {} bytes (need at least 32 for state header)",
+                    data.len()
+                );
+            }
+            let sync_epoch = SyncEpoch::deserialize(&data[16..32])
+                .map_err(|e| anyhow::anyhow!("Failed to deserialize sync epoch: {e}"))?;
+            let payload = &data[32..];
+            Self::deserialize_state_message(dfsm_type, sync_epoch, payload)
+        }
+    }
+
+    /// Deserialize a Normal message
+    fn deserialize_normal_message(
+        subtype: u16,
+        protocol_version: u32,
+        timestamp: u32,
+        payload: &[u8],
+    ) -> Result<Self> {
+        // Normal messages have count field (u64) after base header
+        if payload.len() < 8 {
+            anyhow::bail!("Normal message too short: need count field");
+        }
+        let msg_count = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+        let app_data = &payload[8..];
+
+        // Deserialize using the MessageTrait
+        let message = M::deserialize(subtype, app_data)?;
+
+        Ok(DfsmMessage::Normal {
+            msg_count,
+            timestamp,
+            protocol_version,
+            message,
+        })
+    }
+
+    /// Deserialize a state message (with epoch)
+    fn deserialize_state_message(
+        dfsm_type: DfsmMessageType,
+        sync_epoch: SyncEpoch,
+        payload: &[u8],
+    ) -> Result<Self> {
+        match dfsm_type {
+            DfsmMessageType::Normal => {
+                unreachable!("Normal messages use deserialize_normal_message")
+            }
+            DfsmMessageType::Update => {
+                let tree_entry = TreeEntry::deserialize_from_update(payload)?;
+                Ok(DfsmMessage::Update {
+                    sync_epoch,
+                    tree_entry,
+                })
+            }
+            DfsmMessageType::SyncStart => Ok(DfsmMessage::SyncStart { sync_epoch }),
+            DfsmMessageType::State => Ok(DfsmMessage::State {
+                sync_epoch,
+                data: payload.to_vec(),
+            }),
+            DfsmMessageType::UpdateComplete => Ok(DfsmMessage::UpdateComplete { sync_epoch }),
+            DfsmMessageType::VerifyRequest => {
+                if payload.len() < 8 {
+                    anyhow::bail!("VerifyRequest message too short");
+                }
+                let csum_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+                Ok(DfsmMessage::VerifyRequest {
+                    sync_epoch,
+                    csum_id,
+                })
+            }
+            DfsmMessageType::Verify => {
+                if payload.len() < 40 {
+                    anyhow::bail!("Verify message too short");
+                }
+                let csum_id = u64::from_le_bytes(payload[0..8].try_into().unwrap());
+                let mut checksum = [0u8; 32];
+                checksum.copy_from_slice(&payload[8..40]);
+                Ok(DfsmMessage::Verify {
+                    sync_epoch,
+                    csum_id,
+                    checksum,
+                })
+            }
+        }
+    }
+
+    /// Helper to create a Normal message from an application message
+    ///
+    /// The timestamp is taken from the system clock at creation time.
+    pub fn from_message(msg_count: u64, message: M, protocol_version: u32) -> Self {
+        let timestamp = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs() as u32;
+
+        DfsmMessage::Normal {
+            msg_count,
+            timestamp,
+            protocol_version,
+            message,
+        }
+    }
+
+    /// Helper to create an Update message from a TreeEntry
+    ///
+    /// Used by the leader during synchronization to send individual database entries
+    /// to nodes that need to catch up. Matches C's dcdb_send_update_inode().
+    pub fn from_tree_entry(tree_entry: TreeEntry, sync_epoch: SyncEpoch) -> Self {
+        DfsmMessage::Update {
+            sync_epoch,
+            tree_entry,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    // Round-trip and malformed-input coverage for the DFSM wire format.
+    use super::*;
+    use crate::FuseMessage;
+
+    #[test]
+    fn test_sync_start_roundtrip() {
+        let sync_epoch = SyncEpoch {
+            epoch: 1,
+            time: 1234567890,
+            nodeid: 1,
+            pid: 1000,
+        };
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::SyncStart { sync_epoch };
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        assert!(
+            matches!(deserialized, DfsmMessage::SyncStart { sync_epoch: e } if e == sync_epoch)
+        );
+    }
+
+    #[test]
+    fn test_normal_roundtrip() {
+        let fuse_msg = FuseMessage::Create {
+            path: "/test/file".to_string(),
+        };
+
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::Normal {
+            msg_count: 42,
+            timestamp: 1234567890,
+            protocol_version: DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION,
+            message: fuse_msg.clone(),
+        };
+
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::Normal {
+                msg_count,
+                timestamp,
+                protocol_version,
+                message,
+            } => {
+                assert_eq!(msg_count, 42);
+                assert_eq!(timestamp, 1234567890);
+                assert_eq!(
+                    protocol_version,
+                    DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION
+                );
+                assert_eq!(message, fuse_msg);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+
+    #[test]
+    fn test_verify_request_roundtrip() {
+        let sync_epoch = SyncEpoch {
+            epoch: 2,
+            time: 1234567891,
+            nodeid: 2,
+            pid: 2000,
+        };
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::VerifyRequest {
+            sync_epoch,
+            csum_id: 0x123456789ABCDEF0,
+        };
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::VerifyRequest {
+                sync_epoch: e,
+                csum_id,
+            } => {
+                assert_eq!(e, sync_epoch);
+                assert_eq!(csum_id, 0x123456789ABCDEF0);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+
+    #[test]
+    fn test_verify_roundtrip() {
+        let sync_epoch = SyncEpoch {
+            epoch: 3,
+            time: 1234567892,
+            nodeid: 3,
+            pid: 3000,
+        };
+        let checksum = [42u8; 32];
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::Verify {
+            sync_epoch,
+            csum_id: 0x1122334455667788,
+            checksum,
+        };
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::Verify {
+                sync_epoch: e,
+                csum_id,
+                checksum: recv_checksum,
+            } => {
+                assert_eq!(e, sync_epoch);
+                assert_eq!(csum_id, 0x1122334455667788);
+                assert_eq!(recv_checksum, checksum);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+
+    #[test]
+    fn test_invalid_magic() {
+        // NOTE(review): despite the name, this input is rejected by the
+        // 16-byte minimum-length check -- there is no magic field in the
+        // header. Consider renaming to test_header_too_short.
+        let data = vec![0xAA, 0x00, 0x01, 0x02];
+        assert!(DfsmMessage::<FuseMessage>::deserialize(&data).is_err());
+    }
+
+    #[test]
+    fn test_too_short() {
+        let data = vec![0xFF];
+        assert!(DfsmMessage::<FuseMessage>::deserialize(&data).is_err());
+    }
+
+    // ===== Edge Case Tests =====
+
+    #[test]
+    fn test_state_message_too_short() {
+        // State messages need at least 32 bytes (16 base + 16 epoch)
+        let mut data = vec![0u8; 31]; // One byte short
+        // Set message type to State (2)
+        data[0..2].copy_from_slice(&2u16.to_le_bytes());
+
+        let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+        assert!(result.is_err(), "State message with 31 bytes should fail");
+        assert!(result.unwrap_err().to_string().contains("too short"));
+    }
+
+    #[test]
+    fn test_normal_message_missing_count() {
+        // Normal messages need count field (u64) after 16-byte header
+        let mut data = vec![0u8; 20]; // Header + 4 bytes (not enough for u64 count)
+        // Set message type to Normal (0)
+        data[0..2].copy_from_slice(&0u16.to_le_bytes());
+
+        let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+        assert!(
+            result.is_err(),
+            "Normal message without full count field should fail"
+        );
+    }
+
+    #[test]
+    fn test_verify_message_truncated_checksum() {
+        // Verify messages need csum_id (8 bytes) + checksum (32 bytes) = 40 bytes payload
+        let sync_epoch = SyncEpoch {
+            epoch: 1,
+            time: 123,
+            nodeid: 1,
+            pid: 100,
+        };
+        let mut data = Vec::new();
+
+        // Base header (16 bytes)
+        data.extend_from_slice(&6u16.to_le_bytes()); // Verify message type
+        data.extend_from_slice(&0u16.to_le_bytes()); // subtype
+        data.extend_from_slice(&1u32.to_le_bytes()); // protocol
+        data.extend_from_slice(&123u32.to_le_bytes()); // time
+        data.extend_from_slice(&0u32.to_le_bytes()); // reserved
+
+        // Epoch (16 bytes)
+        data.extend_from_slice(&sync_epoch.serialize());
+
+        // Truncated payload (only 39 bytes instead of 40)
+        data.extend_from_slice(&0x12345678u64.to_le_bytes());
+        data.extend_from_slice(&[0u8; 31]); // Only 31 bytes of checksum
+
+        let result = DfsmMessage::<FuseMessage>::deserialize(&data);
+        assert!(
+            result.is_err(),
+            "Verify message with truncated checksum should fail"
+        );
+    }
+
+    #[test]
+    fn test_update_message_with_tree_entry() {
+        use pmxcfs_memdb::TreeEntry;
+
+        // Create a valid tree entry with matching size
+        let data = vec![1, 2, 3, 4, 5];
+        let tree_entry = TreeEntry {
+            inode: 42,
+            parent: 0,
+            version: 1,
+            writer: 0,
+            name: "testfile".to_string(),
+            mtime: 1234567890,
+            size: data.len(), // size must match data.len()
+            entry_type: 8,    // DT_REG (regular file)
+            data,
+        };
+
+        let sync_epoch = SyncEpoch {
+            epoch: 5,
+            time: 999,
+            nodeid: 2,
+            pid: 200,
+        };
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::Update {
+            sync_epoch,
+            tree_entry: tree_entry.clone(),
+        };
+
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::Update {
+                sync_epoch: e,
+                tree_entry: recv_entry,
+            } => {
+                assert_eq!(e, sync_epoch);
+                assert_eq!(recv_entry.inode, tree_entry.inode);
+                assert_eq!(recv_entry.name, tree_entry.name);
+                assert_eq!(recv_entry.size, tree_entry.size);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+
+    #[test]
+    fn test_update_complete_roundtrip() {
+        let sync_epoch = SyncEpoch {
+            epoch: 10,
+            time: 5555,
+            nodeid: 3,
+            pid: 300,
+        };
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::UpdateComplete { sync_epoch };
+
+        let serialized = msg.serialize();
+        assert_eq!(
+            serialized.len(),
+            32,
+            "UpdateComplete should be exactly 32 bytes (header + epoch)"
+        );
+
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        assert!(
+            matches!(deserialized, DfsmMessage::UpdateComplete { sync_epoch: e } if e == sync_epoch)
+        );
+    }
+
+    #[test]
+    fn test_state_message_with_large_payload() {
+        let sync_epoch = SyncEpoch {
+            epoch: 7,
+            time: 7777,
+            nodeid: 4,
+            pid: 400,
+        };
+        // Create a large payload (1MB)
+        let large_data = vec![0xAB; 1024 * 1024];
+
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::State {
+            sync_epoch,
+            data: large_data.clone(),
+        };
+
+        let serialized = msg.serialize();
+        // Should be 32 bytes header + 1MB data
+        assert_eq!(serialized.len(), 32 + 1024 * 1024);
+
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::State {
+                sync_epoch: e,
+                data,
+            } => {
+                assert_eq!(e, sync_epoch);
+                assert_eq!(data.len(), large_data.len());
+                assert_eq!(data, large_data);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+
+    #[test]
+    fn test_message_type_detection() {
+        let sync_epoch = SyncEpoch {
+            epoch: 1,
+            time: 100,
+            nodeid: 1,
+            pid: 50,
+        };
+
+        let sync_start: DfsmMessage<FuseMessage> = DfsmMessage::SyncStart { sync_epoch };
+        assert_eq!(sync_start.message_type(), DfsmMessageType::SyncStart);
+
+        let state: DfsmMessage<FuseMessage> = DfsmMessage::State {
+            sync_epoch,
+            data: vec![1, 2, 3],
+        };
+        assert_eq!(state.message_type(), DfsmMessageType::State);
+
+        let update_complete: DfsmMessage<FuseMessage> = DfsmMessage::UpdateComplete { sync_epoch };
+        assert_eq!(
+            update_complete.message_type(),
+            DfsmMessageType::UpdateComplete
+        );
+    }
+
+    #[test]
+    fn test_from_message_helper() {
+        let fuse_msg = FuseMessage::Mkdir {
+            path: "/new/dir".to_string(),
+        };
+        let msg_count = 123;
+        let protocol_version = DfsmMessage::<FuseMessage>::DEFAULT_PROTOCOL_VERSION;
+
+        let dfsm_msg = DfsmMessage::from_message(msg_count, fuse_msg.clone(), protocol_version);
+
+        match dfsm_msg {
+            DfsmMessage::Normal {
+                msg_count: count,
+                timestamp: _,
+                protocol_version: pv,
+                message,
+            } => {
+                assert_eq!(count, msg_count);
+                assert_eq!(pv, protocol_version);
+                assert_eq!(message, fuse_msg);
+            }
+            _ => panic!("from_message should create Normal variant"),
+        }
+    }
+
+    #[test]
+    fn test_verify_request_with_max_csum_id() {
+        let sync_epoch = SyncEpoch {
+            epoch: 99,
+            time: 9999,
+            nodeid: 5,
+            pid: 500,
+        };
+        let max_csum_id = u64::MAX; // Test with maximum value
+
+        let msg: DfsmMessage<FuseMessage> = DfsmMessage::VerifyRequest {
+            sync_epoch,
+            csum_id: max_csum_id,
+        };
+
+        let serialized = msg.serialize();
+        let deserialized = DfsmMessage::<FuseMessage>::deserialize(&serialized).unwrap();
+
+        match deserialized {
+            DfsmMessage::VerifyRequest {
+                sync_epoch: e,
+                csum_id,
+            } => {
+                assert_eq!(e, sync_epoch);
+                assert_eq!(csum_id, max_csum_id);
+            }
+            _ => panic!("Wrong message type"),
+        }
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
new file mode 100644
index 00000000..ee5d28f8
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/fuse_message.rs
@@ -0,0 +1,185 @@
+//! FUSE message types for cluster synchronization
+//!
+//! These are the high-level operations that get broadcast through the cluster
+//! via the main database DFSM (pmxcfs_v1 CPG group).
+use anyhow::{Context, Result};
+
+use crate::message::Message;
+use crate::wire_format::{CFuseMessage, CMessageType};
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum FuseMessage {
+    /// Create a regular file
+    Create { path: String },
+    /// Create a directory
+    Mkdir { path: String },
+    /// Write data to a file
+    Write {
+        path: String,
+        // NOTE(review): offset is u64 here but the C wire format carries u32
+        // (see serialize's `*offset as u32`) -- offsets above u32::MAX would
+        // silently truncate; confirm pmxcfs file-size limits make this safe.
+        offset: u64,
+        data: Vec<u8>,
+    },
+    /// Delete a file or directory
+    Delete { path: String },
+    /// Rename/move a file or directory
+    Rename { from: String, to: String },
+    /// Update modification time
+    Mtime { path: String },
+    /// Request unlock (not yet implemented)
+    UnlockRequest { path: String },
+    /// Unlock (not yet implemented)
+    Unlock { path: String },
+}
+
+impl Message for FuseMessage {
+    /// Map each variant to its C wire type id (CMessageType).
+    fn message_type(&self) -> u16 {
+        let c_type = match self {
+            FuseMessage::Create { .. } => CMessageType::Create,
+            FuseMessage::Mkdir { .. } => CMessageType::Mkdir,
+            FuseMessage::Write { .. } => CMessageType::Write,
+            FuseMessage::Delete { .. } => CMessageType::Delete,
+            FuseMessage::Rename { .. } => CMessageType::Rename,
+            FuseMessage::Mtime { .. } => CMessageType::Mtime,
+            FuseMessage::UnlockRequest { .. } => CMessageType::UnlockRequest,
+            FuseMessage::Unlock { .. } => CMessageType::Unlock,
+        };
+        c_type as u16
+    }
+
+    /// Encode as a C-compatible CFuseMessage payload.
+    ///
+    /// Every variant shares the same CFuseMessage layout; only Write and
+    /// Rename populate fields beyond `path`, so the variants collapse into
+    /// one construction site.
+    fn serialize(&self) -> Vec<u8> {
+        let (size, offset, path, to, data) = match self {
+            FuseMessage::Write { path, offset, data } => (
+                data.len() as u32,
+                *offset as u32, // u64 -> u32 narrowing matches the C wire format
+                path.clone(),
+                None,
+                data.clone(),
+            ),
+            FuseMessage::Rename { from, to } => {
+                (0, 0, from.clone(), Some(to.clone()), Vec::new())
+            }
+            FuseMessage::Create { path }
+            | FuseMessage::Mkdir { path }
+            | FuseMessage::Delete { path }
+            | FuseMessage::Mtime { path }
+            | FuseMessage::UnlockRequest { path }
+            | FuseMessage::Unlock { path } => (0, 0, path.clone(), None, Vec::new()),
+        };
+
+        CFuseMessage {
+            size,
+            offset,
+            flags: 0,
+            path,
+            to,
+            data,
+        }
+        .serialize()
+    }
+
+    /// Decode a CFuseMessage payload, dispatching on the wire type id.
+    fn deserialize(message_type: u16, data: &[u8]) -> Result<Self> {
+        let parsed = CFuseMessage::parse(data).context("Failed to parse C FUSE message")?;
+        let kind = CMessageType::try_from(message_type).context("Invalid C message type")?;
+
+        let msg = match kind {
+            CMessageType::Create => FuseMessage::Create { path: parsed.path },
+            CMessageType::Mkdir => FuseMessage::Mkdir { path: parsed.path },
+            CMessageType::Write => FuseMessage::Write {
+                path: parsed.path,
+                offset: parsed.offset as u64,
+                data: parsed.data,
+            },
+            CMessageType::Delete => FuseMessage::Delete { path: parsed.path },
+            CMessageType::Rename => FuseMessage::Rename {
+                from: parsed.path,
+                // A Rename without a target decodes to an empty `to` string.
+                to: parsed.to.unwrap_or_default(),
+            },
+            CMessageType::Mtime => FuseMessage::Mtime { path: parsed.path },
+            CMessageType::UnlockRequest => FuseMessage::UnlockRequest { path: parsed.path },
+            CMessageType::Unlock => FuseMessage::Unlock { path: parsed.path },
+        };
+        Ok(msg)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    // Round-trip tests: serialize then deserialize must reproduce the variant.
+    use super::*;
+
+    #[test]
+    fn test_fuse_message_create() {
+        let msg = FuseMessage::Create {
+            path: "/test/file".to_string(),
+        };
+        assert_eq!(msg.message_type(), CMessageType::Create as u16);
+
+        let serialized = msg.serialize();
+        let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+        assert_eq!(msg, deserialized);
+    }
+
+    #[test]
+    fn test_fuse_message_write() {
+        let msg = FuseMessage::Write {
+            path: "/test/file".to_string(),
+            offset: 100,
+            data: vec![1, 2, 3, 4, 5],
+        };
+        assert_eq!(msg.message_type(), CMessageType::Write as u16);
+
+        let serialized = msg.serialize();
+        let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+        assert_eq!(msg, deserialized);
+    }
+
+    #[test]
+    fn test_fuse_message_rename() {
+        // Rename is the only variant carrying a second path (`to`).
+        let msg = FuseMessage::Rename {
+            from: "/old/path".to_string(),
+            to: "/new/path".to_string(),
+        };
+        assert_eq!(msg.message_type(), CMessageType::Rename as u16);
+
+        let serialized = msg.serialize();
+        let deserialized = FuseMessage::deserialize(msg.message_type(), &serialized).unwrap();
+        assert_eq!(msg, deserialized);
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
new file mode 100644
index 00000000..db49a469
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/kv_store_message.rs
@@ -0,0 +1,329 @@
+//! KvStore message types for DFSM status synchronization
+//!
+//! This module defines the KvStore message types that are delivered through
+//! the status DFSM state machine (pve_kvstore_v1 CPG group).
+use anyhow::Context;
+
+use crate::message::Message;
+
+/// KvStore message type IDs (matches C's kvstore_message_t enum)
+///
+/// Carried as the u16 message-type field of the DFSM wire format; the
+/// numeric values must stay in sync with the C implementation.
+/// `TryFromPrimitive` gives a fallible u16 -> enum conversion (used by
+/// `KvStoreMessage::deserialize`), `IntoPrimitive` the infallible reverse.
+#[derive(
+    Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive, num_enum::IntoPrimitive,
+)]
+#[repr(u16)]
+enum KvStoreMessageType {
+    Update = 1,         // KVSTORE_MESSAGE_UPDATE
+    UpdateComplete = 2, // KVSTORE_MESSAGE_UPDATE_COMPLETE
+    Log = 3,            // KVSTORE_MESSAGE_LOG
+}
+
+/// KvStore message types for ephemeral status synchronization
+///
+/// These messages are used by the kvstore DFSM (pve_kvstore_v1 CPG group)
+/// to synchronize ephemeral data like RRD metrics, node IPs, and cluster logs.
+///
+/// Matches C implementation's KVSTORE_MESSAGE_* types in status.c
+#[derive(Debug, Clone, PartialEq)]
+pub enum KvStoreMessage {
+    /// Update key-value data from a node
+    ///
+    /// Wire format: key (256 bytes, null-terminated) + value (variable length)
+    /// Matches C's KVSTORE_MESSAGE_UPDATE
+    Update { key: String, value: Vec<u8> },
+
+    /// Cluster log entry
+    ///
+    /// Wire format: clog_entry_t struct
+    /// Matches C's KVSTORE_MESSAGE_LOG
+    Log {
+        // Entry timestamp as carried in clog_entry_t's u32 `time` field.
+        time: u32,
+        // Syslog-style priority byte (value semantics defined by status.c).
+        priority: u8,
+        node: String,
+        ident: String,
+        tag: String,
+        message: String,
+    },
+
+    /// Update complete signal (not currently used)
+    ///
+    /// Matches C's KVSTORE_MESSAGE_UPDATE_COMPLETE
+    UpdateComplete,
+}
+
+impl KvStoreMessage {
+    /// Get message type ID (matches C's kvstore_message_t enum)
+    pub fn message_type(&self) -> u16 {
+        let msg_type = match self {
+            KvStoreMessage::Update { .. } => KvStoreMessageType::Update,
+            KvStoreMessage::UpdateComplete => KvStoreMessageType::UpdateComplete,
+            KvStoreMessage::Log { .. } => KvStoreMessageType::Log,
+        };
+        msg_type.into()
+    }
+
+    /// Serialize to C-compatible wire format
+    ///
+    /// Update format: key (256 bytes, null-terminated) + value (variable)
+    /// Log format: clog_entry_t struct
+    pub fn serialize(&self) -> Vec<u8> {
+        match self {
+            KvStoreMessage::Update { key, value } => {
+                // C format: char key[256] + data
+                let mut buf = vec![0u8; 256];
+                let key_bytes = key.as_bytes();
+                let copy_len = key_bytes.len().min(255); // Leave room for null terminator
+                buf[..copy_len].copy_from_slice(&key_bytes[..copy_len]);
+                // buf is already zero-filled, so null terminator is automatic
+
+                buf.extend_from_slice(value);
+                buf
+            }
+            KvStoreMessage::Log {
+                time,
+                priority,
+                node,
+                ident,
+                tag,
+                message,
+            } => {
+                // C format: clog_entry_t
+                // struct clog_entry_t {
+                //     uint32_t time;
+                //     uint8_t priority;
+                //     uint8_t padding[3];
+                //     uint32_t node_len, ident_len, tag_len, msg_len;
+                //     char data[];  // node + ident + tag + message (all null-terminated)
+                // }
+
+                let node_bytes = node.as_bytes();
+                let ident_bytes = ident.as_bytes();
+                let tag_bytes = tag.as_bytes();
+                let msg_bytes = message.as_bytes();
+
+                // Each length field counts the trailing NUL terminator.
+                let node_len = (node_bytes.len() + 1) as u32;
+                let ident_len = (ident_bytes.len() + 1) as u32;
+                let tag_len = (tag_bytes.len() + 1) as u32;
+                let msg_len = (msg_bytes.len() + 1) as u32;
+
+                // 4 (time) + 1 (priority) + 3 (padding) + 16 (length fields)
+                let total_len = 4 + 1 + 3 + 16 + node_len + ident_len + tag_len + msg_len;
+                let mut buf = Vec::with_capacity(total_len as usize);
+
+                buf.extend_from_slice(&time.to_le_bytes());
+                buf.push(*priority);
+                buf.extend_from_slice(&[0u8; 3]); // padding
+                buf.extend_from_slice(&node_len.to_le_bytes());
+                buf.extend_from_slice(&ident_len.to_le_bytes());
+                buf.extend_from_slice(&tag_len.to_le_bytes());
+                buf.extend_from_slice(&msg_len.to_le_bytes());
+
+                buf.extend_from_slice(node_bytes);
+                buf.push(0); // null terminator
+                buf.extend_from_slice(ident_bytes);
+                buf.push(0);
+                buf.extend_from_slice(tag_bytes);
+                buf.push(0);
+                buf.extend_from_slice(msg_bytes);
+                buf.push(0);
+
+                buf
+            }
+            KvStoreMessage::UpdateComplete => {
+                // No payload
+                Vec::new()
+            }
+        }
+    }
+
+    /// Deserialize from C-compatible wire format
+    ///
+    /// # Errors
+    /// Fails on an unknown message type, a truncated payload, inconsistent
+    /// length fields, or non-UTF-8 string data.
+    pub fn deserialize(msg_type: u16, data: &[u8]) -> anyhow::Result<Self> {
+        use KvStoreMessageType::*;
+
+        let msg_type = KvStoreMessageType::try_from(msg_type)
+            .map_err(|_| anyhow::anyhow!("Unknown kvstore message type: {msg_type}"))?;
+
+        match msg_type {
+            Update => {
+                if data.len() < 256 {
+                    anyhow::bail!("UPDATE message too short: {} < 256", data.len());
+                }
+
+                // Find null terminator in first 256 bytes
+                let key_end = data[..256]
+                    .iter()
+                    .position(|&b| b == 0)
+                    .ok_or_else(|| anyhow::anyhow!("UPDATE key not null-terminated"))?;
+
+                let key = std::str::from_utf8(&data[..key_end])
+                    .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in UPDATE key: {e}"))?
+                    .to_string();
+
+                let value = data[256..].to_vec();
+
+                Ok(KvStoreMessage::Update { key, value })
+            }
+            UpdateComplete => Ok(KvStoreMessage::UpdateComplete),
+            Log => {
+                // Fixed header: time (4) + priority (1) + padding (3) + four
+                // u32 length fields (16) = 24 bytes. The previous check used
+                // 20, which let a 20..23-byte payload panic when reading the
+                // msg_len field below.
+                if data.len() < 24 {
+                    anyhow::bail!("LOG message too short");
+                }
+
+                let time = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
+                let priority = data[4];
+                // data[5..8] is padding
+
+                let node_len = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;
+                let ident_len =
+                    u32::from_le_bytes([data[12], data[13], data[14], data[15]]) as usize;
+                let tag_len = u32::from_le_bytes([data[16], data[17], data[18], data[19]]) as usize;
+                let msg_len = u32::from_le_bytes([data[20], data[21], data[22], data[23]]) as usize;
+
+                // Every length counts its trailing NUL, so zero is malformed
+                // (and would underflow the `len - 1` slice bounds below).
+                if node_len == 0 || ident_len == 0 || tag_len == 0 || msg_len == 0 {
+                    anyhow::bail!("LOG message with zero-length field");
+                }
+
+                let expected_len = 24 + node_len + ident_len + tag_len + msg_len;
+                if data.len() != expected_len {
+                    anyhow::bail!(
+                        "LOG message size mismatch: {} != {}",
+                        data.len(),
+                        expected_len
+                    );
+                }
+
+                let mut offset = 24;
+
+                // The `- 1` drops the NUL terminator from each string.
+                let node = std::str::from_utf8(&data[offset..offset + node_len - 1])
+                    .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG node: {e}"))?
+                    .to_string();
+                offset += node_len;
+
+                let ident = std::str::from_utf8(&data[offset..offset + ident_len - 1])
+                    .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG ident: {e}"))?
+                    .to_string();
+                offset += ident_len;
+
+                let tag = std::str::from_utf8(&data[offset..offset + tag_len - 1])
+                    .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG tag: {e}"))?
+                    .to_string();
+                offset += tag_len;
+
+                let message = std::str::from_utf8(&data[offset..offset + msg_len - 1])
+                    .map_err(|e| anyhow::anyhow!("Invalid UTF-8 in LOG message: {e}"))?
+                    .to_string();
+
+                Ok(KvStoreMessage::Log {
+                    time,
+                    priority,
+                    node,
+                    ident,
+                    tag,
+                    message,
+                })
+            }
+        }
+    }
+}
+
+impl Message for KvStoreMessage {
+    fn message_type(&self) -> u16 {
+        // Forward to the inherent implementation above; inherent items take
+        // precedence over trait methods in path resolution, so this does not
+        // recurse.
+        Self::message_type(self)
+    }
+
+    fn serialize(&self) -> Vec<u8> {
+        Self::serialize(self)
+    }
+
+    fn deserialize(message_type: u16, data: &[u8]) -> anyhow::Result<Self> {
+        Self::deserialize(message_type, data).context("Failed to deserialize KvStoreMessage")
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_kvstore_message_update_serialization() {
+        let msg = KvStoreMessage::Update {
+            key: "test_key".to_string(),
+            value: vec![1, 2, 3, 4, 5],
+        };
+
+        // Wire layout: fixed 256-byte key block, then the raw value bytes.
+        let wire = msg.serialize();
+        assert_eq!(wire.len(), 256 + 5);
+        assert_eq!(&wire[..8], b"test_key");
+        assert_eq!(wire[8], 0); // null terminator
+        assert_eq!(&wire[256..], &[1, 2, 3, 4, 5]);
+
+        let decoded = KvStoreMessage::deserialize(1, &wire).unwrap();
+        assert_eq!(msg, decoded);
+    }
+
+    #[test]
+    fn test_kvstore_message_log_serialization() {
+        let msg = KvStoreMessage::Log {
+            time: 1234567890,
+            priority: 5,
+            node: "node1".to_string(),
+            ident: "pmxcfs".to_string(),
+            tag: "info".to_string(),
+            message: "test message".to_string(),
+        };
+
+        // Round trip through the clog_entry_t wire format.
+        let decoded = KvStoreMessage::deserialize(3, &msg.serialize()).unwrap();
+        assert_eq!(msg, decoded);
+    }
+
+    #[test]
+    fn test_kvstore_message_type() {
+        let update = KvStoreMessage::Update {
+            key: "".into(),
+            value: vec![],
+        };
+        let log = KvStoreMessage::Log {
+            time: 0,
+            priority: 0,
+            node: "".into(),
+            ident: "".into(),
+            tag: "".into(),
+            message: "".into(),
+        };
+
+        assert_eq!(update.message_type(), 1);
+        assert_eq!(KvStoreMessage::UpdateComplete.message_type(), 2);
+        assert_eq!(log.message_type(), 3);
+    }
+
+    #[test]
+    fn test_kvstore_message_type_roundtrip() {
+        // Test that message_type() and deserialize() are consistent
+        use super::KvStoreMessageType;
+
+        // Enum -> wire value.
+        assert_eq!(u16::from(KvStoreMessageType::Update), 1);
+        assert_eq!(u16::from(KvStoreMessageType::UpdateComplete), 2);
+        assert_eq!(u16::from(KvStoreMessageType::Log), 3);
+
+        // Wire value -> enum.
+        assert_eq!(
+            KvStoreMessageType::try_from(1).unwrap(),
+            KvStoreMessageType::Update
+        );
+        assert_eq!(
+            KvStoreMessageType::try_from(2).unwrap(),
+            KvStoreMessageType::UpdateComplete
+        );
+        assert_eq!(
+            KvStoreMessageType::try_from(3).unwrap(),
+            KvStoreMessageType::Log
+        );
+
+        // Values outside the defined range are rejected.
+        assert!(KvStoreMessageType::try_from(0).is_err());
+        assert!(KvStoreMessageType::try_from(4).is_err());
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
new file mode 100644
index 00000000..89240483
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/lib.rs
@@ -0,0 +1,32 @@
+//! Distributed Finite State Machine (DFSM) for cluster state synchronization
+//!
+//! This crate implements the state machine for synchronizing configuration
+//! changes across the cluster nodes using Corosync CPG.
+//!
+//! The DFSM handles:
+//! - State synchronization between nodes
+//! - Message ordering and queuing
+//! - Leader-based state updates
+//! - Split-brain prevention
+//! - Membership change handling
+mod callbacks;
+pub mod cluster_database_service;
+mod cpg_service;
+mod dfsm_message;
+mod fuse_message;
+mod kv_store_message;
+mod message;
+mod state_machine;
+pub mod status_sync_service;
+mod types;
+mod wire_format;
+
+// Re-export public API
+pub use callbacks::Callbacks;
+pub use cluster_database_service::ClusterDatabaseService;
+pub use cpg_service::{CpgHandler, CpgService};
+pub use fuse_message::FuseMessage;
+pub use kv_store_message::KvStoreMessage;
+pub use state_machine::{Dfsm, DfsmBroadcast};
+pub use status_sync_service::StatusSyncService;
+pub use types::NodeSyncInfo;
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
new file mode 100644
index 00000000..24e6847b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/message.rs
@@ -0,0 +1,21 @@
+//! High-level message abstraction for DFSM
+//!
+//! This module provides a Message trait for working with cluster messages
+//! at a higher abstraction level than raw bytes.
+use anyhow::Result;
+
+/// Trait for messages that can be sent through DFSM
+pub trait Message: Sized {
+    /// Get the message type identifier
+    ///
+    /// The returned value is carried on the wire and used by
+    /// `deserialize()` to select the decoding path.
+    fn message_type(&self) -> u16;
+
+    /// Serialize the message to bytes (application message payload only)
+    ///
+    /// This serializes only the application-level payload. The DFSM protocol
+    /// headers (msg_count, timestamp, protocol_version, etc.) are added by
+    /// DfsmMessage::serialize() when wrapping in DfsmMessage::Normal.
+    fn serialize(&self) -> Vec<u8>;
+
+    /// Deserialize from bytes given a message type
+    fn deserialize(message_type: u16, data: &[u8]) -> Result<Self>;
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
new file mode 100644
index 00000000..2c90e4ea
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/state_machine.rs
@@ -0,0 +1,1013 @@
+//! DFSM state machine implementation
+//!
+//! This module contains the main Dfsm struct and its implementation
+//! for managing distributed state synchronization.
+use anyhow::{Context, Result};
+use parking_lot::{Mutex, RwLock};
+use pmxcfs_api_types::MemberInfo;
+use rust_corosync::{NodeId, cpg};
+use std::collections::{BTreeMap, VecDeque};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use super::cpg_service::{CpgHandler, CpgService};
+use super::dfsm_message::DfsmMessage;
+use super::message::Message as MessageTrait;
+use super::types::{DfsmMode, QueuedMessage, SyncEpoch};
+use crate::{Callbacks, FuseMessage, NodeSyncInfo};
+
+/// Extension trait to add broadcast() method to Option<Arc<Dfsm<FuseMessage>>>
+///
+/// This allows calling `.broadcast()` directly on Option<Arc<Dfsm<FuseMessage>>> fields
+/// without explicit None checking at call sites.
+pub trait DfsmBroadcast {
+    fn broadcast(&self, msg: FuseMessage);
+}
+
+impl DfsmBroadcast for Option<Arc<Dfsm<FuseMessage>>> {
+    fn broadcast(&self, msg: FuseMessage) {
+        // A `None` DFSM (not initialized yet) silently drops the message,
+        // and any send error from the inner broadcast is ignored as well.
+        let Some(dfsm) = self.as_ref() else {
+            return;
+        };
+        let _ = dfsm.broadcast(msg);
+    }
+}
+
+/// DFSM state machine
+///
+/// The generic parameter `M` specifies the message type this DFSM handles:
+/// - `Dfsm<FuseMessage>` for main database operations
+/// - `Dfsm<KvStoreMessage>` for status synchronization
+///
+/// Each piece of shared state sits behind its own lock or atomic; methods
+/// generally take one lock at a time and drop it before invoking callbacks.
+pub struct Dfsm<M> {
+    /// CPG service for cluster communication (matching C's dfsm_t->cpg_handle)
+    /// `None` until `init_cpg()` has run; send paths fail while unset.
+    cpg_service: RwLock<Option<Arc<CpgService>>>,
+
+    /// Cluster group name for CPG
+    cluster_name: String,
+
+    /// Callbacks for application integration
+    callbacks: Arc<dyn Callbacks<M>>,
+
+    /// Current operating mode
+    mode: RwLock<DfsmMode>,
+
+    /// Current sync epoch
+    sync_epoch: RwLock<SyncEpoch>,
+
+    /// Local epoch counter
+    local_epoch_counter: Mutex<u32>,
+
+    /// Node synchronization info
+    sync_nodes: RwLock<Vec<NodeSyncInfo>>,
+
+    /// Message queue (ordered by count)
+    msg_queue: Mutex<BTreeMap<u64, QueuedMessage<M>>>,
+
+    /// Sync queue for messages during update mode
+    sync_queue: Mutex<VecDeque<QueuedMessage<M>>>,
+
+    /// Message counter for ordering (atomic for lock-free increment)
+    msg_counter: AtomicU64,
+
+    /// Lowest node ID in cluster (leader)
+    lowest_nodeid: RwLock<u32>,
+
+    /// Our node ID (set during init_cpg via cpg_local_get)
+    nodeid: AtomicU32,
+
+    /// Our process ID
+    pid: u32,
+
+    /// Protocol version for cluster compatibility
+    protocol_version: u32,
+
+    /// State verification - SHA-256 checksum
+    checksum: Mutex<[u8; 32]>,
+
+    /// Checksum epoch (when it was computed)
+    checksum_epoch: Mutex<SyncEpoch>,
+
+    /// Checksum ID for verification
+    checksum_id: Mutex<u64>,
+
+    /// Checksum counter for verify requests
+    checksum_counter: Mutex<u64>,
+}
+
+impl<M> Dfsm<M>
+where
+    M: MessageTrait,
+{
+    /// Create a new DFSM instance using the default protocol version.
+    ///
+    /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg()
+    pub fn new(cluster_name: String, callbacks: Arc<dyn Callbacks<M>>) -> Result<Self> {
+        Self::new_with_protocol_version(
+            cluster_name,
+            callbacks,
+            DfsmMessage::<M>::DEFAULT_PROTOCOL_VERSION,
+        )
+    }
+
+    /// Create a new DFSM instance with a specific protocol version
+    ///
+    /// This is used when the DFSM needs to use a non-default protocol version,
+    /// such as the status/kvstore DFSM which uses protocol version 0 for
+    /// compatibility with the C implementation.
+    ///
+    /// Note: nodeid will be obtained from CPG via cpg_local_get() during init_cpg()
+    pub fn new_with_protocol_version(
+        cluster_name: String,
+        callbacks: Arc<dyn Callbacks<M>>,
+        protocol_version: u32,
+    ) -> Result<Self> {
+        let pid = std::process::id();
+        // Seconds since the Unix epoch; 0 if the clock is before the epoch.
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs() as u32;
+
+        let initial_epoch = SyncEpoch {
+            epoch: 0,
+            time: now,
+            nodeid: 0,
+            pid,
+        };
+        let empty_epoch = SyncEpoch {
+            epoch: 0,
+            time: 0,
+            nodeid: 0,
+            pid: 0,
+        };
+
+        Ok(Self {
+            cpg_service: RwLock::new(None),
+            cluster_name,
+            callbacks,
+            mode: RwLock::new(DfsmMode::Start),
+            sync_epoch: RwLock::new(initial_epoch),
+            local_epoch_counter: Mutex::new(0),
+            sync_nodes: RwLock::new(Vec::new()),
+            msg_queue: Mutex::new(BTreeMap::new()),
+            sync_queue: Mutex::new(VecDeque::new()),
+            msg_counter: AtomicU64::new(0),
+            lowest_nodeid: RwLock::new(0),
+            // Filled in by init_cpg() using cpg_local_get().
+            nodeid: AtomicU32::new(0),
+            pid,
+            protocol_version,
+            checksum: Mutex::new([0u8; 32]),
+            checksum_epoch: Mutex::new(empty_epoch),
+            checksum_id: Mutex::new(0),
+            checksum_counter: Mutex::new(0),
+        })
+    }
+
+    /// Current operating mode.
+    pub fn get_mode(&self) -> DfsmMode {
+        *self.mode.read()
+    }
+
+    /// Transition to `new_mode` and log the change.
+    ///
+    /// Error states are sticky: a non-error mode never replaces an error
+    /// mode. Setting the mode that is already active is a no-op.
+    pub fn set_mode(&self, new_mode: DfsmMode) {
+        {
+            let mut mode = self.mode.write();
+            let current = *mode;
+            if current == new_mode || (current.is_error() && !new_mode.is_error()) {
+                return;
+            }
+            *mode = new_mode;
+            // Guard dropped here so logging happens outside the lock.
+        }
+
+        if new_mode.is_error() {
+            tracing::error!("DFSM: {}", new_mode);
+        } else {
+            tracing::info!("DFSM: {}", new_mode);
+        }
+    }
+
+    /// True when we hold the lowest (non-zero) node ID in the cluster.
+    pub fn is_leader(&self) -> bool {
+        let lowest = *self.lowest_nodeid.read();
+        lowest != 0 && lowest == self.nodeid.load(Ordering::Relaxed)
+    }
+
+    pub fn get_nodeid(&self) -> u32 {
+        self.nodeid.load(Ordering::Relaxed)
+    }
+
+    pub fn get_pid(&self) -> u32 {
+        self.pid
+    }
+
+    /// Check if DFSM is synced and ready
+    pub fn is_synced(&self) -> bool {
+        matches!(self.get_mode(), DfsmMode::Synced)
+    }
+
+    /// Check if DFSM encountered an error
+    pub fn is_error(&self) -> bool {
+        self.get_mode().is_error()
+    }
+}
+
+impl<M: MessageTrait + Clone> Dfsm<M> {
+    /// Broadcast a SYNC_START message carrying the current sync epoch.
+    fn send_sync_start(&self) -> Result<()> {
+        tracing::debug!("DFSM: sending SYNC_START message");
+        let epoch = *self.sync_epoch.read();
+        let msg = DfsmMessage::<M>::SyncStart { sync_epoch: epoch };
+        self.send_dfsm_message(&msg)
+    }
+
+    /// Fetch the application state via the callbacks and broadcast it as a
+    /// State message tagged with the current sync epoch.
+    fn send_state(&self) -> Result<()> {
+        tracing::debug!("DFSM: generating and sending state");
+
+        let state = self
+            .callbacks
+            .get_state()
+            .context("Failed to get state from callbacks")?;
+        tracing::info!("DFSM: sending state ({} bytes)", state.len());
+
+        let msg: DfsmMessage<M> = DfsmMessage::State {
+            sync_epoch: *self.sync_epoch.read(),
+            data: state,
+        };
+        self.send_dfsm_message(&msg)
+    }
+
+    /// Serialize `message` and multicast it to the CPG group with agreed
+    /// ordering. Fails if `init_cpg()` has not installed a CPG service yet.
+    pub(super) fn send_dfsm_message(&self, message: &DfsmMessage<M>) -> Result<()> {
+        let payload = message.serialize();
+
+        let guard = self.cpg_service.read();
+        let service = guard
+            .as_ref()
+            .ok_or_else(|| anyhow::anyhow!("CPG not initialized"))?;
+        service
+            .mcast(cpg::Guarantee::TypeAgreed, &payload)
+            .context("Failed to broadcast DFSM message")?;
+        Ok(())
+    }
+
+    /// Record the state blob received from `nodeid`/`pid`; once every member
+    /// has reported, kick off the synchronization step.
+    pub fn process_state(&self, nodeid: u32, pid: u32, state: &[u8]) -> Result<()> {
+        tracing::debug!(
+            "DFSM: processing state from node {}/{} ({} bytes)",
+            nodeid,
+            pid,
+            state.len()
+        );
+
+        let all_received = {
+            let mut nodes = self.sync_nodes.write();
+            let Some(entry) = nodes
+                .iter_mut()
+                .find(|n| n.nodeid == nodeid && n.pid == pid)
+            else {
+                // A sender that is not in our membership view is ignored.
+                tracing::warn!("DFSM: received state from unknown node {}/{}", nodeid, pid);
+                return Ok(());
+            };
+            entry.state = Some(state.to_vec());
+            nodes.iter().all(|n| n.state.is_some())
+            // Write lock released here, before process_state_sync() runs.
+        };
+
+        if all_received {
+            tracing::info!("DFSM: received all states, processing synchronization");
+            self.process_state_sync()?;
+        }
+
+        Ok(())
+    }
+
+    /// Run the application-level state comparison/merge over all collected
+    /// node states, then move this node to Synced or Update mode.
+    ///
+    /// Returns the error (after switching to Error mode) if the callback's
+    /// state update fails.
+    fn process_state_sync(&self) -> Result<()> {
+        tracing::info!("DFSM: processing state synchronization");
+
+        // Clone the node list so the callback runs without the sync_nodes
+        // lock held (the callback may be slow or call back into the DFSM).
+        let sync_nodes = self.sync_nodes.read().clone();
+
+        match self.callbacks.process_state_update(&sync_nodes) {
+            Ok(synced) => {
+                if synced {
+                    // Local state already matches the cluster: mark ourselves
+                    // synced, notify the application, flush queued messages.
+                    tracing::info!("DFSM: state synchronization successful");
+
+                    let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+                    let mut sync_nodes_write = self.sync_nodes.write();
+                    if let Some(node) = sync_nodes_write
+                        .iter_mut()
+                        .find(|n| n.nodeid == my_nodeid && n.pid == self.pid)
+                    {
+                        node.synced = true;
+                    }
+                    // Release before set_mode/callbacks to avoid holding the
+                    // lock across application code.
+                    drop(sync_nodes_write);
+
+                    self.set_mode(DfsmMode::Synced);
+                    self.callbacks.on_synced();
+                    self.deliver_message_queue()?;
+                } else {
+                    // We are behind: wait in UPDATE mode for the leader to
+                    // stream the missing entries via Update messages.
+                    tracing::info!("DFSM: entering UPDATE mode, waiting for leader");
+                    self.set_mode(DfsmMode::Update);
+                    self.deliver_message_queue()?;
+                }
+            }
+            Err(e) => {
+                tracing::error!("DFSM: state synchronization failed: {}", e);
+                self.set_mode(DfsmMode::Error);
+                return Err(e);
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Queue an incoming application message for later delivery.
+    ///
+    /// Messages from an already-synced sender received while we are still in
+    /// Update mode go to the sync queue; everything else is ordered by its
+    /// message counter in the main queue.
+    pub fn queue_message(&self, nodeid: u32, pid: u32, msg_count: u64, message: M, timestamp: u64)
+    where
+        M: Clone,
+    {
+        tracing::debug!(
+            "DFSM: queueing message {} from {}/{}",
+            msg_count,
+            nodeid,
+            pid
+        );
+
+        let entry = QueuedMessage {
+            nodeid,
+            pid,
+            _msg_count: msg_count,
+            message,
+            timestamp,
+        };
+
+        let mode = self.get_mode();
+        let sender_synced = self
+            .sync_nodes
+            .read()
+            .iter()
+            .any(|n| n.nodeid == nodeid && n.pid == pid && n.synced);
+
+        if mode == DfsmMode::Update && sender_synced {
+            self.sync_queue.lock().push_back(entry);
+        } else {
+            self.msg_queue.lock().insert(msg_count, entry);
+        }
+    }
+
+    /// Deliver queued messages whose sender is a known, synced member.
+    ///
+    /// - Messages from senders no longer in the membership are dropped.
+    /// - In Synced mode, deliverable messages go straight to the callbacks.
+    /// - In Update mode, they are moved to the sync queue for later replay.
+    /// - Messages from not-yet-synced members stay queued.
+    ///
+    /// Delivery errors are logged and the message is still removed, so one
+    /// failing message cannot wedge the queue.
+    ///
+    /// NOTE(review): the msg_queue lock is held while the deliver_message
+    /// callback runs — callbacks must not re-enter queue_message or this
+    /// deadlocks; confirm against the callback implementations.
+    pub(super) fn deliver_message_queue(&self) -> Result<()>
+    where
+        M: Clone,
+    {
+        let mut queue = self.msg_queue.lock();
+        if queue.is_empty() {
+            return Ok(());
+        }
+
+        tracing::info!("DFSM: delivering {} queued messages", queue.len());
+
+        let mode = self.get_mode();
+        // Snapshot membership so sender lookups don't hold the RwLock.
+        let sync_nodes = self.sync_nodes.read().clone();
+
+        // Collected keys are removed after iteration (can't mutate the map
+        // while iterating it).
+        let mut to_remove = Vec::new();
+
+        // BTreeMap iteration yields messages in ascending counter order.
+        for (count, qm) in queue.iter() {
+            let node_info = sync_nodes
+                .iter()
+                .find(|n| n.nodeid == qm.nodeid && n.pid == qm.pid);
+
+            let Some(info) = node_info else {
+                tracing::debug!(
+                    "DFSM: removing message from non-member {}/{}",
+                    qm.nodeid,
+                    qm.pid
+                );
+                to_remove.push(*count);
+                continue;
+            };
+
+            if mode == DfsmMode::Synced && info.synced {
+                tracing::debug!("DFSM: delivering message {}", count);
+
+                match self.callbacks.deliver_message(
+                    qm.nodeid,
+                    qm.pid,
+                    qm.message.clone(),
+                    qm.timestamp,
+                ) {
+                    Ok((result, processed)) => {
+                        tracing::debug!(
+                            "DFSM: message delivered, result={}, processed={}",
+                            result,
+                            processed
+                        );
+                    }
+                    Err(e) => {
+                        tracing::error!("DFSM: failed to deliver message: {}", e);
+                    }
+                }
+
+                to_remove.push(*count);
+            } else if mode == DfsmMode::Update && info.synced {
+                // Defer to the sync queue; replayed once we reach Synced.
+                self.sync_queue.lock().push_back(qm.clone());
+                to_remove.push(*count);
+            }
+        }
+
+        for count in to_remove {
+            queue.remove(&count);
+        }
+
+        Ok(())
+    }
+
+    /// Drain the sync queue, handing every deferred message to the callbacks.
+    ///
+    /// Delivery failures are logged but do not abort the drain; remaining
+    /// messages are still delivered.
+    pub(super) fn deliver_sync_queue(&self) -> Result<()> {
+        let mut queue = self.sync_queue.lock();
+        if queue.is_empty() {
+            return Ok(());
+        }
+
+        tracing::info!("DFSM: delivering {} sync queue messages", queue.len());
+
+        for qm in queue.drain(..) {
+            tracing::debug!(
+                "DFSM: delivering sync message from {}/{}",
+                qm.nodeid,
+                qm.pid
+            );
+
+            match self
+                .callbacks
+                .deliver_message(qm.nodeid, qm.pid, qm.message, qm.timestamp)
+            {
+                Ok((result, processed)) => {
+                    tracing::debug!(
+                        "DFSM: sync message delivered, result={}, processed={}",
+                        result,
+                        processed
+                    );
+                }
+                Err(e) => {
+                    tracing::error!("DFSM: failed to deliver sync message: {}", e);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Send a message to the cluster
+    ///
+    /// Wraps the application payload in a properly formatted Normal message
+    /// with C-compatible headers and returns the counter assigned to it.
+    pub fn send_message(&self, message: M) -> Result<u64> {
+        // fetch_add returns the previous value, so the first message gets 1.
+        let count = self.msg_counter.fetch_add(1, Ordering::SeqCst) + 1;
+        tracing::debug!("DFSM: sending message {}", count);
+
+        let wrapped = DfsmMessage::from_message(count, message, self.protocol_version);
+        self.send_dfsm_message(&wrapped)?;
+
+        Ok(count)
+    }
+
+    /// Send a TreeEntry update to the cluster (leader only, during synchronization)
+    ///
+    /// Used by the leader to ship individual database entries to followers
+    /// that need to catch up. Matches C's dfsm_send_update().
+    pub fn send_update(&self, tree_entry: pmxcfs_memdb::TreeEntry) -> Result<()> {
+        tracing::debug!("DFSM: sending Update for inode {}", tree_entry.inode);
+
+        let epoch = *self.sync_epoch.read();
+        let msg: DfsmMessage<M> = DfsmMessage::from_tree_entry(tree_entry, epoch);
+        self.send_dfsm_message(&msg)
+    }
+
+    /// Send UpdateComplete signal to cluster (leader only, after sending all updates)
+    ///
+    /// Signals to followers that all Update messages have been sent and they
+    /// can now transition to Synced mode. Matches C's dfsm_send_update_complete().
+    pub fn send_update_complete(&self) -> Result<()> {
+        tracing::info!("DFSM: sending UpdateComplete");
+
+        let epoch = *self.sync_epoch.read();
+        let msg: DfsmMessage<M> = DfsmMessage::UpdateComplete { sync_epoch: epoch };
+        self.send_dfsm_message(&msg)
+    }
+
+    /// Request checksum verification (leader only)
+    ///
+    /// Called periodically by the leader to verify cluster state consistency.
+    /// A new request is only sent once the previous one has completed, i.e.
+    /// when `checksum_counter == checksum_id`.
+    pub fn verify_request(&self) -> Result<()> {
+        // Only leader should send verify requests
+        if !self.is_leader() {
+            return Ok(());
+        }
+
+        // Only verify when synced
+        if self.get_mode() != DfsmMode::Synced {
+            return Ok(());
+        }
+
+        // Hold the counter lock across the whole check-and-increment:
+        // previously the lock was released between the comparison and the
+        // write, so two concurrent callers could both see "previous
+        // verification complete" and bump the counter twice.
+        let new_counter = {
+            let mut counter = self.checksum_counter.lock();
+            let checksum_id = *self.checksum_id.lock();
+
+            if *counter != checksum_id {
+                // Previous verification still in flight — try again later.
+                tracing::debug!("DFSM: delaying verify request {:016x}", *counter + 1);
+                return Ok(());
+            }
+
+            *counter += 1;
+            *counter
+        };
+
+        tracing::debug!("DFSM: sending verify request {:016x}", new_counter);
+
+        // Send VERIFY_REQUEST message with counter
+        let sync_epoch = *self.sync_epoch.read();
+        let dfsm_msg: DfsmMessage<M> = DfsmMessage::VerifyRequest {
+            sync_epoch,
+            csum_id: new_counter,
+        };
+        self.send_dfsm_message(&dfsm_msg)?;
+
+        Ok(())
+    }
+
+    /// Handle verify request from leader
+    ///
+    /// Computes the local state checksum, records it together with the
+    /// request's epoch and id (so handle_verify() can match responses against
+    /// it), and broadcasts a Verify response carrying the checksum.
+    pub fn handle_verify_request(&self, message_epoch: SyncEpoch, csum_id: u64) -> Result<()> {
+        tracing::debug!("DFSM: received verify request {:016x}", csum_id);
+
+        // Compute current state checksum
+        let mut checksum = [0u8; 32];
+        self.callbacks.compute_checksum(&mut checksum)?;
+
+        // Save checksum info
+        // Store the epoch FROM THE MESSAGE (matching C: dfsm.c:736)
+        // Locks are taken one at a time; no other path takes them in a
+        // conflicting order.
+        *self.checksum.lock() = checksum;
+        *self.checksum_epoch.lock() = message_epoch;
+        *self.checksum_id.lock() = csum_id;
+
+        // Send the checksum verification response
+        tracing::debug!("DFSM: sending verify response");
+
+        let sync_epoch = *self.sync_epoch.read();
+        let dfsm_msg = DfsmMessage::Verify {
+            sync_epoch,
+            csum_id,
+            checksum,
+        };
+        self.send_dfsm_message(&dfsm_msg)?;
+
+        Ok(())
+    }
+
+    /// Handle verify response from a node
+    pub fn handle_verify(
+        &self,
+        message_epoch: SyncEpoch,
+        csum_id: u64,
+        received_checksum: &[u8; 32],
+    ) -> Result<()> {
+        tracing::debug!("DFSM: received verify response");
+
+        let saved_id = *self.checksum_id.lock();
+        let saved_epoch = *self.checksum_epoch.lock();
+
+        // Only act if this response matches the checksum we saved earlier.
+        // Compare against the MESSAGE epoch, not the current epoch
+        // (matching C: dfsm.c:766-767).
+        if saved_id != csum_id || saved_epoch != message_epoch {
+            tracing::debug!("DFSM: skipping verification - no checksum saved or epoch mismatch");
+            return Ok(());
+        }
+
+        let local_checksum = *self.checksum.lock();
+        if local_checksum == *received_checksum {
+            tracing::info!("DFSM: data verification successful");
+            return Ok(());
+        }
+
+        // Divergence: log both prefixes, leave the cluster and resync.
+        tracing::error!(
+            "DFSM: checksum mismatch! Expected {:016x?}, got {:016x?}",
+            &local_checksum[..8],
+            &received_checksum[..8]
+        );
+        tracing::error!("DFSM: data divergence detected - restarting cluster sync");
+        self.set_mode(DfsmMode::Leave);
+        Err(anyhow::anyhow!("Checksum verification failed"))
+    }
+
+    /// Invalidate saved checksum (called on membership changes)
+    pub fn invalidate_checksum(&self) {
+        // Fast-forward the id to the counter so any in-flight verification
+        // round is abandoned.
+        let current = *self.checksum_counter.lock();
+        *self.checksum_id.lock() = current;
+
+        // Reset the epoch the saved checksum was computed under.
+        let zero_epoch = SyncEpoch {
+            epoch: 0,
+            time: 0,
+            nodeid: 0,
+            pid: 0,
+        };
+        *self.checksum_epoch.lock() = zero_epoch;
+
+        tracing::debug!("DFSM: checksum invalidated");
+    }
+}
+
+/// FuseMessage-specific methods
+impl Dfsm<FuseMessage> {
+    /// Broadcast a filesystem operation to the cluster
+    ///
+    /// Messages are only sent while the cluster is fully synced; otherwise
+    /// the operation is silently dropped.
+    pub fn broadcast(&self, msg: FuseMessage) -> Result<()> {
+        if self.is_synced() {
+            tracing::debug!("Broadcasting {:?}", msg);
+            self.send_message(msg)?;
+            tracing::debug!("Broadcast successful");
+        }
+        Ok(())
+    }
+}
+
+impl<M: MessageTrait + Clone> Dfsm<M> {
+    /// Handle incoming DFSM message from cluster (called by CpgHandler)
+    ///
+    /// Validates the sync epoch for all state-carrying variants, then
+    /// dispatches to the matching per-variant handler.
+    fn handle_dfsm_message(
+        &self,
+        nodeid: u32,
+        pid: u32,
+        message: DfsmMessage<M>,
+    ) -> anyhow::Result<()> {
+        // Validate epoch for state messages (all except Normal and SyncStart)
+        // This matches C implementation's epoch checking in dfsm.c:665-673
+        let should_validate_epoch = !matches!(
+            message,
+            DfsmMessage::Normal { .. } | DfsmMessage::SyncStart { .. }
+        );
+
+        if should_validate_epoch {
+            let current_epoch = *self.sync_epoch.read();
+            let message_epoch = match &message {
+                DfsmMessage::State { sync_epoch, .. }
+                | DfsmMessage::Update { sync_epoch, .. }
+                | DfsmMessage::UpdateComplete { sync_epoch }
+                | DfsmMessage::VerifyRequest { sync_epoch, .. }
+                | DfsmMessage::Verify { sync_epoch, .. } => *sync_epoch,
+                // Normal and SyncStart were excluded above
+                _ => unreachable!(),
+            };
+
+            if message_epoch != current_epoch {
+                // Stale message from an earlier membership round - drop it
+                tracing::debug!(
+                    "DFSM: ignoring message with wrong epoch (expected {:?}, got {:?})",
+                    current_epoch,
+                    message_epoch
+                );
+                return Ok(());
+            }
+        }
+
+        // Match on typed message variants
+        match message {
+            DfsmMessage::Normal {
+                msg_count,
+                timestamp,
+                protocol_version: _,
+                message: app_msg,
+            } => self.handle_normal_message(nodeid, pid, msg_count, timestamp, app_msg),
+            DfsmMessage::SyncStart { sync_epoch } => self.handle_sync_start(nodeid, sync_epoch),
+            DfsmMessage::State {
+                sync_epoch: _,
+                data,
+            } => self.process_state(nodeid, pid, &data),
+            DfsmMessage::Update {
+                sync_epoch: _,
+                tree_entry,
+            } => self.handle_update(nodeid, pid, tree_entry),
+            DfsmMessage::UpdateComplete { sync_epoch: _ } => self.handle_update_complete(),
+            DfsmMessage::VerifyRequest {
+                sync_epoch,
+                csum_id,
+            } => self.handle_verify_request(sync_epoch, csum_id),
+            DfsmMessage::Verify {
+                sync_epoch,
+                csum_id,
+                checksum,
+            } => self.handle_verify(sync_epoch, csum_id, &checksum),
+        }
+    }
+
+    /// Handle membership change notification (called by CpgHandler)
+    ///
+    /// Bumps the local sync epoch, rebuilds the sync-node table, records the
+    /// leader (lowest node id) and either marks a single-node cluster as
+    /// synced or starts the multi-node sync protocol.
+    fn handle_membership_change(&self, members: &[MemberInfo]) -> anyhow::Result<()> {
+        tracing::info!(
+            "DFSM: handling membership change ({} members)",
+            members.len()
+        );
+
+        // Invalidate saved checksum
+        self.invalidate_checksum();
+
+        // Update epoch
+        let mut counter = self.local_epoch_counter.lock();
+        *counter += 1;
+
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs() as u32;
+
+        let new_epoch = SyncEpoch {
+            epoch: *counter,
+            time: now,
+            nodeid: self.nodeid.load(Ordering::Relaxed),
+            pid: self.pid,
+        };
+
+        *self.sync_epoch.write() = new_epoch;
+        drop(counter);
+
+        // Find lowest node ID (leader)
+        let lowest = members.iter().map(|m| m.node_id).min().unwrap_or(0);
+        *self.lowest_nodeid.write() = lowest;
+
+        // Initialize sync nodes
+        let mut sync_nodes = self.sync_nodes.write();
+        sync_nodes.clear();
+
+        for member in members {
+            sync_nodes.push(NodeSyncInfo {
+                nodeid: member.node_id,
+                pid: member.pid,
+                state: None,
+                synced: false,
+            });
+        }
+        drop(sync_nodes);
+
+        // Clear queues
+        self.sync_queue.lock().clear();
+
+        // Determine next mode
+        if members.len() == 1 {
+            // Single node - already synced
+            tracing::info!("DFSM: single node cluster, marking as synced");
+            self.set_mode(DfsmMode::Synced);
+
+            // Mark ourselves as synced. Scope the write guard so the lock is
+            // released BEFORE deliver_message_queue() runs; previously the
+            // guard stayed alive across that call, which would deadlock if
+            // delivery ever needed to read sync_nodes.
+            {
+                let mut sync_nodes = self.sync_nodes.write();
+                if let Some(node) = sync_nodes.first_mut() {
+                    node.synced = true;
+                }
+            }
+
+            // Deliver queued messages
+            self.deliver_message_queue()?;
+        } else {
+            // Multi-node - start synchronization
+            tracing::info!("DFSM: multi-node cluster, starting sync");
+            self.set_mode(DfsmMode::StartSync);
+
+            // If we're the leader, initiate sync
+            if self.is_leader() {
+                tracing::info!("DFSM: we are leader, sending sync start");
+                self.send_sync_start()?;
+
+                // Leader also needs to send its own state
+                // (CPG doesn't loop back messages to sender)
+                self.send_state().context("Failed to send leader state")?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Handle normal application message
+    ///
+    /// Delivered immediately when synced; queued for later replay otherwise.
+    fn handle_normal_message(
+        &self,
+        nodeid: u32,
+        pid: u32,
+        msg_count: u64,
+        timestamp: u32,
+        message: M,
+    ) -> Result<()> {
+        // C version: deliver immediately if in Synced mode, otherwise queue
+        if self.get_mode() == DfsmMode::Synced {
+            // Deliver immediately - message is already deserialized
+            match self.callbacks.deliver_message(
+                nodeid,
+                pid,
+                message,
+                timestamp as u64, // Convert back to u64 for callback compatibility
+            ) {
+                Ok((result, processed)) => {
+                    tracing::debug!(
+                        "DFSM: message delivered immediately, result={}, processed={}",
+                        result,
+                        processed
+                    );
+                }
+                Err(e) => {
+                    tracing::error!("DFSM: failed to deliver message: {}", e);
+                }
+            }
+        } else {
+            // Queue for later delivery - store typed message directly
+            self.queue_message(nodeid, pid, msg_count, message, timestamp as u64);
+        }
+        Ok(())
+    }
+
+    /// Handle SyncStart message from leader
+    fn handle_sync_start(&self, nodeid: u32, new_epoch: SyncEpoch) -> Result<()> {
+        tracing::info!(
+            "DFSM: received SyncStart from node {} with epoch {:?}",
+            nodeid,
+            new_epoch
+        );
+
+        // Adopt the new epoch from the leader (critical for sync protocol!)
+        // This matches C implementation which updates dfsm->sync_epoch
+        *self.sync_epoch.write() = new_epoch;
+        tracing::debug!("DFSM: adopted new sync epoch from leader");
+
+        // Send our state back to the cluster
+        // BUT: don't send if we're the leader (we already sent our state in handle_membership_change)
+        let my_nodeid = self.nodeid.load(Ordering::Relaxed);
+        if nodeid != my_nodeid {
+            self.send_state()
+                .context("Failed to send state in response to SyncStart")?;
+            tracing::debug!("DFSM: sent state in response to SyncStart");
+        } else {
+            tracing::debug!("DFSM: skipping state send (we're the leader who already sent state)");
+        }
+
+        Ok(())
+    }
+
+    /// Handle Update message from leader
+    fn handle_update(
+        &self,
+        nodeid: u32,
+        pid: u32,
+        tree_entry: pmxcfs_memdb::TreeEntry,
+    ) -> Result<()> {
+        // Serialize TreeEntry for callback (process_update expects raw bytes for now)
+        let serialized = tree_entry.serialize_for_update();
+        if let Err(e) = self.callbacks.process_update(nodeid, pid, &serialized) {
+            tracing::error!("DFSM: failed to process update: {}", e);
+        }
+        Ok(())
+    }
+
+    /// Handle UpdateComplete message
+    ///
+    /// Replays the sync queue, enters Synced mode and notifies the
+    /// application layer via on_synced().
+    fn handle_update_complete(&self) -> Result<()> {
+        tracing::info!("DFSM: received UpdateComplete from leader");
+        self.deliver_sync_queue()?;
+        self.set_mode(DfsmMode::Synced);
+        self.callbacks.on_synced();
+        Ok(())
+    }
+}
+
+/// Implementation of CpgHandler trait for DFSM
+///
+/// This allows Dfsm to receive CPG callbacks in an idiomatic Rust way,
+/// with all unsafe pointer handling managed by the CpgService.
+impl<M: MessageTrait + Clone + Send + Sync + 'static> CpgHandler for Dfsm<M> {
+    fn on_deliver(&self, _group_name: &str, nodeid: NodeId, pid: u32, msg: &[u8]) {
+        let node = u32::from(nodeid);
+        tracing::debug!(
+            "DFSM CPG message from node {} (pid {}): {} bytes",
+            node,
+            pid,
+            msg.len()
+        );
+
+        // Deserialize DFSM protocol message, then hand it to the state machine
+        match DfsmMessage::<M>::deserialize(msg) {
+            Ok(dfsm_msg) => {
+                if let Err(e) = self.handle_dfsm_message(node, pid, dfsm_msg) {
+                    tracing::error!("Error handling DFSM message: {}", e);
+                }
+            }
+            Err(e) => {
+                tracing::error!("Failed to deserialize DFSM message: {}", e);
+            }
+        }
+    }
+
+    fn on_confchg(
+        &self,
+        _group_name: &str,
+        member_list: &[cpg::Address],
+        _left_list: &[cpg::Address],
+        _joined_list: &[cpg::Address],
+    ) {
+        tracing::info!("DFSM CPG membership change: {} members", member_list.len());
+
+        // Read the clock once instead of per member (loop-invariant; also
+        // gives every member the same joined_at for this notification).
+        let joined_at = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs();
+
+        // Build MemberInfo list from CPG addresses
+        let members: Vec<MemberInfo> = member_list
+            .iter()
+            .map(|addr| MemberInfo {
+                node_id: u32::from(addr.nodeid),
+                pid: addr.pid,
+                joined_at,
+            })
+            .collect();
+
+        // Notify DFSM of membership change
+        if let Err(e) = self.handle_membership_change(&members) {
+            tracing::error!("Failed to handle membership change: {}", e);
+        }
+    }
+}
+
+impl<M: MessageTrait + Clone + Send + Sync + 'static> Dfsm<M> {
+    /// Initialize CPG (Closed Process Group) for cluster communication
+    ///
+    /// Uses the idiomatic CpgService wrapper which handles all unsafe FFI
+    /// and callback management internally.
+    pub fn init_cpg(self: &Arc<Self>) -> Result<()> {
+        tracing::info!("DFSM: Initializing CPG");
+
+        // Create CPG service with this Dfsm as the handler.
+        // CpgService handles all callback registration and context management.
+        let cpg_service = Arc::new(CpgService::new(Arc::clone(self))?);
+
+        // Get our node ID from CPG (matches C's cpg_local_get).
+        // This MUST be done after cpg_initialize but before joining the group.
+        let local_id = u32::from(cpg::local_get(cpg_service.handle())?);
+        self.nodeid.store(local_id, Ordering::Relaxed);
+        tracing::info!("DFSM: Got node ID {} from CPG", local_id);
+
+        // Join the CPG group named after the cluster.
+        cpg_service
+            .join(&self.cluster_name)
+            .context("Failed to join CPG group")?;
+        tracing::info!("DFSM joined CPG group '{}'", &self.cluster_name);
+
+        // Store the service so dispatch/fd_get/stop can reach it.
+        *self.cpg_service.write() = Some(cpg_service);
+
+        // Dispatch once to get initial membership.
+        if let Some(ref service) = *self.cpg_service.read() {
+            if let Err(e) = service.dispatch() {
+                tracing::warn!("Failed to dispatch CPG events: {:?}", e);
+            }
+        }
+
+        tracing::info!("DFSM CPG initialized successfully");
+        Ok(())
+    }
+
+    /// Dispatch CPG events (should be called periodically from event loop)
+    /// Matching C's service_dfsm_dispatch
+    pub fn dispatch_events(&self) -> Result<(), rust_corosync::CsError> {
+        match *self.cpg_service.read() {
+            Some(ref service) => service.dispatch(),
+            None => Ok(()),
+        }
+    }
+
+    /// Get CPG file descriptor for event monitoring
+    pub fn fd_get(&self) -> Result<i32> {
+        match *self.cpg_service.read() {
+            Some(ref service) => service.fd(),
+            None => Err(anyhow::anyhow!("CPG service not initialized")),
+        }
+    }
+
+    /// Stop DFSM services (leave CPG group and finalize)
+    pub fn stop_services(&self) -> Result<()> {
+        tracing::info!("DFSM: Stopping services");
+
+        // Leave the CPG group before dropping the service.
+        if let Some(ref service) = *self.cpg_service.read() {
+            if let Err(e) = service.leave(&self.cluster_name) {
+                tracing::warn!("Error leaving CPG group: {:?}", e);
+            }
+        }
+
+        // Drop the service (CpgService::drop handles finalization).
+        *self.cpg_service.write() = None;
+
+        tracing::info!("DFSM services stopped");
+        Ok(())
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
new file mode 100644
index 00000000..877058a4
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/status_sync_service.rs
@@ -0,0 +1,118 @@
+//! Status Sync Service
+//!
+//! This service synchronizes ephemeral status data across the cluster using a separate
+//! DFSM instance with the "pve_kvstore_v1" CPG group.
+//!
+//! Equivalent to C implementation's service_status (the kvstore DFSM).
+//! Handles synchronization of:
+//! - RRD data (performance metrics from each node)
+//! - Node IP addresses
+//! - Cluster log entries
+//! - Other ephemeral status key-value data
+
+use async_trait::async_trait;
+use pmxcfs_services::{DispatchAction, InitResult, Service, ServiceError};
+use rust_corosync::CsError;
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{error, info, warn};
+
+use crate::Dfsm;
+use crate::message::Message as MessageTrait;
+
+/// Status Sync Service
+///
+/// Synchronizes ephemeral status data across all nodes using a separate DFSM instance.
+/// Uses CPG group "pve_kvstore_v1" (separate from main config database "pmxcfs_v1").
+///
+/// This implements the Service trait to provide:
+/// - Automatic retry if CPG initialization fails
+/// - Event-driven CPG dispatching for status replication
+/// - Separation of status data from config data for better performance
+///
+/// This is equivalent to C implementation's service_status (the kvstore DFSM).
+///
+/// The generic parameter `M` specifies the message type this service handles.
+pub struct StatusSyncService<M> {
+    // DFSM instance handling the kvstore CPG group for this service.
+    dfsm: Arc<Dfsm<M>>,
+    // CPG file descriptor obtained in initialize(); None until then.
+    fd: Option<i32>,
+}
+
+impl<M: MessageTrait + Clone + Send + Sync + 'static> StatusSyncService<M> {
+    /// Create a new status sync service wrapping the given kvstore DFSM.
+    pub fn new(dfsm: Arc<Dfsm<M>>) -> Self {
+        StatusSyncService { dfsm, fd: None }
+    }
+}
+
+#[async_trait]
+impl<M: MessageTrait + Clone + Send + Sync + 'static> Service for StatusSyncService<M> {
+    fn name(&self) -> &str {
+        "status-sync"
+    }
+
+    async fn initialize(&mut self) -> pmxcfs_services::Result<InitResult> {
+        info!("Initializing status sync service (kvstore)");
+
+        // Bring up the CPG connection for the kvstore group.
+        if let Err(e) = self.dfsm.init_cpg() {
+            return Err(ServiceError::InitializationFailed(format!(
+                "Status sync CPG initialization failed: {e}"
+            )));
+        }
+
+        // Fetch the fd used for event monitoring; tear CPG down on failure.
+        let fd = match self.dfsm.fd_get() {
+            Ok(fd) => fd,
+            Err(e) => {
+                self.dfsm.stop_services().ok();
+                return Err(ServiceError::InitializationFailed(format!(
+                    "Failed to get status sync fd: {e}"
+                )));
+            }
+        };
+
+        self.fd = Some(fd);
+
+        info!("Status sync service initialized successfully with fd {}", fd);
+        Ok(InitResult::WithFileDescriptor(fd))
+    }
+
+    async fn dispatch(&mut self) -> pmxcfs_services::Result<DispatchAction> {
+        match self.dfsm.dispatch_events() {
+            Ok(_) => Ok(DispatchAction::Continue),
+            // Connection-level errors are recoverable via reinitialization.
+            Err(CsError::CsErrLibrary | CsError::CsErrBadHandle) => {
+                warn!("Status sync connection lost, requesting reinitialization");
+                Ok(DispatchAction::Reinitialize)
+            }
+            Err(e) => {
+                error!("Status sync dispatch failed: {}", e);
+                Err(ServiceError::DispatchFailed(format!(
+                    "Status sync dispatch failed: {e}"
+                )))
+            }
+        }
+    }
+
+    async fn finalize(&mut self) -> pmxcfs_services::Result<()> {
+        info!("Finalizing status sync service");
+
+        self.fd = None;
+
+        if let Err(e) = self.dfsm.stop_services() {
+            warn!("Error stopping status sync services: {}", e);
+        }
+
+        info!("Status sync service finalized");
+        Ok(())
+    }
+
+    async fn timer_callback(&mut self) -> pmxcfs_services::Result<()> {
+        // Status data is ephemeral and does not need periodic verification
+        // like the main config database DFSM does.
+        Ok(())
+    }
+
+    fn timer_period(&self) -> Option<Duration> {
+        // No periodic timer needed for status sync
+        None
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
new file mode 100644
index 00000000..5a2eb964
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/types.rs
@@ -0,0 +1,107 @@
+//! DFSM type definitions
+//!
+//! This module contains all type definitions used by the DFSM state machine.
+
+/// DFSM operating modes
+///
+/// Discriminant values are significant: anything >= 128 is treated as an
+/// error state (see `is_error`).
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub enum DfsmMode {
+    /// Initial state - starting cluster connection
+    Start = 0,
+
+    /// Starting data synchronization
+    StartSync = 1,
+
+    /// All data is up to date
+    Synced = 2,
+
+    /// Waiting for updates from leader
+    Update = 3,
+
+    /// Error states (>= 128)
+    Leave = 253,
+    VersionError = 254,
+    Error = 255,
+}
+
+impl DfsmMode {
+    /// Check if this is an error mode
+    ///
+    /// Error modes are the variants with discriminants >= 128
+    /// (Leave, VersionError, Error).
+    pub fn is_error(&self) -> bool {
+        matches!(
+            self,
+            DfsmMode::Leave | DfsmMode::VersionError | DfsmMode::Error
+        )
+    }
+}
+
+impl std::fmt::Display for DfsmMode {
+    /// Human-readable description of the mode (used in log output).
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let description = match self {
+            DfsmMode::Start => "start cluster connection",
+            DfsmMode::StartSync => "starting data synchronization",
+            DfsmMode::Synced => "all data is up to date",
+            DfsmMode::Update => "waiting for updates from leader",
+            DfsmMode::Leave => "leaving cluster",
+            DfsmMode::VersionError => "protocol version mismatch",
+            DfsmMode::Error => "serious internal error",
+        };
+        f.write_str(description)
+    }
+}
+
+/// DFSM message types (internal protocol messages)
+/// Matches C's dfsm_message_t enum values
+///
+/// Values are wire-level u16 discriminants shared with the C
+/// implementation; do not renumber.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)]
+#[repr(u16)]
+pub enum DfsmMessageType {
+    /// Application payload message
+    Normal = 0,
+    /// Leader announces the start of a sync round
+    SyncStart = 1,
+    /// A node reports its local state to the cluster
+    State = 2,
+    /// Leader sends a tree-entry update to followers
+    Update = 3,
+    /// Leader signals that the update stream is finished
+    UpdateComplete = 4,
+    /// Leader asks nodes for a state checksum
+    VerifyRequest = 5,
+    /// A node answers a VerifyRequest with its checksum
+    Verify = 6,
+}
+
+/// Sync epoch - identifies a synchronization session
+/// Matches C's dfsm_sync_epoch_t structure (16 bytes total)
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct SyncEpoch {
+    /// Local epoch counter (incremented on each membership change)
+    pub epoch: u32,
+    /// UNIX time (seconds) when the epoch was created
+    pub time: u32,
+    /// Corosync node id of the node that created the epoch
+    pub nodeid: u32,
+    /// Process id of the daemon that created the epoch
+    pub pid: u32,
+}
+
+impl SyncEpoch {
+    /// Serialize to C-compatible wire format (16 bytes)
+    /// Format: [epoch: u32][time: u32][nodeid: u32][pid: u32]
+    pub fn serialize(&self) -> [u8; 16] {
+        let mut out = [0u8; 16];
+        // Write the four fields as consecutive little-endian u32 values.
+        for (slot, value) in out
+            .chunks_exact_mut(4)
+            .zip([self.epoch, self.time, self.nodeid, self.pid])
+        {
+            slot.copy_from_slice(&value.to_le_bytes());
+        }
+        out
+    }
+
+    /// Deserialize from C-compatible wire format (16 bytes)
+    pub fn deserialize(bytes: &[u8]) -> Result<Self, &'static str> {
+        if bytes.len() < 16 {
+            return Err("SyncEpoch requires 16 bytes");
+        }
+        // Helper reading a little-endian u32 at byte offset `i`.
+        let field = |i: usize| u32::from_le_bytes(bytes[i..i + 4].try_into().unwrap());
+        Ok(SyncEpoch {
+            epoch: field(0),
+            time: field(4),
+            nodeid: field(8),
+            pid: field(12),
+        })
+    }
+}
+
+/// Queued message awaiting delivery
+///
+/// Normal messages received while the node is not yet in Synced mode are
+/// stored here and delivered once synchronization completes.
+#[derive(Debug, Clone)]
+pub(super) struct QueuedMessage<M> {
+    /// Corosync node id of the sender
+    pub nodeid: u32,
+    /// Process id of the sending daemon
+    pub pid: u32,
+    /// Sender-side message counter (currently unused, kept for debugging)
+    pub _msg_count: u64,
+    /// The typed application message
+    pub message: M,
+    /// Timestamp carried with the message (seconds)
+    pub timestamp: u64,
+}
+
+// Re-export NodeSyncInfo from pmxcfs-api-types for use in Callbacks trait
+pub use pmxcfs_api_types::NodeSyncInfo;
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
new file mode 100644
index 00000000..2750b281
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/src/wire_format.rs
@@ -0,0 +1,220 @@
+//! C-compatible wire format for cluster communication
+//!
+//! This module implements the exact wire protocol used by the C version of pmxcfs
+//! to ensure compatibility with C-based cluster nodes.
+//!
+//! The C version uses a simple format with iovec arrays containing raw C types.
+use anyhow::{Context, Result};
+use bytemuck::{Pod, Zeroable};
+use std::ffi::CStr;
+
+/// C message types (must match dcdb.h)
+///
+/// Values are wire-level u16 discriminants shared with the C
+/// implementation; do not renumber.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)]
+#[repr(u16)]
+pub enum CMessageType {
+    Write = 1,
+    Mkdir = 2,
+    Delete = 3,
+    Rename = 4,
+    Create = 5,
+    Mtime = 6,
+    UnlockRequest = 7,
+    Unlock = 8,
+}
+
+/// C-compatible FUSE message header
+/// Layout matches the iovec array from C: [size][offset][pathlen][tolen][flags]
+///
+/// On the wire this is five consecutive u32 values (20 bytes);
+/// `CFuseMessage::parse` reads them as little-endian.
+#[derive(Debug, Clone, Copy, Pod, Zeroable)]
+#[repr(C)]
+struct CFuseMessageHeader {
+    size: u32,
+    offset: u32,
+    pathlen: u32,
+    tolen: u32,
+    flags: u32,
+}
+
+/// Parsed C FUSE message
+#[derive(Debug, Clone)]
+pub struct CFuseMessage {
+    /// Length of the payload in bytes (the C `size` field); after parse()
+    /// this equals `data.len()`
+    pub size: u32,
+    /// Byte offset field from the C header
+    pub offset: u32,
+    /// Operation flags (semantics depend on the message type)
+    pub flags: u32,
+    /// Primary path the operation applies to (empty if absent on the wire)
+    pub path: String,
+    /// Destination path for rename operations, if present
+    pub to: Option<String>,
+    /// Raw payload bytes
+    pub data: Vec<u8>,
+}
+
+impl CFuseMessage {
+    /// Parse a C FUSE message from raw bytes
+    ///
+    /// Wire layout: 20-byte header (five little-endian u32s:
+    /// size/offset/pathlen/tolen/flags), then the NUL-terminated path
+    /// (`pathlen` bytes), then the optional NUL-terminated rename target
+    /// (`tolen` bytes), then `size` bytes of payload.
+    ///
+    /// Returns an error if the buffer is truncated, a string is not
+    /// NUL-terminated, or a path is not valid UTF-8.
+    pub fn parse(data: &[u8]) -> Result<Self> {
+        const HEADER_LEN: usize = std::mem::size_of::<CFuseMessageHeader>();
+        if data.len() < HEADER_LEN {
+            return Err(anyhow::anyhow!(
+                "Message too short: {} < {}",
+                data.len(),
+                HEADER_LEN
+            ));
+        }
+
+        // Read the header field-by-field: avoids alignment issues and pins
+        // the wire format to little-endian regardless of host endianness.
+        let read_u32 =
+            |i: usize| u32::from_le_bytes([data[i], data[i + 1], data[i + 2], data[i + 3]]);
+        let header = CFuseMessageHeader {
+            size: read_u32(0),
+            offset: read_u32(4),
+            pathlen: read_u32(8),
+            tolen: read_u32(12),
+            flags: read_u32(16),
+        };
+
+        let mut offset = HEADER_LEN;
+
+        // Parse path (NUL-terminated C string)
+        let path = if header.pathlen > 0 {
+            if offset + header.pathlen as usize > data.len() {
+                return Err(anyhow::anyhow!("Invalid path length"));
+            }
+            let path_bytes = &data[offset..offset + header.pathlen as usize];
+            offset += header.pathlen as usize;
+
+            // C strings are null-terminated
+            CStr::from_bytes_until_nul(path_bytes)
+                .context("Invalid path string")?
+                .to_str()
+                .context("Path not valid UTF-8")?
+                .to_string()
+        } else {
+            String::new()
+        };
+
+        // Parse 'to' (for rename operations)
+        let to = if header.tolen > 0 {
+            if offset + header.tolen as usize > data.len() {
+                return Err(anyhow::anyhow!("Invalid tolen"));
+            }
+            let to_bytes = &data[offset..offset + header.tolen as usize];
+            offset += header.tolen as usize;
+
+            Some(
+                CStr::from_bytes_until_nul(to_bytes)
+                    .context("Invalid to string")?
+                    .to_str()
+                    .context("To path not valid UTF-8")?
+                    .to_string(),
+            )
+        } else {
+            None
+        };
+
+        // Parse data buffer (raw payload, length given by header.size)
+        let buf_data = if header.size > 0 {
+            if offset + header.size as usize > data.len() {
+                return Err(anyhow::anyhow!("Invalid data size"));
+            }
+            data[offset..offset + header.size as usize].to_vec()
+        } else {
+            Vec::new()
+        };
+
+        Ok(CFuseMessage {
+            size: header.size,
+            offset: header.offset,
+            flags: header.flags,
+            path,
+            to,
+            data: buf_data,
+        })
+    }
+
+    /// Serialize to C wire format
+    ///
+    /// The header is written field-by-field as little-endian u32s so the
+    /// output matches what `parse()` expects on any host. (Previously the
+    /// header was emitted via `bytemuck::bytes_of`, which uses native
+    /// endianness and would produce a different byte order on big-endian
+    /// targets than `parse()` reads.)
+    pub fn serialize(&self) -> Vec<u8> {
+        let path_bytes = self.path.as_bytes();
+        // Wire lengths include the NUL terminator; 0 means "absent".
+        let pathlen = if path_bytes.is_empty() {
+            0
+        } else {
+            (path_bytes.len() + 1) as u32 // +1 for null terminator
+        };
+
+        let to_bytes = self.to.as_ref().map(|s| s.as_bytes()).unwrap_or(&[]);
+        let tolen = if to_bytes.is_empty() {
+            0
+        } else {
+            (to_bytes.len() + 1) as u32
+        };
+
+        // Preallocate: header + both strings + payload.
+        let mut result =
+            Vec::with_capacity(20 + pathlen as usize + tolen as usize + self.data.len());
+
+        // Serialize header as five little-endian u32s
+        for value in [self.size, self.offset, pathlen, tolen, self.flags] {
+            result.extend_from_slice(&value.to_le_bytes());
+        }
+
+        // Serialize path (with null terminator)
+        if pathlen > 0 {
+            result.extend_from_slice(path_bytes);
+            result.push(0); // null terminator
+        }
+
+        // Serialize 'to' (with null terminator)
+        if tolen > 0 {
+            result.extend_from_slice(to_bytes);
+            result.push(0); // null terminator
+        }
+
+        // Serialize payload; `size` should equal `data.len()` for a
+        // self-consistent message
+        if self.size > 0 {
+            result.extend_from_slice(&self.data);
+        }
+
+        result
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Round-trip a write-style message and verify every field survives.
+    #[test]
+    fn test_serialize_deserialize_write() {
+        let original = CFuseMessage {
+            size: 13,
+            offset: 0,
+            flags: 0,
+            path: "/test.txt".to_string(),
+            to: None,
+            data: b"Hello, World!".to_vec(),
+        };
+
+        let wire = original.serialize();
+        let roundtrip = CFuseMessage::parse(&wire).unwrap();
+
+        assert_eq!(roundtrip.size, original.size);
+        assert_eq!(roundtrip.offset, original.offset);
+        assert_eq!(roundtrip.flags, original.flags);
+        assert_eq!(roundtrip.path, original.path);
+        assert_eq!(roundtrip.to, original.to);
+        assert_eq!(roundtrip.data, original.data);
+    }
+
+    /// Round-trip a rename-style message (uses 'to', carries no payload).
+    #[test]
+    fn test_serialize_deserialize_rename() {
+        let original = CFuseMessage {
+            size: 0,
+            offset: 0,
+            flags: 0,
+            path: "/old.txt".to_string(),
+            to: Some("/new.txt".to_string()),
+            data: Vec::new(),
+        };
+
+        let wire = original.serialize();
+        let roundtrip = CFuseMessage::parse(&wire).unwrap();
+
+        assert_eq!(roundtrip.path, original.path);
+        assert_eq!(roundtrip.to, original.to);
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
new file mode 100644
index 00000000..d378f914
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-dfsm/tests/multi_node_sync_tests.rs
@@ -0,0 +1,565 @@
+//! Multi-node integration tests for DFSM cluster synchronization
+//!
+//! These tests simulate multi-node clusters to verify the complete synchronization
+//! protocol works correctly with multiple Rust nodes exchanging state.
+
+use anyhow::Result;
+use pmxcfs_dfsm::{Callbacks, FuseMessage, NodeSyncInfo};
+use pmxcfs_memdb::{MemDb, MemDbIndex, ROOT_INODE, TreeEntry};
+use std::sync::{Arc, Mutex};
+use tempfile::TempDir;
+
+/// Mock callbacks for testing DFSM without full pmxcfs integration
+struct MockCallbacks {
+    /// Backing in-memory database; state/checksum reads come from here and
+    /// incoming updates are applied to it.
+    memdb: MemDb,
+    /// Node states captured by the most recent process_state_update() call.
+    states_received: Arc<Mutex<Vec<NodeSyncInfo>>>,
+    /// Every TreeEntry received via process_update(), in arrival order.
+    updates_received: Arc<Mutex<Vec<TreeEntry>>>,
+    /// Number of times on_synced() has been invoked.
+    synced_count: Arc<Mutex<usize>>,
+}
+
+impl MockCallbacks {
+    /// Wrap an existing database handle in a fresh set of mock callbacks
+    /// with empty bookkeeping state.
+    fn new(memdb: MemDb) -> Self {
+        let states_received = Arc::new(Mutex::new(Vec::new()));
+        let updates_received = Arc::new(Mutex::new(Vec::new()));
+        let synced_count = Arc::new(Mutex::new(0));
+        Self {
+            memdb,
+            states_received,
+            updates_received,
+            synced_count,
+        }
+    }
+
+    /// Snapshot of the node states recorded by the last state update.
+    #[allow(dead_code)]
+    fn get_states(&self) -> Vec<NodeSyncInfo> {
+        let guard = self.states_received.lock().unwrap();
+        guard.clone()
+    }
+
+    /// Snapshot of all tree entries applied so far.
+    #[allow(dead_code)]
+    fn get_updates(&self) -> Vec<TreeEntry> {
+        let guard = self.updates_received.lock().unwrap();
+        guard.clone()
+    }
+
+    /// How many times this node has completed a sync.
+    #[allow(dead_code)]
+    fn get_synced_count(&self) -> usize {
+        let guard = self.synced_count.lock().unwrap();
+        *guard
+    }
+}
+
+impl Callbacks<FuseMessage> for MockCallbacks {
+    /// Pretend every delivered message was applied successfully.
+    /// Returns (result = 0, processed = true); the mock never touches FUSE state.
+    fn deliver_message(
+        &self,
+        _nodeid: u32,
+        _pid: u32,
+        _message: FuseMessage,
+        _timestamp: u64,
+    ) -> Result<(i32, bool)> {
+        Ok((0, true))
+    }
+
+    /// Copy the 32-byte database-wide checksum into `output`.
+    fn compute_checksum(&self, output: &mut [u8; 32]) -> Result<()> {
+        let checksum = self.memdb.compute_database_checksum()?;
+        output.copy_from_slice(&checksum);
+        Ok(())
+    }
+
+    /// State blob exchanged at sync start: this node's serialized database index.
+    fn get_state(&self) -> Result<Vec<u8>> {
+        let index = self.memdb.encode_index()?;
+        Ok(index.serialize())
+    }
+
+    /// Elect a leader from the exchanged states and report whether this
+    /// node's own index already matches the leader's (i.e. we are synced).
+    fn process_state_update(&self, states: &[NodeSyncInfo]) -> Result<bool> {
+        // Store received states for verification
+        *self.states_received.lock().unwrap() = states.to_vec();
+
+        // Parse indices from states; nodes with absent or unparseable state
+        // are skipped rather than treated as an error.
+        let mut indices: Vec<(u32, u32, MemDbIndex)> = Vec::new();
+        for node in states {
+            if let Some(state_data) = &node.state {
+                match MemDbIndex::deserialize(state_data) {
+                    Ok(index) => indices.push((node.nodeid, node.pid, index)),
+                    Err(_) => continue,
+                }
+            }
+        }
+
+        // Nothing to compare against: trivially consider ourselves synced.
+        if indices.is_empty() {
+            return Ok(true);
+        }
+
+        // Find leader (highest version, or if tie, highest mtime)
+        // — the `>` comparison delegates to MemDbIndex's ordering.
+        let mut leader_idx = 0;
+        for i in 1..indices.len() {
+            let (_, _, current_index) = &indices[i];
+            let (_, _, leader_index) = &indices[leader_idx];
+            if current_index > leader_index {
+                leader_idx = i;
+            }
+        }
+
+        let (_leader_nodeid, _leader_pid, leader_index) = &indices[leader_idx];
+
+        // Check if WE are synced with leader: version, mtime, size and the
+        // full (inode, digest) entry list must all match exactly.
+        let our_index = self.memdb.encode_index()?;
+        let we_are_synced = our_index.version == leader_index.version
+            && our_index.mtime == leader_index.mtime
+            && our_index.size == leader_index.size
+            && our_index.entries.len() == leader_index.entries.len()
+            && our_index
+                .entries
+                .iter()
+                .zip(leader_index.entries.iter())
+                .all(|(a, b)| a.inode == b.inode && a.digest == b.digest);
+
+        Ok(we_are_synced)
+    }
+
+    /// Record an incoming tree-entry update and apply it to the database.
+    fn process_update(&self, _nodeid: u32, _pid: u32, data: &[u8]) -> Result<()> {
+        // Deserialize and store update
+        let tree_entry = TreeEntry::deserialize_from_update(data)?;
+        self.updates_received
+            .lock()
+            .unwrap()
+            .push(tree_entry.clone());
+
+        // Apply to database
+        self.memdb.apply_tree_entry(tree_entry)?;
+        Ok(())
+    }
+
+    /// No-op: the mock applies updates directly, so there is nothing to commit.
+    fn commit_state(&self) -> Result<()> {
+        Ok(())
+    }
+
+    /// Count completed syncs so tests can assert how often sync finished.
+    fn on_synced(&self) {
+        *self.synced_count.lock().unwrap() += 1;
+    }
+}
+
+/// Build an isolated test node: an on-disk MemDb in its own temp directory
+/// plus mock callbacks sharing (a clone of) that database handle.
+///
+/// The returned TempDir must be kept alive by the caller for the lifetime of
+/// the node; dropping it deletes the backing database file.
+fn create_test_node(node_id: u32) -> Result<(MemDb, TempDir, Arc<MockCallbacks>)> {
+    let temp_dir = TempDir::new()?;
+    let db_path = temp_dir.path().join(format!("node{node_id}.db"));
+    let memdb = MemDb::open(&db_path, true)?;
+    // Note: Local operations always use writer=0 (matching C implementation)
+    // Remote DFSM updates use the writer field from the incoming TreeEntry
+
+    let callbacks = Arc::new(MockCallbacks::new(memdb.clone()));
+    Ok((memdb, temp_dir, callbacks))
+}
+
+/// Two nodes with identical (empty) databases must both report synced
+/// immediately after the state exchange.
+#[test]
+fn test_two_node_empty_sync() -> Result<()> {
+    let (_memdb1, _temp1, callbacks1) = create_test_node(1)?;
+    let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+    // Build the membership list as it would look after a CPG state exchange.
+    let make_info = |nodeid: u32, pid: u32, state: Vec<u8>| NodeSyncInfo {
+        nodeid,
+        pid,
+        state: Some(state),
+        synced: false,
+    };
+    let states = vec![
+        make_info(1, 1000, callbacks1.get_state()?),
+        make_info(2, 2000, callbacks2.get_state()?),
+    ];
+
+    // Empty databases are identical, so both sides consider themselves synced.
+    assert!(
+        callbacks1.process_state_update(&states)?,
+        "Node 1 should be synced"
+    );
+    assert!(
+        callbacks2.process_state_update(&states)?,
+        "Node 2 should be synced"
+    );
+
+    Ok(())
+}
+
+/// Leader election: the node whose index compares greater (more operations,
+/// hence higher version) must come out synced; the other must not.
+#[test]
+fn test_two_node_leader_election() -> Result<()> {
+    // Create two nodes
+    let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+    let (_memdb2, _temp2, callbacks2) = create_test_node(2)?;
+
+    // Node 1 has more data (higher version)
+    memdb1.create("/file1.txt", 0, 1000)?;
+    memdb1.write("/file1.txt", 0, 1001, b"data from node 1", 0)?;
+
+    // Generate states
+    let state1 = callbacks1.get_state()?;
+    let state2 = callbacks2.get_state()?;
+
+    // Parse to check versions
+    let index1 = MemDbIndex::deserialize(&state1)?;
+    let index2 = MemDbIndex::deserialize(&state2)?;
+
+    // Node 1 should have higher version
+    assert!(
+        index1.version > index2.version,
+        "Node 1 version {} should be > Node 2 version {}",
+        index1.version,
+        index2.version
+    );
+
+    // Simulate state exchange
+    let states = vec![
+        NodeSyncInfo {
+            nodeid: 1,
+            pid: 1000,
+            state: Some(state1),
+            synced: false,
+        },
+        NodeSyncInfo {
+            nodeid: 2,
+            pid: 2000,
+            state: Some(state2),
+            synced: false,
+        },
+    ];
+
+    // Process states
+    let synced1 = callbacks1.process_state_update(&states)?;
+    let synced2 = callbacks2.process_state_update(&states)?;
+
+    // Node 1 (leader) should be synced, Node 2 (follower) should not
+    assert!(synced1, "Node 1 (leader) should be synced");
+    assert!(!synced2, "Node 2 (follower) should not be synced");
+
+    Ok(())
+}
+
+/// Transfer each non-root entry from a leader to a follower via the update
+/// message path, then verify the follower's view matches.
+#[test]
+fn test_incremental_update_transfer() -> Result<()> {
+    // Create leader and follower
+    let (leader_db, _temp_leader, _) = create_test_node(1)?;
+    let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+    // Leader has data
+    leader_db.create("/config", libc::S_IFDIR, 1000)?;
+    leader_db.create("/config/node.conf", 0, 1001)?;
+    leader_db.write("/config/node.conf", 0, 1002, b"hostname=pve1", 0)?;
+
+    // Get entries from leader
+    let leader_entries = leader_db.get_all_entries()?;
+
+    // Simulate sending updates to follower
+    for entry in leader_entries {
+        if entry.inode == ROOT_INODE {
+            continue; // Skip root (both have it)
+        }
+
+        // Serialize as update message
+        let update_msg = entry.serialize_for_update();
+
+        // Follower receives and processes update
+        follower_callbacks.process_update(1, 1000, &update_msg)?;
+    }
+
+    // Verify follower has the data
+    let config_dir = follower_db.lookup_path("/config");
+    assert!(
+        config_dir.is_some(),
+        "Follower should have /config directory"
+    );
+    assert!(config_dir.unwrap().is_dir());
+
+    let config_file = follower_db.lookup_path("/config/node.conf");
+    assert!(
+        config_file.is_some(),
+        "Follower should have /config/node.conf"
+    );
+
+    let config_data = follower_db.read("/config/node.conf", 0, 1024)?;
+    assert_eq!(
+        config_data, b"hostname=pve1",
+        "Follower should have correct data"
+    );
+
+    Ok(())
+}
+
+/// Three-node election: the node with the most recent data becomes leader
+/// and reports synced; both stale/empty nodes report they need updates.
+#[test]
+fn test_three_node_sync() -> Result<()> {
+    // Create three nodes
+    let (memdb1, _temp1, callbacks1) = create_test_node(1)?;
+    let (memdb2, _temp2, callbacks2) = create_test_node(2)?;
+    let (_memdb3, _temp3, callbacks3) = create_test_node(3)?;
+
+    // Node 1 has the most recent data
+    memdb1.create("/cluster.conf", 0, 5000)?;
+    memdb1.write("/cluster.conf", 0, 5001, b"version=3", 0)?;
+
+    // Node 2 has older data
+    memdb2.create("/cluster.conf", 0, 4000)?;
+    memdb2.write("/cluster.conf", 0, 4001, b"version=2", 0)?;
+
+    // Node 3 is empty (new node joining)
+
+    // Generate states
+    let state1 = callbacks1.get_state()?;
+    let state2 = callbacks2.get_state()?;
+    let state3 = callbacks3.get_state()?;
+
+    let states = vec![
+        NodeSyncInfo {
+            nodeid: 1,
+            pid: 1000,
+            state: Some(state1.clone()),
+            synced: false,
+        },
+        NodeSyncInfo {
+            nodeid: 2,
+            pid: 2000,
+            state: Some(state2.clone()),
+            synced: false,
+        },
+        NodeSyncInfo {
+            nodeid: 3,
+            pid: 3000,
+            state: Some(state3.clone()),
+            synced: false,
+        },
+    ];
+
+    // All nodes process states
+    let synced1 = callbacks1.process_state_update(&states)?;
+    let synced2 = callbacks2.process_state_update(&states)?;
+    let synced3 = callbacks3.process_state_update(&states)?;
+
+    // Node 1 (leader) should be synced
+    assert!(synced1, "Node 1 (leader) should be synced");
+
+    // Nodes 2 and 3 need updates
+    assert!(!synced2, "Node 2 should need updates");
+    assert!(!synced3, "Node 3 should need updates");
+
+    // Verify leader has highest version
+    let index1 = MemDbIndex::deserialize(&state1)?;
+    let index2 = MemDbIndex::deserialize(&state2)?;
+    let index3 = MemDbIndex::deserialize(&state3)?;
+
+    assert!(index1.version >= index2.version);
+    assert!(index1.version >= index3.version);
+
+    Ok(())
+}
+
+/// Pin the update-message wire format (header layout and total length) so a
+/// change that breaks compatibility with the C implementation fails loudly.
+#[test]
+fn test_update_message_wire_format_compatibility() -> Result<()> {
+    // Verify our wire format matches C implementation exactly
+    let entry = TreeEntry {
+        inode: 42,
+        parent: 1,
+        version: 100,
+        writer: 2,
+        mtime: 12345,
+        size: 11,
+        entry_type: 8, // DT_REG
+        name: "test.conf".to_string(),
+        data: b"hello world".to_vec(),
+    };
+
+    let serialized = entry.serialize_for_update();
+
+    // Verify header size (41 bytes)
+    // parent(8) + inode(8) + version(8) + writer(4) + mtime(4) + size(4) + namelen(4) + type(1)
+    let expected_header_size = 8 + 8 + 8 + 4 + 4 + 4 + 4 + 1;
+    assert_eq!(expected_header_size, 41);
+
+    // Verify total size
+    let namelen = "test.conf".len() + 1; // Include null terminator
+    let expected_total = expected_header_size + namelen + 11;
+    assert_eq!(serialized.len(), expected_total);
+
+    // Verify we can deserialize it back
+    let deserialized = TreeEntry::deserialize_from_update(&serialized)?;
+    assert_eq!(deserialized.inode, entry.inode);
+    assert_eq!(deserialized.parent, entry.parent);
+    assert_eq!(deserialized.version, entry.version);
+    assert_eq!(deserialized.writer, entry.writer);
+    assert_eq!(deserialized.mtime, entry.mtime);
+    assert_eq!(deserialized.size, entry.size);
+    assert_eq!(deserialized.entry_type, entry.entry_type);
+    assert_eq!(deserialized.name, entry.name);
+    assert_eq!(deserialized.data, entry.data);
+
+    Ok(())
+}
+
+/// Pin the memdb index wire format: 32-byte header, 40 bytes per entry,
+/// and the self-describing `bytes` field, matching the C memdb_index_t.
+#[test]
+fn test_index_wire_format_compatibility() -> Result<()> {
+    // Verify memdb_index_t wire format matches C implementation
+    use pmxcfs_memdb::IndexEntry;
+
+    let entries = vec![
+        IndexEntry {
+            inode: 1,
+            digest: [0u8; 32],
+        },
+        IndexEntry {
+            inode: 2,
+            digest: [1u8; 32],
+        },
+    ];
+
+    let index = MemDbIndex::new(
+        100,   // version
+        2,     // last_inode
+        1,     // writer
+        12345, // mtime
+        entries,
+    );
+
+    let serialized = index.serialize();
+
+    // Verify header size (32 bytes)
+    // version(8) + last_inode(8) + writer(4) + mtime(4) + size(4) + bytes(4)
+    let expected_header_size = 8 + 8 + 4 + 4 + 4 + 4;
+    assert_eq!(expected_header_size, 32);
+
+    // Verify entry size (40 bytes each)
+    // inode(8) + digest(32)
+    let expected_entry_size = 8 + 32;
+    assert_eq!(expected_entry_size, 40);
+
+    // Verify total size
+    let expected_total = expected_header_size + 2 * expected_entry_size;
+    assert_eq!(serialized.len(), expected_total);
+    assert_eq!(serialized.len(), index.bytes as usize);
+
+    // Verify deserialization
+    let deserialized = MemDbIndex::deserialize(&serialized)?;
+    assert_eq!(deserialized.version, index.version);
+    assert_eq!(deserialized.last_inode, index.last_inode);
+    assert_eq!(deserialized.writer, index.writer);
+    assert_eq!(deserialized.mtime, index.mtime);
+    assert_eq!(deserialized.size, index.size);
+    assert_eq!(deserialized.bytes, index.bytes);
+    assert_eq!(deserialized.entries.len(), 2);
+
+    Ok(())
+}
+
+/// Diverged databases: two nodes each add a file the other lacks; both
+/// directions of the index diff must report the missing entries.
+#[test]
+fn test_sync_with_conflicts() -> Result<()> {
+    // Test scenario: two nodes modified different files
+    let (memdb1, _temp1, _callbacks1) = create_test_node(1)?;
+    let (memdb2, _temp2, _callbacks2) = create_test_node(2)?;
+
+    // Both start with same base
+    memdb1.create("/base.conf", 0, 1000)?;
+    memdb1.write("/base.conf", 0, 1001, b"shared", 0)?;
+
+    memdb2.create("/base.conf", 0, 1000)?;
+    memdb2.write("/base.conf", 0, 1001, b"shared", 0)?;
+
+    // Node 1 adds file1
+    memdb1.create("/file1.txt", 0, 2000)?;
+    memdb1.write("/file1.txt", 0, 2001, b"from node 1", 0)?;
+
+    // Node 2 adds file2
+    memdb2.create("/file2.txt", 0, 2000)?;
+    memdb2.write("/file2.txt", 0, 2001, b"from node 2", 0)?;
+
+    // Generate indices
+    let index1 = memdb1.encode_index()?;
+    let index2 = memdb2.encode_index()?;
+
+    // Find differences
+    let diffs_1_vs_2 = index1.find_differences(&index2);
+    let diffs_2_vs_1 = index2.find_differences(&index1);
+
+    // Node 1 has file1 that node 2 doesn't have
+    assert!(
+        !diffs_1_vs_2.is_empty(),
+        "Node 1 should have entries node 2 doesn't have"
+    );
+
+    // Node 2 has file2 that node 1 doesn't have
+    assert!(
+        !diffs_2_vs_1.is_empty(),
+        "Node 2 should have entries node 1 doesn't have"
+    );
+
+    // Higher version wins on election. Both nodes performed the same sequence
+    // of operations (create+write base, then create+write one file), so their
+    // version counters should be equal and mtime would act as the tiebreaker.
+    // NOTE(review): exact version values depend on MemDb's increment rules —
+    // confirm against pmxcfs-memdb.
+
+    Ok(())
+}
+
+/// A 10 KiB payload exercises the update path with non-trivial data and
+/// must arrive byte-identical on the follower.
+#[test]
+fn test_large_file_update() -> Result<()> {
+    let (leader_db, _temp_leader, _) = create_test_node(1)?;
+    let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+    // 10 KiB of a repeating 0..=255 byte pattern.
+    let large_data: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect();
+
+    leader_db.create("/large.bin", 0, 1000)?;
+    leader_db.write("/large.bin", 0, 1001, &large_data, 0)?;
+
+    // Ship the entry to the follower as a single update message.
+    let entry = leader_db.lookup_path("/large.bin").unwrap();
+    let update_msg = entry.serialize_for_update();
+    follower_callbacks.process_update(1, 1000, &update_msg)?;
+
+    // The follower must now hold an identical copy.
+    let follower_entry = follower_db.lookup_path("/large.bin").unwrap();
+    assert_eq!(follower_entry.size, large_data.len());
+    assert_eq!(follower_entry.data, large_data);
+
+    Ok(())
+}
+
+/// Sync a nested directory tree entry-by-entry and verify the follower
+/// reconstructs the full hierarchy, including the leaf file's contents.
+#[test]
+fn test_directory_hierarchy_sync() -> Result<()> {
+    // Test syncing nested directory structure
+    let (leader_db, _temp_leader, _) = create_test_node(1)?;
+    let (follower_db, _temp_follower, follower_callbacks) = create_test_node(2)?;
+
+    // Create directory hierarchy on leader
+    leader_db.create("/etc", libc::S_IFDIR, 1000)?;
+    leader_db.create("/etc/pve", libc::S_IFDIR, 1001)?;
+    leader_db.create("/etc/pve/nodes", libc::S_IFDIR, 1002)?;
+    leader_db.create("/etc/pve/nodes/pve1", libc::S_IFDIR, 1003)?;
+    leader_db.create("/etc/pve/nodes/pve1/config", 0, 1004)?;
+    leader_db.write(
+        "/etc/pve/nodes/pve1/config",
+        0,
+        1005,
+        b"cpu: 2\nmem: 4096",
+        0,
+    )?;
+
+    // Send all entries to follower
+    let entries = leader_db.get_all_entries()?;
+    for entry in entries {
+        if entry.inode == ROOT_INODE {
+            continue; // Skip root
+        }
+        let update_msg = entry.serialize_for_update();
+        follower_callbacks.process_update(1, 1000, &update_msg)?;
+    }
+
+    // Verify entire hierarchy
+    assert!(follower_db.lookup_path("/etc").is_some());
+    assert!(follower_db.lookup_path("/etc/pve").is_some());
+    assert!(follower_db.lookup_path("/etc/pve/nodes").is_some());
+    assert!(follower_db.lookup_path("/etc/pve/nodes/pve1").is_some());
+
+    let config = follower_db.lookup_path("/etc/pve/nodes/pve1/config");
+    assert!(config.is_some());
+    assert_eq!(config.unwrap().data, b"cpu: 2\nmem: 4096");
+
+    Ok(())
+}
-- 
2.47.3





More information about the pve-devel mailing list