[pve-devel] [PATCH pve-cluster 09/15] pmxcfs-rs: add pmxcfs-ipc crate

Kefu Chai k.chai at proxmox.com
Tue Jan 6 15:24:33 CET 2026


Add libqb-compatible IPC server implementation:
- QB_IPC_SHM protocol (shared memory ring buffers)
- Abstract Unix socket (@pve2) for handshake
- Lock-free SPSC ring buffers
- Authentication via SO_PASSCRED (uid/gid/pid)
- 13 IPC operations (GET_FS_VERSION, GET_CLUSTER_INFO, etc.)

This is an independent crate with no dependencies on other pmxcfs
crates; it needs only external crates such as tokio, nix, and
memmap2. It is wire-compatible with the C implementation's
libqb-based server, so existing clients keep working unchanged.

Includes wire protocol compatibility tests (these require root to run).

Signed-off-by: Kefu Chai <k.chai at proxmox.com>
---
 src/pmxcfs-rs/Cargo.toml                      |    1 +
 src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml           |   44 +
 src/pmxcfs-rs/pmxcfs-ipc/README.md            |  182 +++
 .../pmxcfs-ipc/examples/test_server.rs        |   92 ++
 src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs    |  657 ++++++++++
 src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs       |   93 ++
 src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs           |   37 +
 src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs      |  332 +++++
 src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs    | 1158 +++++++++++++++++
 src/pmxcfs-rs/pmxcfs-ipc/src/server.rs        |  278 ++++
 src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs        |   84 ++
 src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs   |  450 +++++++
 .../pmxcfs-ipc/tests/qb_wire_compat.rs        |  413 ++++++
 13 files changed, 3821 insertions(+)
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/README.md
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/server.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs
 create mode 100644 src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs

diff --git a/src/pmxcfs-rs/Cargo.toml b/src/pmxcfs-rs/Cargo.toml
index b00ca68f..f4497d58 100644
--- a/src/pmxcfs-rs/Cargo.toml
+++ b/src/pmxcfs-rs/Cargo.toml
@@ -9,6 +9,7 @@ members = [
     "pmxcfs-status",     # Status monitoring and RRD data management
     "pmxcfs-test-utils", # Test utilities and helpers (dev-only)
     "pmxcfs-services",   # Service framework for automatic retry and lifecycle management
+    "pmxcfs-ipc",        # libqb-compatible IPC server
 ]
 resolver = "2"
 
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml b/src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml
new file mode 100644
index 00000000..dbee2e9a
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/Cargo.toml
@@ -0,0 +1,44 @@
+[package]
+name = "pmxcfs-ipc"
+description = "libqb-compatible IPC server implementation in pure Rust"
+
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[lints]
+workspace = true
+
+# System dependencies:
+# - libqb (runtime) - QB IPC library for client compatibility
+# - libqb-dev (build/test only) - Required to run wire protocol tests
+
+[dependencies]
+# Error handling
+anyhow.workspace = true
+
+# Async runtime
+tokio.workspace = true
+tokio-util.workspace = true
+
+# Concurrency primitives
+parking_lot.workspace = true
+
+# System integration
+libc.workspace = true
+nix.workspace = true
+memmap2 = "0.9"
+
+# Logging
+tracing.workspace = true
+
+# Async trait support
+async-trait.workspace = true
+
+[dev-dependencies]
+pmxcfs-test-utils = { path = "../pmxcfs-test-utils" }
+tempfile.workspace = true
+tokio = { workspace = true, features = ["rt", "macros"] }
+tracing-subscriber.workspace = true
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/README.md b/src/pmxcfs-rs/pmxcfs-ipc/README.md
new file mode 100644
index 00000000..5b5b98ae
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/README.md
@@ -0,0 +1,182 @@
+# pmxcfs-ipc: libqb-Compatible IPC Server
+
+**Rust implementation of a libqb-compatible IPC server for pmxcfs, using shared memory ring buffers**
+
+This crate provides a wire-compatible IPC server that works with libqb clients (C `qb_ipcc_*` API) without depending on the libqb C library.
+
+## Table of Contents
+
+- [Overview](#overview)
+- [Architecture](#architecture)
+- [Protocol Implementation](#protocol-implementation)
+- [Usage](#usage)
+- [Testing](#testing)
+- [Implementation Status](#implementation-status)
+- [Application-Level IPC Operations](#application-level-ipc-operations)
+- [References](#references)
+- [Notes](#notes)
+
+---
+
+## Overview
+
+pmxcfs uses libqb for IPC communication between the daemon and client tools (`pvecm`, `pvenode`, etc.). This crate implements a server using QB_IPC_SHM (shared memory ring buffers) that is wire-compatible with libqb clients, enabling the Rust pmxcfs implementation to communicate with existing C-based tools.
+
+**Key Features**:
+- Wire-compatible with libqb clients
+- QB_IPC_SHM transport (shared memory ring buffers)
+- Async I/O via tokio
+- Lock-free SPSC ring buffers
+- Supports authentication via uid/gid
+- Per-connection context (uid, gid, pid, read-only flag)
+- Connection statistics tracking
+- Abstract Unix sockets for setup handshake (Linux-specific)
+
+---
+
+## Architecture
+
+### Transport: QB_IPC_SHM (Shared Memory Ring Buffers)
+
+The transport is implemented with lock-free SPSC (single-producer, single-consumer) ring buffers in shared memory. This provides:
+
+- **Wire compatibility**: the same handshake and chunk protocol as libqb
+- **Async I/O**: integration with the tokio ecosystem
+
+**Ring Buffer Design**:
+- Each connection has 3 ring buffers:
+  1. **Request ring**: Client writes, server reads
+  2. **Response ring**: Server writes, client reads
+  3. **Event ring**: Server writes, client reads (for async notifications)
+- Ring buffers stored in `/dev/shm` (Linux shared memory)
+- Chunk-based protocol matching libqb
+
+### Server Structure
+
+The `Server` accepts connections on the abstract setup socket; each accepted connection gets its own set of ring buffers and a background tokio task that reads requests from the request ring, dispatches them to the `Handler`, and writes responses to the response ring.
+
+### Connection Statistics
+
+Each connection tracks statistics for C compatibility (matching `qb_ipcs_stats`).
+
+---
+
+## Protocol Implementation
+
+### Connection Handshake
+
+The server listens on an abstract Unix socket `@pve2` (the `@` prefix indicates the abstract namespace) for the initial connection setup. The client sends a `qb_ipc_connection_request`; the server reads the peer credentials (SO_PEERCRED), asks the `Handler` to authenticate, creates the three ring buffers in `/dev/shm`, and replies with a `qb_ipc_connection_response` carrying the ring buffer base names.
+
+### Request/Response Communication
+
+After handshake, communication happens via shared memory ring buffers using libqb-compatible chunk format.
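+
+Each message is written as a libqb-compatible ring buffer chunk (see
+`src/ringbuffer.rs`):
+
+```text
+[chunk size][chunk magic][payload: request/response header + data]
+```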
+
+### Wire Format Structures
+
+All structures use `#[repr(C, align(8))]` to match C's alignment requirements.
+
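+For example, the request header mirrors libqb's `qb_ipc_request_header`, with
+each `i32` field padded to 8 bytes (the actual definitions live in
+`src/protocol.rs`):
+
+```rust
+#[repr(C, align(8))]
+pub struct RequestHeader {
+    pub id: Align8,   // i32 value + 4 bytes of padding
+    pub size: Align8, // i32 value + 4 bytes of padding
+}
+```
+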
+Error codes must be negative errno values (e.g., `-EPERM`, `-EINVAL`) to match libqb convention.
+
+---
+
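+## Usage
+
+A minimal sketch of the public API (`Server`, `Handler`, `Permissions`),
+adapted from `examples/test_server.rs`; the service name and operation codes
+below are placeholders:
+
+```rust
+use async_trait::async_trait;
+use pmxcfs_ipc::{Handler, Permissions, Request, Response, Server};
+
+struct MyHandler;
+
+#[async_trait]
+impl Handler for MyHandler {
+    fn authenticate(&self, uid: u32, _gid: u32) -> Option<Permissions> {
+        // Root gets read-write access, everyone else read-only
+        if uid == 0 {
+            Some(Permissions::ReadWrite)
+        } else {
+            Some(Permissions::ReadOnly)
+        }
+    }
+
+    async fn handle(&self, request: Request) -> Response {
+        match request.msg_id {
+            // Operation codes are application-defined (see cfs-ipc-ops.h)
+            1 => Response::ok(b"1".to_vec()),
+            _ => Response::err(-22), // -EINVAL
+        }
+    }
+}
+
+#[tokio::main]
+async fn main() {
+    let mut server = Server::new("pve2", MyHandler);
+    server.start().expect("failed to start IPC server");
+    // Keep serving until Ctrl-C
+    tokio::signal::ctrl_c().await.expect("failed to wait for Ctrl-C");
+}
+```
+
+---
+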
+## Testing
+
+Integration tests require a running Corosync instance; the wire protocol compatibility tests additionally require root and `libqb-dev`, since they exercise a real libqb C client. See the `tests/` directory for the C client FFI compatibility tests.
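+
+A typical invocation, assuming the workspace layout used in this series (the
+exact commands may differ in your setup):
+
+```sh
+# Library unit tests (no special privileges needed)
+cargo test -p pmxcfs-ipc --lib
+
+# Wire protocol compatibility tests (root and libqb-dev required)
+sudo -E cargo test -p pmxcfs-ipc --test qb_wire_compat
+```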
+
+## Implementation Status
+
+### Implemented
+
+- Connection handshake (SOCK_STREAM setup socket)
+- Authentication via SO_PASSCRED (uid/gid/pid)
+- QB_IPC_SHM transport (shared memory ring buffers)
+- Lock-free SPSC ring buffers
+- Async I/O via tokio
+- Abstract Unix sockets for setup handshake
+- Message header parsing (request/response)
+- Error code propagation (negative errno)
+- Ring buffer file management (creation/cleanup)
+- Event channel ring buffers (created, not actively used)
+- Connection statistics tracking
+- Disconnect detection
+- Read-only flag based on gid
+
+### Not Implemented
+
+- Event channel message sending (pmxcfs doesn't use events yet)
+
+## Application-Level IPC Operations
+
+### Operation Summary
+
+The following IPC operations are supported (defined in pmxcfs):
+
+| Operation | Request Data | Response Data | Description |
+|-----------|-------------|---------------|-------------|
+| GET_FS_VERSION | Empty | uint32_t version | Get filesystem version number |
+| GET_CLUSTER_INFO | Empty | JSON string | Get cluster information |
+| GET_GUEST_LIST | Empty | JSON array | Get list of all VMs/containers |
+| SET_STATUS | name + data | Empty | Set status key-value pair |
+| GET_STATUS | name | Binary data | Get status value by name |
+| GET_CONFIG | name | File contents | Read configuration file |
+| LOG_CLUSTER_MSG | priority + msg | Empty | Add cluster log entry |
+| GET_CLUSTER_LOG | max_entries | JSON array | Get cluster log entries |
+| GET_RRD_DUMP | Empty | RRD dump text | Get all RRD data |
+| GET_GUEST_CONFIG_PROPERTY | vmid + key | String value | Get single VM config property |
+| GET_GUEST_CONFIG_PROPERTIES | vmid | JSON object | Get all VM config properties |
+| VERIFY_TOKEN | userid + token | Boolean | Verify API token validity |
+
+### Common Clients
+
+The following Proxmox components use the IPC interface:
+
+- **pvestatd**: Updates node/VM/storage metrics (SET_STATUS, GET_STATUS)
+- **pve-ha-crm**: HA cluster resource manager (GET_CLUSTER_INFO, GET_GUEST_LIST)
+- **pve-ha-lrm**: HA local resource manager (GET_CONFIG, LOG_CLUSTER_MSG)
+- **pvecm**: Cluster management CLI (GET_CLUSTER_INFO, GET_CLUSTER_LOG)
+- **pvedaemon**: PVE API daemon (All query operations)
+
+### Permission Model
+
+**Write Operations** (require root):
+- SET_STATUS
+- LOG_CLUSTER_MSG
+
+**Read Operations** (any authenticated user):
+- All GET_* operations
+- VERIFY_TOKEN
+
+---
+
+## References
+
+### libqb Source
+
+Reference implementation of QB IPC protocol (available at https://github.com/ClusterLabs/libqb):
+
+- `libqb/lib/ringbuffer.c` - Ring buffer implementation
+- `libqb/lib/ipc_shm.c` - Shared memory transport
+- `libqb/lib/ipc_setup.c` - Connection setup/handshake
+- `libqb/include/qb/qbipc_common.h` - Wire protocol structures
+
+### C pmxcfs (pve-cluster)
+
+- `src/pmxcfs/server.c` - C IPC server using libqb
+- `src/pmxcfs/cfs-ipc-ops.h` - pmxcfs IPC operation codes
+
+### Related Documentation
+
+- `../C_COMPATIBILITY.md` - General C compatibility notes (if present)
+
+---
+
+## Notes
+
+### Ring Buffer Naming Convention
+
+Ring buffer files are created in `/dev/shm` with names based on connection descriptor and ring type (request/response/event).
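+
+For example, with service name `pve2`, connection 1 from a client with pid
+4242 to a daemon with pid 1234 is backed by files following this pattern (see
+`src/connection.rs`):
+
+```text
+/dev/shm/qb-1234-4242-1-pve2-request-header
+/dev/shm/qb-1234-4242-1-pve2-request-data
+```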
+
+### Error Handling
+
+Always use **negative errno values** for errors to maintain compatibility with libqb clients.
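+
+For example, a permission failure is reported as:
+
+```rust
+Response::err(-libc::EPERM) // "Operation not permitted"
+```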
+
+### Alignment and Padding
+
+All wire format structures must use `#[repr(C, align(8))]` to ensure 8-byte alignment matching C's requirements.
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs b/src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs
new file mode 100644
index 00000000..6b9695ce
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/examples/test_server.rs
@@ -0,0 +1,92 @@
+//! Simple test server for debugging libqb connectivity
+
+use async_trait::async_trait;
+use pmxcfs_ipc::{Handler, Permissions, Request, Response, Server};
+
+/// Example handler implementation
+struct TestHandler;
+
+#[async_trait]
+impl Handler for TestHandler {
+    fn authenticate(&self, uid: u32, gid: u32) -> Option<Permissions> {
+        // Accept root with read-write access
+        if uid == 0 {
+            eprintln!("Authenticated uid={uid}, gid={gid} as root (read-write)");
+            return Some(Permissions::ReadWrite);
+        }
+
+        // Accept all other users with read-only access for testing
+        eprintln!("Authenticated uid={uid}, gid={gid} as regular user (read-only)");
+        Some(Permissions::ReadOnly)
+    }
+
+    async fn handle(&self, request: Request) -> Response {
+        eprintln!(
+            "Received request: id={}, data_len={}, conn={}, uid={}, gid={}, pid={}, read_only={}",
+            request.msg_id,
+            request.data.len(),
+            request.conn_id,
+            request.uid,
+            request.gid,
+            request.pid,
+            request.is_read_only
+        );
+
+        match request.msg_id {
+            1 => {
+                // CFS_IPC_GET_FS_VERSION
+                let response_str = r#"{"version":1,"protocol":1}"#;
+                eprintln!("Responding with: {response_str}");
+                Response::ok(response_str.as_bytes().to_vec())
+            }
+            2 => {
+                // CFS_IPC_GET_CLUSTER_INFO
+                let response_str = r#"{"nodes":["node1","node2"],"quorate":true}"#;
+                eprintln!("Responding with: {response_str}");
+                Response::ok(response_str.as_bytes().to_vec())
+            }
+            3 => {
+                // CFS_IPC_GET_GUEST_LIST
+                let response_str = r#"{"data":[{"vmid":100}]}"#;
+                eprintln!("Responding with: {response_str}");
+                Response::ok(response_str.as_bytes().to_vec())
+            }
+            _ => {
+                eprintln!("Unknown message id: {}", request.msg_id);
+                Response::err(-libc::EINVAL)
+            }
+        }
+    }
+}
+
+#[tokio::main]
+async fn main() {
+    // Initialize tracing
+    tracing_subscriber::fmt()
+        .with_max_level(tracing::Level::DEBUG)
+        .with_target(true)
+        .init();
+
+    println!("Starting QB IPC test server on 'pve2'...");
+
+    // Create handler and server
+    let handler = TestHandler;
+    let mut server = Server::new("pve2", handler);
+
+    println!("Server created, starting...");
+
+    if let Err(e) = server.start() {
+        eprintln!("Failed to start server: {e}");
+        std::process::exit(1);
+    }
+
+    println!("Server started successfully!");
+    println!("Waiting for connections...");
+
+    // Keep server running
+    tokio::signal::ctrl_c()
+        .await
+        .expect("Failed to wait for Ctrl-C");
+
+    println!("Shutting down...");
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs
new file mode 100644
index 00000000..d6d77e6c
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/connection.rs
@@ -0,0 +1,657 @@
+//! Per-connection handling for libqb IPC with shared memory ring buffers
+//!
+//! This module contains all connection-specific logic including connection
+//! establishment, authentication, request handling, and shared memory ring buffer management.
+use anyhow::{Context, Result};
+use std::os::unix::io::AsRawFd;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::net::UnixStream;
+use tokio_util::sync::CancellationToken;
+
+use super::handler::{Handler, Permissions};
+use super::protocol::*;
+use super::ringbuffer::{FlowControl, RingBuffer};
+
+/// Per-connection state using shared memory ring buffers
+///
+/// Uses SHM transport (shared memory ring buffers).
+#[allow(dead_code)] // Fields are intentionally stored for lifecycle management
+pub(super) struct QbConnection {
+    /// Connection ID for logging and debugging
+    conn_id: u64,
+
+    /// Client process ID (from SO_PEERCRED)
+    pid: u32,
+
+    /// Client user ID (from SO_PEERCRED)
+    uid: u32,
+
+    /// Client group ID (from SO_PEERCRED)
+    gid: u32,
+
+    /// Whether this connection has read-only access (determined by Handler::authenticate)
+    pub(super) read_only: bool,
+
+    /// Setup socket (kept open for disconnect detection)
+    _setup_stream: UnixStream,
+
+    /// Ring buffers for shared memory IPC
+    /// Request ring: client writes, server reads
+    request_rb: Option<RingBuffer>,
+    /// Response ring: server writes, client reads
+    response_rb: Option<RingBuffer>,
+    /// Event ring: server writes, client reads (for async notifications)
+    /// NOTE: The existing PVE/IPCC.xs Perl client only uses qb_ipcc_sendv_recv()
+    /// and never calls qb_ipcc_event_recv(), so this ring buffer is created
+    /// for libqb compatibility but remains unused in practice.
+    _event_rb: Option<RingBuffer>,
+
+    /// Paths to ring buffer data files (for debugging/cleanup)
+    pub(super) ring_buffer_paths: Vec<PathBuf>,
+
+    /// Task handle for request handler (auto-aborted on drop)
+    pub(super) task_handle: Option<tokio::task::JoinHandle<()>>,
+}
+
+impl QbConnection {
+    /// Accept a new connection from the setup socket
+    ///
+    /// Performs authentication, creates ring buffers, spawns request handler task,
+    /// and returns the connection object.
+    pub(super) async fn accept(
+        mut stream: UnixStream,
+        conn_id: u64,
+        service_name: &str,
+        handler: Arc<dyn Handler>,
+        cancellation_token: CancellationToken,
+    ) -> Result<Self> {
+        // Read connection request
+        let fd = stream.as_raw_fd();
+        let mut req_bytes = vec![0u8; std::mem::size_of::<ConnectionRequest>()];
+        stream
+            .read_exact(&mut req_bytes)
+            .await
+            .context("Failed to read connection request")?;
+
+        tracing::debug!(
+            "Connection request raw bytes ({} bytes): {:02x?}",
+            req_bytes.len(),
+            req_bytes
+        );
+
+        let req =
+            unsafe { std::ptr::read_unaligned(req_bytes.as_ptr() as *const ConnectionRequest) };
+
+        tracing::debug!(
+            "Connection request: id={}, size={}, max_msg_size={}",
+            *req.hdr.id,
+            *req.hdr.size,
+            req.max_msg_size
+        );
+
+        // Get peer credentials (SO_PEERCRED on Linux)
+        let (uid, gid, pid) = get_peer_credentials(fd)?;
+
+        // Authenticate using Handler trait
+        let read_only = match handler.authenticate(uid, gid) {
+            Some(Permissions::ReadWrite) => {
+                tracing::info!(pid, uid, gid, "Connection accepted with read-write access");
+                false
+            }
+            Some(Permissions::ReadOnly) => {
+                tracing::info!(pid, uid, gid, "Connection accepted with read-only access");
+                true
+            }
+            None => {
+                tracing::warn!(
+                    pid,
+                    uid,
+                    gid,
+                    "Connection rejected by authentication policy"
+                );
+                send_connection_response(&mut stream, -libc::EPERM, conn_id, 0, "", "", "").await?;
+                anyhow::bail!("Connection authentication failed");
+            }
+        };
+
+        // Create connection descriptor for ring buffer naming
+        let conn_desc = format!("{}-{}-{}", std::process::id(), pid, conn_id);
+        let max_msg_size = req.max_msg_size.max(8192);
+
+        // Create ring buffers in /dev/shm
+        // Pass max_msg_size directly - RingBuffer::new() will add QB_RB_CHUNK_MARGIN and round up
+        // (just like qb_rb_open() does on the client side)
+        let ring_size = max_msg_size as usize;
+
+        tracing::debug!(
+            "Creating ring buffers for connection {}: size={} bytes",
+            conn_id,
+            ring_size
+        );
+
+        // Request ring: client writes, server reads
+        // Request ring needs sizeof(int32_t) for flow control (shared_user_data)
+        let request_rb_name = format!("{conn_desc}-{service_name}-request");
+        let request_rb = RingBuffer::new(
+            "/dev/shm",
+            &request_rb_name,
+            ring_size,
+            std::mem::size_of::<i32>(),
+        )
+        .context("Failed to create request ring buffer")?;
+
+        // Response ring: server writes, client reads
+        // Response ring doesn't need shared_user_data
+        let response_rb_name = format!("{conn_desc}-{service_name}-response");
+        let response_rb = RingBuffer::new("/dev/shm", &response_rb_name, ring_size, 0)
+            .context("Failed to create response ring buffer")?;
+
+        // Event ring: server writes, client reads (for async notifications)
+        // Event ring doesn't need shared_user_data
+        let event_rb_name = format!("{conn_desc}-{service_name}-event");
+        let event_rb = RingBuffer::new("/dev/shm", &event_rb_name, ring_size, 0)
+            .context("Failed to create event ring buffer")?;
+
+        // Collect full paths for cleanup tracking
+        let request_data_path = PathBuf::from(format!("/dev/shm/qb-{request_rb_name}-data"));
+        let response_data_path = PathBuf::from(format!("/dev/shm/qb-{response_rb_name}-data"));
+        let event_data_path = PathBuf::from(format!("/dev/shm/qb-{event_rb_name}-data"));
+
+        // Send connection response with ring buffer BASE NAMES (not full paths)
+        // libqb client expects base names (e.g., "123-456-1-pve2-request")
+        // It will internally prepend "/dev/shm/qb-" and append "-header" or "-data"
+        send_connection_response(
+            &mut stream,
+            0,
+            conn_id,
+            max_msg_size,
+            &request_rb_name,
+            &response_rb_name,
+            &event_rb_name,
+        )
+        .await?;
+
+        // Spawn request handler task
+        let handler_for_task = handler.clone();
+        let cancellation_for_task = cancellation_token.child_token();
+
+        let task_handle = tokio::spawn(async move {
+            Self::handle_requests(
+                request_rb,
+                response_rb,
+                handler_for_task,
+                cancellation_for_task,
+                conn_id,
+                uid,
+                gid,
+                pid,
+                read_only,
+            )
+            .await;
+        });
+
+        tracing::info!("Connection {} established (SHM transport)", conn_id);
+
+        Ok(Self {
+            conn_id,
+            pid,
+            uid,
+            gid,
+            read_only,
+            _setup_stream: stream,
+            request_rb: None,  // Moved to task
+            response_rb: None, // Moved to task
+            _event_rb: Some(event_rb),
+            ring_buffer_paths: vec![request_data_path, response_data_path, event_data_path],
+            task_handle: Some(task_handle),
+        })
+    }
+
+    /// Request handler loop - receives and processes messages via ring buffers
+    ///
+    /// Runs in a background async task, receiving requests and sending responses
+    /// through shared memory ring buffers.
+    ///
+    /// Uses tokio channels to implement a workqueue with flow control:
+    /// - FlowControl::OK: Proceed with sending
+    /// - FlowControl::SLOW_DOWN: Reduce send rate
+    /// - FlowControl::STOP: Do not send
+    ///
+    /// Architecture: Three concurrent tasks communicating via tokio channels:
+    /// 1. Request receiver: reads from request ring buffer, queues work
+    /// 2. Worker: processes requests from work queue, sends to response queue
+    /// 3. Response sender: writes responses from response queue to response ring buffer
+    #[allow(clippy::too_many_arguments)]
+    async fn handle_requests(
+        mut request_rb: RingBuffer,
+        mut response_rb: RingBuffer,
+        handler: Arc<dyn Handler>,
+        cancellation_token: CancellationToken,
+        conn_id: u64,
+        uid: u32,
+        gid: u32,
+        pid: u32,
+        read_only: bool,
+    ) {
+        tracing::debug!("Request handler started for connection {}", conn_id);
+
+        // Workqueue capacity and flow control thresholds
+        //
+        // NOTE: The C implementation (using libqb) processes requests synchronously
+        // in the event loop callback (server.c:159 s1_msg_process_fn), so there's
+        // no explicit queue. We add async queueing in Rust to allow non-blocking
+        // request handling with tokio.
+        //
+        // Queue capacity of 8 is chosen as a reasonable default for:
+        // - Typical PVE workloads: Most IPC operations are fast (file reads/writes)
+        // - Memory efficiency: Each queued item = ~1KB (request header + data)
+        // - Backpressure: Small queue encourages flow control to activate quickly
+        // - Testing: Flow control test (02-flow-control.sh) verifies 20 concurrent
+        //   operations work correctly with capacity 8
+        //
+        // Flow control thresholds match libqb's rate limiting (ipcs.c:199-203):
+        // - FlowControl::OK (0): Proceed with sending (QB_IPCS_RATE_NORMAL)
+        // - FlowControl::SLOW_DOWN (1): Reduce send rate (QB_IPCS_RATE_OFF)
+        // - FlowControl::STOP (2): Do not send (QB_IPCS_RATE_OFF_2)
+        const MAX_PENDING_REQUESTS: usize = 8;
+
+        // Set SLOW_DOWN when queue reaches 75% capacity (6/8 items)
+        // This provides early warning before the queue fills completely,
+        // allowing clients to throttle before hitting STOP
+        const FC_WARNING_THRESHOLD: usize = 6;
+
+        // Work queue: (header, request) -> worker
+        let (work_tx, mut work_rx) =
+            tokio::sync::mpsc::channel::<(RequestHeader, Request)>(MAX_PENDING_REQUESTS);
+
+        // Response queue: worker -> response sender
+        // Unbounded because responses must not block the worker
+        let (response_tx, mut response_rx) =
+            tokio::sync::mpsc::unbounded_channel::<(RequestHeader, Response)>();
+
+        // Spawn worker task to process requests
+        let worker_handler = handler.clone();
+        let worker_response_tx = response_tx.clone();
+        let worker_task = tokio::spawn(async move {
+            while let Some((header, request)) = work_rx.recv().await {
+                let handler_response = worker_handler.handle(request).await;
+                // Send to response queue (unbounded, never blocks)
+                let _ = worker_response_tx.send((header, handler_response));
+            }
+        });
+
+        // Spawn response sender task
+        let response_task = tokio::spawn(async move {
+            while let Some((header, handler_response)) = response_rx.recv().await {
+                Self::send_response(&mut response_rb, header, handler_response).await;
+            }
+        });
+
+        // Main request receiver loop
+        loop {
+            // Wait for incoming request (async, yields to tokio scheduler)
+            let request_data = tokio::select! {
+                _ = cancellation_token.cancelled() => {
+                    tracing::debug!("Request handler cancelled for connection {}", conn_id);
+                    break;
+                }
+                result = request_rb.recv() => {
+                    match result {
+                        Ok(data) => data,
+                        Err(e) => {
+                            tracing::error!("Error receiving request on conn {}: {}", conn_id, e);
+                            break;
+                        }
+                    }
+                }
+            };
+
+            // After receiving from ring buffer, flow control is already set to 0
+            // by RingBufferShared::read_chunk()
+
+            // Parse request header
+            if request_data.len() < std::mem::size_of::<RequestHeader>() {
+                tracing::warn!(
+                    "Request too small: {} bytes (need {} for header)",
+                    request_data.len(),
+                    std::mem::size_of::<RequestHeader>()
+                );
+                continue;
+            }
+
+            let header =
+                unsafe { std::ptr::read_unaligned(request_data.as_ptr() as *const RequestHeader) };
+
+            tracing::debug!(
+                "Received request on conn {}: id={}, size={}",
+                conn_id,
+                *header.id,
+                *header.size
+            );
+
+            // Extract message data (after header)
+            let header_size = std::mem::size_of::<RequestHeader>();
+            let msg_data = &request_data[header_size..];
+
+            // Build request object with full context
+            let request = Request {
+                msg_id: *header.id,
+                data: msg_data.to_vec(),
+                is_read_only: read_only,
+                conn_id,
+                uid,
+                gid,
+                pid,
+            };
+
+            // Send to workqueue - implements backpressure via flow control
+            match work_tx.try_send((header, request)) {
+                Ok(()) => {
+                    // Request queued successfully
+
+                    // Update flow control based on queue depth
+                    // This matches libqb's rate limiting behavior
+                    let queue_len = MAX_PENDING_REQUESTS - work_tx.capacity();
+                    let fc_value = if queue_len >= MAX_PENDING_REQUESTS {
+                        FlowControl::STOP // Queue full - stop sending
+                    } else if queue_len >= FC_WARNING_THRESHOLD {
+                        FlowControl::SLOW_DOWN // Queue approaching full - slow down
+                    } else {
+                        FlowControl::OK // Queue has space - OK to send
+                    };
+
+                    if fc_value > FlowControl::OK {
+                        tracing::debug!(
+                            "Setting flow control to {} (queue: {}/{})",
+                            fc_value,
+                            queue_len,
+                            MAX_PENDING_REQUESTS
+                        );
+                    }
+                    request_rb.flow_control.set(fc_value);
+                }
+                Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
+                    // Queue is full - set flow control to STOP and send EAGAIN
+                    tracing::warn!("Work queue full on conn {}, sending EAGAIN", conn_id);
+                    request_rb.flow_control.set(FlowControl::STOP);
+
+                    let error_response = Response {
+                        error_code: -libc::EAGAIN,
+                        data: Vec::new(),
+                    };
+                    // Send error response directly (bypassing queue)
+                    let _ = response_tx.send((header, error_response));
+                }
+                Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
+                    tracing::error!("Work queue closed on conn {}", conn_id);
+                    break;
+                }
+            }
+        }
+
+        // Cleanup: drop channels to signal tasks to exit
+        drop(work_tx);
+        drop(response_tx);
+        let _ = worker_task.await;
+        let _ = response_task.await;
+
+        tracing::debug!("Request handler finished for connection {}", conn_id);
+    }
+
+    /// Send a response to the client
+    async fn send_response(
+        response_rb: &mut RingBuffer,
+        header: RequestHeader,
+        handler_response: Response,
+    ) {
+        // Build and serialize response: [header][data]
+        let response_size = std::mem::size_of::<ResponseHeader>() + handler_response.data.len();
+        let mut response_bytes = Vec::with_capacity(response_size);
+
+        let response_header = ResponseHeader {
+            id: header.id,
+            size: (response_size as i32).into(),
+            error: handler_response.error_code.into(),
+        };
+
+        response_bytes.extend_from_slice(unsafe {
+            std::slice::from_raw_parts(
+                &response_header as *const _ as *const u8,
+                std::mem::size_of::<ResponseHeader>(),
+            )
+        });
+        response_bytes.extend_from_slice(&handler_response.data);
+
+        tracing::debug!("Response header bytes (24): {:02x?}", &response_bytes[..24]);
+
+        // Send response (async, yields if buffer full)
+        match response_rb.send(&response_bytes).await {
+            Ok(()) => {
+                // Response sent successfully
+            }
+            Err(e) => {
+                tracing::error!("Failed to send response: {}", e);
+            }
+        }
+    }
+}
+
+/// Get peer credentials from Unix socket
+fn get_peer_credentials(fd: i32) -> Result<(u32, u32, u32)> {
+    #[cfg(target_os = "linux")]
+    {
+        let mut ucred: libc::ucred = unsafe { std::mem::zeroed() };
+        let mut ucred_size = std::mem::size_of::<libc::ucred>() as libc::socklen_t;
+
+        let res = unsafe {
+            libc::getsockopt(
+                fd,
+                libc::SOL_SOCKET,
+                libc::SO_PEERCRED,
+                &mut ucred as *mut _ as *mut libc::c_void,
+                &mut ucred_size,
+            )
+        };
+
+        if res != 0 {
+            anyhow::bail!(
+                "getsockopt SO_PEERCRED failed: {}",
+                std::io::Error::last_os_error()
+            );
+        }
+
+        Ok((ucred.uid, ucred.gid, ucred.pid as u32))
+    }
+
+    #[cfg(not(target_os = "linux"))]
+    {
+        anyhow::bail!("Peer credentials not supported on this platform");
+    }
+}
+
+/// Send connection response to client
+async fn send_connection_response(
+    stream: &mut UnixStream,
+    error: i32,
+    conn_id: u64,
+    max_msg_size: u32,
+    request_path: &str,
+    response_path: &str,
+    event_path: &str,
+) -> Result<()> {
+    let mut response = ConnectionResponse {
+        hdr: ResponseHeader {
+            id: MSG_AUTHENTICATE.into(),
+            size: (std::mem::size_of::<ConnectionResponse>() as i32).into(),
+            error: error.into(),
+        },
+        connection_type: CONNECTION_TYPE_SHM, // Shared memory transport
+        max_msg_size,
+        connection: conn_id as usize,
+        request: [0u8; PATH_MAX],
+        response: [0u8; PATH_MAX],
+        event: [0u8; PATH_MAX],
+    };
+
+    // Helper to copy path strings into fixed-size buffers
+    let copy_path = |dest: &mut [u8; PATH_MAX], src: &str| {
+        if !src.is_empty() {
+            let len = src.len().min(PATH_MAX - 1);
+            dest[..len].copy_from_slice(&src.as_bytes()[..len]);
+            tracing::debug!("Connection response path: '{}'", src);
+        }
+    };
+
+    copy_path(&mut response.request, request_path);
+    copy_path(&mut response.response, response_path);
+    copy_path(&mut response.event, event_path);
+
+    // Serialize and send
+    let response_bytes = unsafe {
+        std::slice::from_raw_parts(
+            &response as *const _ as *const u8,
+            std::mem::size_of::<ConnectionResponse>(),
+        )
+    };
+
+    stream
+        .write_all(response_bytes)
+        .await
+        .context("Failed to send connection response")?;
+
+    tracing::debug!(
+        "Sent connection response: error={}, connection_type=SHM",
+        error
+    );
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_malformed_request_size_validation() {
+        // This test verifies the size validation logic for malformed requests
+        // The actual validation happens in handle_requests() at line 247-254
+
+        let header_size = std::mem::size_of::<RequestHeader>();
+        assert_eq!(header_size, 16, "RequestHeader should be 16 bytes");
+
+        // Test case 1: Request too small (would be rejected)
+        let too_small_data = [0x01, 0x02, 0x03]; // Only 3 bytes
+        assert!(
+            too_small_data.len() < header_size,
+            "Malformed request with {} bytes should be less than header size {}",
+            too_small_data.len(),
+            header_size
+        );
+
+        // Test case 2: More realistic too-small cases
+        let test_cases = vec![
+            (vec![0u8; 0], 0),   // Empty request
+            (vec![0u8; 1], 1),   // 1 byte
+            (vec![0u8; 8], 8),   // 8 bytes (half header)
+            (vec![0u8; 15], 15), // 15 bytes (just short of header)
+        ];
+
+        for (data, expected_len) in test_cases {
+            assert_eq!(data.len(), expected_len);
+            assert!(
+                data.len() < header_size,
+                "Request with {} bytes should be rejected (need {})",
+                data.len(),
+                header_size
+            );
+        }
+
+        // Test case 3: Valid size requests (would pass size check)
+        let valid_cases = vec![
+            vec![0u8; 16],   // Exact header size
+            vec![0u8; 32],   // Header + data
+            vec![0u8; 1024], // Large request
+        ];
+
+        for data in valid_cases {
+            assert!(
+                data.len() >= header_size,
+                "Request with {} bytes should pass size check",
+                data.len()
+            );
+        }
+    }
+
+    #[test]
+    fn test_malformed_header_structure() {
+        // This test verifies that the header structure is correctly defined
+        // and that we can safely parse various header patterns
+
+        let header_size = std::mem::size_of::<RequestHeader>();
+
+        // Create a valid-sized buffer with various patterns
+        let patterns = vec![
+            vec![0x00; header_size], // All zeros
+            vec![0xFF; header_size], // All ones
+            vec![0xAA; header_size], // Alternating pattern
+        ];
+
+        for pattern in patterns {
+            assert_eq!(pattern.len(), header_size);
+
+            // Parse header (same unsafe code as in handle_requests:256-258)
+            let header =
+                unsafe { std::ptr::read_unaligned(pattern.as_ptr() as *const RequestHeader) };
+
+            // The parsing should not crash, regardless of values
+            // The actual values don't matter for this safety test
+            let _id = *header.id;
+            let _size = *header.size;
+        }
+    }
+
+    #[test]
+    fn test_request_header_alignment() {
+        // Verify that RequestHeader can be read with read_unaligned
+        // This is important because data from ring buffers may not be aligned
+
+        let header_size = std::mem::size_of::<RequestHeader>();
+
+        // Create misaligned buffer (offset by 1 byte to test unaligned access)
+        let mut buffer = vec![0u8; header_size + 1];
+        buffer[1..].fill(0x42);
+
+        // Read from misaligned offset (this is what read_unaligned is for)
+        let header =
+            unsafe { std::ptr::read_unaligned(&buffer[1] as *const u8 as *const RequestHeader) };
+
+        // Should successfully read without crashing
+        let _id = *header.id;
+        let _size = *header.size;
+    }
+
+    #[test]
+    fn test_connection_request_structure() {
+        // Verify ConnectionRequest structure for connection setup
+
+        let conn_req_size = std::mem::size_of::<ConnectionRequest>();
+
+        // ConnectionRequest should be properly sized
+        assert!(
+            conn_req_size > std::mem::size_of::<RequestHeader>(),
+            "ConnectionRequest should include header plus additional fields"
+        );
+
+        // Test that we can parse a zero-filled connection request
+        let data = vec![0u8; conn_req_size];
+        let conn_req =
+            unsafe { std::ptr::read_unaligned(data.as_ptr() as *const ConnectionRequest) };
+
+        // Should not crash when accessing fields
+        let _id = *conn_req.hdr.id;
+        let _size = *conn_req.hdr.size;
+        let _max_msg_size = conn_req.max_msg_size;
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs
new file mode 100644
index 00000000..12b40cd4
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/handler.rs
@@ -0,0 +1,93 @@
+//! Handler trait for processing IPC requests
+//!
+//! This module defines the core `Handler` trait that users implement to process
+//! IPC requests. The trait-based approach provides a more idiomatic and extensible
+//! API compared to raw function closures.
+
+use crate::protocol::{Request, Response};
+use async_trait::async_trait;
+
+/// Permissions for IPC connections
+///
+/// Determines the access level for authenticated connections.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Permissions {
+    /// Read-only access
+    ReadOnly,
+    /// Read-write access
+    ReadWrite,
+}
+
+/// Handler trait for processing IPC requests and authentication
+///
+/// Implement this trait to define custom request handling logic and authentication
+/// policy for your IPC server. The handler receives a `Request` containing the
+/// message ID, payload data, and connection context, and returns a `Response` with
+/// an error code and response data.
+///
+/// ## Authentication
+///
+/// The `authenticate` method is called during connection setup to determine whether
+/// a client with given credentials should be accepted. This allows the handler to
+/// implement application-specific authentication policies.
+///
+/// ## Async Support
+///
+/// The `handle` method is async, allowing you to perform I/O operations, database
+/// queries, or other async work within your handler.
+///
+/// ## Thread Safety
+///
+/// Handlers must be `Send + Sync` as they may be called from multiple tokio tasks
+/// concurrently. Use `Arc<Mutex<T>>` or other synchronization primitives if you need
+/// mutable shared state.
+///
+/// ## Error Handling
+///
+/// Return negative errno values in `Response::error_code` to indicate errors.
+/// Use 0 for success. See `libc::*` constants for standard errno values.
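+///
+/// ## Example
+///
+/// A minimal sketch (the operation code here is application-defined):
+///
+/// ```ignore
+/// use async_trait::async_trait;
+/// use pmxcfs_ipc::{Handler, Permissions, Request, Response};
+///
+/// struct MyHandler;
+///
+/// #[async_trait]
+/// impl Handler for MyHandler {
+///     fn authenticate(&self, uid: u32, _gid: u32) -> Option<Permissions> {
+///         // Only accept root, with read-write access
+///         (uid == 0).then_some(Permissions::ReadWrite)
+///     }
+///
+///     async fn handle(&self, request: Request) -> Response {
+///         match request.msg_id {
+///             1 => Response::ok(Vec::new()),
+///             _ => Response::err(-22), // -EINVAL
+///         }
+///     }
+/// }
+/// ```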
+#[async_trait]
+pub trait Handler: Send + Sync {
+    /// Authenticate a connecting client and determine access level
+    ///
+    /// Called during connection setup to determine whether to accept the connection
+    /// and what access level to grant.
+    ///
+    /// # Arguments
+    ///
+    /// * `uid` - Client user ID (from SO_PEERCRED)
+    /// * `gid` - Client group ID (from SO_PEERCRED)
+    ///
+    /// # Returns
+    ///
+    /// - `Some(Permissions::ReadWrite)` to accept with read-write access
+    /// - `Some(Permissions::ReadOnly)` to accept with read-only access
+    /// - `None` to reject the connection
+    fn authenticate(&self, uid: u32, gid: u32) -> Option<Permissions>;
+
+    /// Handle an IPC request
+    ///
+    /// # Arguments
+    ///
+    /// * `request` - The incoming request with message ID, data, and connection context
+    ///
+    /// # Returns
+    ///
+    /// A `Response` containing the error code (0 = success, negative = errno) and
+    /// optional response data to send back to the client.
+    async fn handle(&self, request: Request) -> Response;
+}
+
+/// Blanket implementation for Arc<T> where T: Handler
+///
+/// This allows passing `Arc<MyHandler>` directly to `Server::new()`.
+#[async_trait]
+impl<T: Handler> Handler for std::sync::Arc<T> {
+    fn authenticate(&self, uid: u32, gid: u32) -> Option<Permissions> {
+        (**self).authenticate(uid, gid)
+    }
+
+    async fn handle(&self, request: Request) -> Response {
+        (**self).handle(request).await
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs
new file mode 100644
index 00000000..923c359e
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/lib.rs
@@ -0,0 +1,37 @@
+//! libqb-compatible IPC server implementation in pure Rust
+//!
+//! This crate implements a minimal libqb IPC server that is wire-compatible
+//! with libqb clients (qb_ipcc_*), without depending on the libqb C library.
+//!
+//! ## Protocol Overview
+//!
+//! 1. **Connection Handshake** (SOCK_STREAM):
+//!    - Server listens on an abstract Unix socket `@{service_name}`
+//!    - Client connects and sends `qb_ipc_connection_request`
+//!    - Server authenticates (uid/gid via SO_PEERCRED) and creates per-connection
+//!      shared memory ring buffers in `/dev/shm`
+//!    - Server sends `qb_ipc_connection_response` with the ring buffer names
+//!
+//! 2. **Request/Response** (QB_IPC_SHM):
+//!    - Client writes requests into the request ring buffer
+//!    - Server reads, processes, and writes responses into the response ring buffer
+//!
+//! ## Module Structure
+//!
+//! - `protocol` - Wire protocol structures and constants
+//! - `socket` - Abstract Unix socket utilities
+//! - `ringbuffer` - Lock-free SPSC ring buffers in shared memory
+//! - `connection` - Per-connection handling and request processing
+//! - `handler` - `Handler` trait and `Permissions` for authentication and request handling
+//! - `server` - Main IPC server and connection acceptance
+//!
+//! References:
+//! - libqb source: `lib/ipc_setup.c`, `lib/ipc_shm.c`, `lib/ringbuffer.c`
+mod connection;
+mod handler;
+mod protocol;
+mod ringbuffer;
+mod server;
+mod socket;
+
+// Public API
+pub use handler::{Handler, Permissions};
+pub use protocol::{Request, Response};
+pub use server::Server;
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs
new file mode 100644
index 00000000..469099f2
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/protocol.rs
@@ -0,0 +1,332 @@
+//! libqb wire protocol structures and constants
+//!
+//! This module contains the low-level protocol definitions for libqb IPC communication.
+//! All structures must match the C counterparts exactly for binary compatibility.
+
+/// Message ID for authentication requests (matches libqb's QB_IPC_MSG_AUTHENTICATE)
+pub(super) const MSG_AUTHENTICATE: i32 = 1;
+
+/// Connection type for shared memory transport (matches libqb's QB_IPC_SHM)
+pub(super) const CONNECTION_TYPE_SHM: u32 = 1;
+
+/// Maximum path length - used in connection response
+pub(super) const PATH_MAX: usize = 4096;
+
+/// Wrapper for i32 that aligns to 8-byte boundary with explicit padding
+///
+/// Simulates C's `__attribute__ ((aligned(8)))` on individual i32 fields.
+/// This is used to match libqb's per-field alignment behavior.
+///
+/// Memory layout:
+/// - Bytes 0-3: i32 value
+/// - Bytes 4-7: zero padding
+/// - Total: 8 bytes
+#[repr(C, align(8))]
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub struct Align8 {
+    pub value: i32,
+    _pad: u32, // 4 bytes padding for i32 -> 8 bytes total
+}
+
+impl Align8 {
+    #[inline]
+    pub const fn new(value: i32) -> Self {
+        Align8 { value, _pad: 0 }
+    }
+}
+
+impl std::ops::Deref for Align8 {
+    type Target = i32;
+
+    #[inline]
+    fn deref(&self) -> &i32 {
+        &self.value
+    }
+}
+
+impl std::ops::DerefMut for Align8 {
+    #[inline]
+    fn deref_mut(&mut self) -> &mut i32 {
+        &mut self.value
+    }
+}
+
+impl From<i32> for Align8 {
+    #[inline]
+    fn from(value: i32) -> Self {
+        Align8::new(value)
+    }
+}
+
+impl Default for Align8 {
+    #[inline]
+    fn default() -> Self {
+        Align8::new(0)
+    }
+}
+
+/// Request header (matches libqb's qb_ipc_request_header)
+///
+/// Each field is 8-byte aligned to match C's __attribute__ ((aligned(8)))
+#[repr(C, align(8))]
+#[derive(Debug, Copy, Clone)]
+pub struct RequestHeader {
+    pub id: Align8,
+    pub size: Align8,
+}
+
+/// Response header (matches libqb's qb_ipc_response_header)
+#[repr(C, align(8))]
+#[derive(Debug, Copy, Clone)]
+pub struct ResponseHeader {
+    pub id: Align8,
+    pub size: Align8,
+    pub error: Align8,
+}
+
+/// Connection request sent by client during handshake (matches libqb's qb_ipc_connection_request)
+#[repr(C)]
+#[derive(Debug, Copy, Clone)]
+pub(super) struct ConnectionRequest {
+    pub hdr: RequestHeader,
+    pub max_msg_size: u32,
+}
+
+/// Connection response sent by server during handshake (matches libqb's qb_ipc_connection_response)
+#[repr(C, align(8))]
+#[derive(Debug)]
+pub(super) struct ConnectionResponse {
+    pub hdr: ResponseHeader,
+    pub connection_type: u32,
+    pub max_msg_size: u32,
+    pub connection: usize,
+    pub request: [u8; PATH_MAX],
+    pub response: [u8; PATH_MAX],
+    pub event: [u8; PATH_MAX],
+}
+
+/// Request passed to handlers
+///
+/// Contains all information about an IPC request including the message ID,
+/// payload data, and connection context (uid, gid, pid, permissions).
+#[derive(Debug, Clone)]
+pub struct Request {
+    /// Message ID identifying the operation (application-defined)
+    pub msg_id: i32,
+
+    /// Request payload data
+    pub data: Vec<u8>,
+
+    /// Whether this connection has read-only access
+    pub is_read_only: bool,
+
+    /// Connection ID (for logging/debugging)
+    pub conn_id: u64,
+
+    /// Client user ID (from SO_PEERCRED)
+    pub uid: u32,
+
+    /// Client group ID (from SO_PEERCRED)
+    pub gid: u32,
+
+    /// Client process ID (from SO_PEERCRED)
+    pub pid: u32,
+}
+
+/// Response from handlers
+///
+/// Contains the error code and response data to send back to the client.
+#[derive(Debug, Clone)]
+pub struct Response {
+    /// Error code (0 = success, negative = errno)
+    pub error_code: i32,
+
+    /// Response payload data
+    pub data: Vec<u8>,
+}
+
+impl Response {
+    /// Create a successful response with data
+    pub fn ok(data: Vec<u8>) -> Self {
+        Self {
+            error_code: 0,
+            data,
+        }
+    }
+
+    /// Create an error response with errno
+    pub fn err(error_code: i32) -> Self {
+        Self {
+            error_code,
+            data: Vec::new(),
+        }
+    }
+
+    /// Create an error response with errno and optional data
+    pub fn with_error(error_code: i32, data: Vec<u8>) -> Self {
+        Self { error_code, data }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_header_sizes() {
+        assert_eq!(std::mem::size_of::<RequestHeader>(), 16);
+        assert_eq!(std::mem::align_of::<RequestHeader>(), 8);
+        assert_eq!(std::mem::size_of::<ResponseHeader>(), 24);
+        assert_eq!(std::mem::align_of::<ResponseHeader>(), 8);
+        assert_eq!(std::mem::size_of::<ConnectionRequest>(), 24); // 16 (header) + 4 (max_msg_size) + 4 (padding)
+
+        println!(
+            "ConnectionResponse size: {}",
+            std::mem::size_of::<ConnectionResponse>()
+        );
+        println!(
+            "ConnectionResponse align: {}",
+            std::mem::align_of::<ConnectionResponse>()
+        );
+        println!("PATH_MAX: {PATH_MAX}");
+
+        // C expects: 24 (header) + 4 (connection_type) + 4 (max_msg_size) + 8 (connection pointer) + 3*4096 (paths) = 12328
+        assert_eq!(std::mem::size_of::<ConnectionResponse>(), 12328);
+    }
+
+    // ===== Align8 Tests =====
+
+    #[test]
+    fn test_align8_size_and_alignment() {
+        // Verify Align8 is exactly 8 bytes
+        assert_eq!(std::mem::size_of::<Align8>(), 8);
+        assert_eq!(std::mem::align_of::<Align8>(), 8);
+    }
+
+    #[test]
+    fn test_align8_creation_and_value_access() {
+        let a = Align8::new(42);
+        assert_eq!(a.value, 42);
+        assert_eq!(*a, 42); // Test Deref
+    }
+
+    #[test]
+    fn test_align8_from_i32() {
+        let a: Align8 = (-100).into();
+        assert_eq!(a.value, -100);
+    }
+
+    #[test]
+    fn test_align8_default() {
+        let a = Align8::default();
+        assert_eq!(a.value, 0);
+    }
+
+    #[test]
+    fn test_align8_deref_mut() {
+        let mut a = Align8::new(10);
+        *a = 20; // Test DerefMut
+        assert_eq!(a.value, 20);
+    }
+
+    #[test]
+    fn test_align8_padding_is_zero() {
+        let a = Align8::new(123);
+        // Padding should always be 0
+        assert_eq!(a._pad, 0);
+    }
+
+    // ===== Response Tests =====
+
+    #[test]
+    fn test_response_ok_creation() {
+        let data = b"test data".to_vec();
+        let resp = Response::ok(data.clone());
+
+        assert_eq!(resp.error_code, 0);
+        assert_eq!(resp.data, data);
+    }
+
+    #[test]
+    fn test_response_err_creation() {
+        let resp = Response::err(-5); // ERRNO like EIO
+
+        assert_eq!(resp.error_code, -5);
+        assert!(resp.data.is_empty());
+    }
+
+    #[test]
+    fn test_response_with_error_and_data() {
+        let data = b"error details".to_vec();
+        let resp = Response::with_error(-22, data.clone()); // EINVAL
+
+        assert_eq!(resp.error_code, -22);
+        assert_eq!(resp.data, data);
+    }
+
+    #[test]
+    fn test_response_error_codes() {
+        // Test various errno values
+        let test_cases = vec![
+            (0, "success"),
+            (-1, "EPERM"),
+            (-2, "ENOENT"),
+            (-13, "EACCES"),
+            (-22, "EINVAL"),
+        ];
+
+        for (code, _name) in test_cases {
+            let resp = Response::err(code);
+            assert_eq!(resp.error_code, code);
+        }
+    }
+
+    // ===== Request Tests =====
+
+    #[test]
+    fn test_request_creation() {
+        let req = Request {
+            msg_id: 100,
+            data: b"payload".to_vec(),
+            is_read_only: false,
+            conn_id: 12345,
+            uid: 0,
+            gid: 0,
+            pid: 999,
+        };
+
+        assert_eq!(req.msg_id, 100);
+        assert_eq!(req.data, b"payload");
+        assert!(!req.is_read_only);
+        assert_eq!(req.conn_id, 12345);
+        assert_eq!(req.uid, 0);
+        assert_eq!(req.gid, 0);
+        assert_eq!(req.pid, 999);
+    }
+
+    #[test]
+    fn test_request_read_only_flag() {
+        let req_ro = Request {
+            msg_id: 1,
+            data: vec![],
+            is_read_only: true,
+            conn_id: 1,
+            uid: 33,
+            gid: 33,
+            pid: 1000,
+        };
+
+        let req_rw = Request {
+            msg_id: 1,
+            data: vec![],
+            is_read_only: false,
+            conn_id: 2,
+            uid: 0,
+            gid: 0,
+            pid: 1001,
+        };
+
+        assert!(req_ro.is_read_only);
+        assert!(!req_rw.is_read_only);
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs
new file mode 100644
index 00000000..96dd192b
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/ringbuffer.rs
@@ -0,0 +1,1158 @@
+//! Lock-free ring buffer implementation compatible with libqb's shared memory IPC
+//!
+//! This module implements a SPSC (single-producer single-consumer) ring buffer
+//! using shared memory, matching libqb's wire protocol and memory layout.
+//!
+//! ## Design
+//!
+//! - **Shared Memory**: Two mmap'd files (header + data) in /dev/shm
+//! - **Lock-Free**: Uses atomic operations for read_pt/write_pt synchronization
+//! - **Chunk-Based**: Messages stored as [size][magic][data] chunks
+//! - **Wire-Compatible**: Matches libqb's qb_ringbuffer_shared_s layout
+use anyhow::{Context, Result};
+use memmap2::MmapMut;
+use std::fs::OpenOptions;
+use std::os::fd::AsRawFd;
+use std::path::Path;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};
+use tokio::sync::Notify;
+
+/// Circular mmap wrapper for ring buffer data
+///
+/// This struct manages a circular memory mapping where the same file is mapped
+/// twice in consecutive virtual addresses. This allows ring buffer operations
+/// to wrap around naturally without modulo arithmetic.
+///
+/// Matches libqb's qb_sys_circular_mmap() behavior.
+struct CircularMmap {
+    /// Starting address of the 2x circular mapping
+    addr: *mut libc::c_void,
+    /// Size of the file (virtual mapping is 2x this size)
+    size: usize,
+}
+
+impl CircularMmap {
+    /// Create a circular mmap from a file descriptor
+    ///
+    /// Maps the file TWICE in consecutive virtual addresses, allowing ring buffer
+    /// wraparound without modulo arithmetic. Matches libqb's qb_sys_circular_mmap().
+    ///
+    /// # Arguments
+    /// - `fd`: File descriptor of the data file (must be sized to `size` bytes)
+    /// - `size`: Size of the file in bytes (virtual mapping will be 2x this)
+    ///
+    /// # Safety
+    /// The file must be properly sized before calling this function.
+    unsafe fn new(fd: i32, size: usize) -> Result<Self> {
+        // SAFETY: All operations in this function are inherently unsafe as they
+        // manipulate raw memory mappings. The caller must ensure the fd is valid
+        // and the file is properly sized.
+        unsafe {
+            // Step 1: Reserve 2x space with anonymous mmap
+            let addr_orig = libc::mmap(
+                std::ptr::null_mut(),
+                size * 2,
+                libc::PROT_NONE,
+                libc::MAP_ANONYMOUS | libc::MAP_PRIVATE,
+                -1,
+                0,
+            );
+
+            if addr_orig == libc::MAP_FAILED {
+                anyhow::bail!(
+                    "Failed to reserve circular mmap space: {}",
+                    std::io::Error::last_os_error()
+                );
+            }
+
+            // Step 2: Map the file at the start of reserved space
+            let addr1 = libc::mmap(
+                addr_orig,
+                size,
+                libc::PROT_READ | libc::PROT_WRITE,
+                libc::MAP_FIXED | libc::MAP_SHARED,
+                fd,
+                0,
+            );
+
+            if addr1 != addr_orig {
+                libc::munmap(addr_orig, size * 2);
+                anyhow::bail!(
+                    "Failed to map first half of circular buffer: {}",
+                    std::io::Error::last_os_error()
+                );
+            }
+
+            // Step 3: Map the SAME file again right after
+            let addr_next = (addr_orig as *mut u8).add(size) as *mut libc::c_void;
+            let addr2 = libc::mmap(
+                addr_next,
+                size,
+                libc::PROT_READ | libc::PROT_WRITE,
+                libc::MAP_FIXED | libc::MAP_SHARED,
+                fd,
+                0,
+            );
+
+            if addr2 != addr_next {
+                libc::munmap(addr_orig, size * 2);
+                anyhow::bail!(
+                    "Failed to map second half of circular buffer: {}",
+                    std::io::Error::last_os_error()
+                );
+            }
+
+            tracing::debug!(
+                "Created circular mmap: {:p}, {} bytes (2x {} bytes file)",
+                addr_orig,
+                size * 2,
+                size
+            );
+
+            Ok(Self {
+                addr: addr_orig,
+                size,
+            })
+        }
+    }
+
+    /// Get the base address as a mutable pointer to u32
+    ///
+    /// The ring buffer stores its contents as u32 words, so this is the usual access type.
+    fn as_mut_ptr(&self) -> *mut u32 {
+        self.addr as *mut u32
+    }
+
+    /// Zero-initialize the circular mapping
+    ///
+    /// Only the first half of the mapping needs to be written; the second half
+    /// aliases the same file pages.
+    ///
+    /// # Safety
+    /// The circular mmap must be properly initialized and the address valid.
+    unsafe fn zero_initialize(&mut self) {
+        // SAFETY: Caller ensures the circular mmap is valid and mapped
+        unsafe {
+            std::ptr::write_bytes(self.addr as *mut u8, 0, self.size);
+        }
+    }
+}
+
+impl Drop for CircularMmap {
+    fn drop(&mut self) {
+        // Munmap the 2x circular mapping
+        // Matches libqb's cleanup in qb_rb_close_helper
+        unsafe {
+            libc::munmap(self.addr, self.size * 2);
+        }
+        tracing::debug!(
+            "Unmapped circular buffer: {:p}, {} bytes (2x {} bytes file)",
+            self.addr,
+            self.size * 2,
+            self.size
+        );
+    }
+}
+
+/// Process-shared POSIX semaphore wrapper
+///
+/// This wraps the native Linux sem_t (32 bytes on x86_64) for inter-process
+/// synchronization in the ring buffer.
+///
+/// **libqb compatibility note**: This corresponds to libqb's `rpl_sem_t` type.
+/// On Linux with HAVE_SEM_TIMEDWAIT defined, rpl_sem_t is just an alias for
+/// the native sem_t. The "rpl" prefix stands for "replacement" - libqb provides
+/// a fallback implementation using mutexes/condvars on systems without proper
+/// POSIX semaphore support (like BSD). Since we only target Linux, we use the
+/// native sem_t directly.
+#[repr(C)]
+struct PosixSem {
+    /// Raw sem_t storage (32 bytes on Linux x86_64)
+    _sem: [u8; 32],
+}
+
+impl PosixSem {
+    /// Initialize a POSIX semaphore in-place in shared memory
+    ///
+    /// This initializes the semaphore at its current memory location, which is
+    /// critical for process-shared semaphores in mmap'd memory. The semaphore
+    /// must not be moved after initialization.
+    ///
+    /// The semaphore is always initialized as:
+    /// - **Process-shared** (pshared=1): Shared between processes via mmap
+    /// - **Initial value 0**: No data available initially
+    ///
+    /// Matches libqb's semaphore initialization in `qb_rb_create_from_file`.
+    ///
+    /// # Safety
+    /// The semaphore must remain at its current memory location and must not
+    /// be moved or copied after initialization.
+    unsafe fn init_in_place(&mut self) -> Result<()> {
+        let sem_ptr = self._sem.as_mut_ptr() as *mut libc::sem_t;
+
+        // pshared=1: Process-shared semaphore (for cross-process IPC)
+        // initial_value=0: No data available initially (producers will post)
+        const PSHARED: libc::c_int = 1;
+        const INITIAL_VALUE: libc::c_uint = 0;
+
+        // SAFETY: Caller ensures the semaphore memory is valid and will remain
+        // at this location for its lifetime
+        let ret = unsafe { libc::sem_init(sem_ptr, PSHARED, INITIAL_VALUE) };
+
+        if ret != 0 {
+            anyhow::bail!("sem_init failed: {}", std::io::Error::last_os_error());
+        }
+
+        Ok(())
+    }
+
+    /// Destroy the semaphore
+    ///
+    /// This should be called when the semaphore is no longer needed.
+    /// Matches libqb's rpl_sem_destroy (which is sem_destroy on Linux).
+    ///
+    /// # Safety
+    /// The semaphore must have been properly initialized and no threads should
+    /// be waiting on it.
+    unsafe fn destroy(&mut self) -> Result<()> {
+        let sem_ptr = self._sem.as_mut_ptr() as *mut libc::sem_t;
+
+        // SAFETY: Caller ensures the semaphore is initialized and not in use
+        let ret = unsafe { libc::sem_destroy(sem_ptr) };
+
+        if ret != 0 {
+            anyhow::bail!("sem_destroy failed: {}", std::io::Error::last_os_error());
+        }
+
+        Ok(())
+    }
+
+    /// Post to the semaphore (increment)
+    ///
+    /// Matches libqb's rpl_sem_post (which is sem_post on Linux).
+    unsafe fn post(&self) -> Result<()> {
+        let ret = unsafe { libc::sem_post(self._sem.as_ptr() as *mut libc::sem_t) };
+
+        if ret != 0 {
+            anyhow::bail!("sem_post failed: {}", std::io::Error::last_os_error());
+        }
+
+        Ok(())
+    }
+
+    /// Wait on the semaphore (decrement) without blocking the async runtime
+    ///
+    /// Uses `spawn_blocking` to wait on the semaphore without blocking the tokio
+    /// runtime. This provides true event-driven behavior while maintaining
+    /// compatibility with libqb's semaphore-based notification mechanism.
+    ///
+    /// Matches libqb's `my_posix_sem_timedwait` / `sem_wait` behavior.
+    ///
+    /// # Safety
+    /// The semaphore must be properly initialized and remain valid for the
+    /// duration of the wait operation.
+    async unsafe fn wait(&self) -> Result<()> {
+        // Get raw pointer to semaphore
+        let sem_ptr = self._sem.as_ptr() as *mut libc::sem_t;
+
+        // Convert to usize for safe transfer between threads
+        // This is safe because:
+        // 1. The semaphore is in process-shared memory (mmap'd file)
+        // 2. The memory remains valid for the lifetime of the containing structure
+        // 3. We're only using the pointer on the blocking thread pool
+        let sem_ptr_addr = sem_ptr as usize;
+
+        // Use spawn_blocking to wait on the semaphore without blocking tokio runtime
+        // This offloads the blocking sem_wait to tokio's dedicated blocking thread pool
+        tokio::task::spawn_blocking(move || {
+            // Reconstruct the pointer on the blocking thread
+            // SAFETY: The semaphore is in shared memory and remains valid.
+            // We're calling sem_wait on a process-shared semaphore from a thread
+            // in the same process, which is safe.
+            let sem_ptr = sem_ptr_addr as *mut libc::sem_t;
+            let ret = unsafe { libc::sem_wait(sem_ptr) };
+
+            if ret != 0 {
+                let err = std::io::Error::last_os_error();
+                // Handle EINTR by returning an error that causes retry
+                if err.raw_os_error() == Some(libc::EINTR) {
+                    anyhow::bail!("sem_wait interrupted (EINTR), will retry");
+                }
+                anyhow::bail!("sem_wait failed: {err}");
+            }
+
+            Ok(())
+        })
+        .await
+        .context("spawn_blocking task failed")??;
+
+        Ok(())
+    }
+}
+
+/// Shared memory header matching libqb's qb_ringbuffer_shared_s layout
+///
+/// This structure is mmap'd and shared between processes.
+/// Field order and alignment must exactly match libqb for compatibility.
+///
+/// Note: libqb's struct has `char user_data[1]` which contributes 1 byte to sizeof(),
+/// then the struct is padded to 8-byte alignment (7 bytes padding).
+/// Additional shared_user_data_size bytes are allocated beyond sizeof().
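+///
+/// Resulting layout on x86_64 Linux (byte offsets; assumes PATH_MAX = 4096 and
+/// a 32-byte sem_t, as used below):
+///
+/// ```text
+/// 0     write_pt   (u32)
+/// 4     read_pt    (u32)
+/// 8     word_size  (u32)
+/// 12    hdr_path   (4096 bytes)
+/// 4108  data_path  (4096 bytes)
+/// 8204  ref_count  (u32)
+/// 8208  posix_sem  (32 bytes)
+/// 8240  user_data  (1 byte) + 7 bytes padding -> sizeof = 8248
+/// ```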
+#[repr(C, align(8))]
+struct RingBufferShared {
+    /// Write pointer (word index, not byte offset)
+    write_pt: AtomicU32,
+    /// Read pointer (word index, not byte offset)
+    read_pt: AtomicU32,
+    /// Ring buffer size in words (u32 units)
+    word_size: u32,
+    /// Path to header file
+    hdr_path: [u8; libc::PATH_MAX as usize],
+    /// Path to data file
+    data_path: [u8; libc::PATH_MAX as usize],
+    /// Reference count (for cleanup)
+    ref_count: AtomicU32,
+    /// Process-shared semaphore for notification
+    posix_sem: PosixSem,
+    /// Flexible array member placeholder (matches C's char user_data[1])
+    /// Actual user_data starts here and continues beyond sizeof(RingBufferShared)
+    user_data: [u8; 1],
+    // 7 bytes of padding added by align(8) to reach 8248 bytes total
+}
+
+impl RingBufferShared {
+    /// Chunk header size in 32-bit words (matching libqb)
+    const CHUNK_HEADER_WORDS: usize = 2;
+
+    /// Chunk magic numbers (matching libqb qb_ringbuffer_int.h)
+    const CHUNK_MAGIC: u32 = 0xA1A1A1A1; // Valid allocated chunk
+    const CHUNK_MAGIC_DEAD: u32 = 0xD0D0D0D0; // Reclaimed/dead chunk
+    const CHUNK_MAGIC_ALLOC: u32 = 0xA110CED0; // Chunk being allocated
+
+    /// Calculate the next pointer position after a chunk of given size
+    ///
+    /// This implements libqb's qb_rb_chunk_step logic (ringbuffer.c:464-484):
+    /// 1. Skip chunk header (CHUNK_HEADER_WORDS)
+    /// 2. Skip user data (rounded up to word boundary)
+    /// 3. Wrap around if needed
+    ///
+    /// # Arguments
+    /// - `current_pt`: Current read or write pointer (in words)
+    /// - `data_size_bytes`: Size of the data payload in bytes
+    ///
+    /// # Returns
+    /// New pointer position (in words), wrapped to [0, word_size)
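+    ///
+    /// Worked example: a 10-byte payload occupies `10.div_ceil(4) = 3` data
+    /// words, so the pointer advances by `2 + 3 = 5` words, modulo `word_size`.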
+    fn chunk_step(&self, current_pt: u32, data_size_bytes: usize) -> u32 {
+        let word_size = self.word_size as usize;
+
+        // Convert bytes to words, rounding up to word boundary
+        // This matches libqb's logic:
+        //   pointer += (chunk_size / sizeof(uint32_t));
+        //   if ((chunk_size % (sizeof(uint32_t) * QB_RB_WORD_ALIGN)) != 0) pointer++;
+        let data_words = data_size_bytes.div_ceil(std::mem::size_of::<u32>());
+
+        // Calculate new position: current + header + data (in words)
+        let new_pt = (current_pt as usize + Self::CHUNK_HEADER_WORDS + data_words) % word_size;
+
+        new_pt as u32
+    }
+
+    /// Initialize a RingBufferShared structure in-place in shared memory
+    ///
+    /// This initializes the ring buffer header at its current memory location, which is
+    /// critical for process-shared data structures in mmap'd memory. The structure
+    /// must not be moved after initialization.
+    ///
+    /// # Arguments
+    /// - `word_size`: Size of ring buffer in 32-bit words
+    /// - `hdr_path`: Path to the header file (will be copied into the structure)
+    /// - `data_path`: Path to the data file (will be copied into the structure)
+    ///
+    /// # Safety
+    /// The RingBufferShared must remain at its current memory location and must not
+    /// be moved or copied after initialization.
+    unsafe fn init_in_place(
+        &mut self,
+        word_size: u32,
+        hdr_path: &std::path::Path,
+        data_path: &std::path::Path,
+    ) -> Result<()> {
+        // SAFETY: Caller ensures this structure is in shared memory and will remain
+        // at this location for its lifetime
+        unsafe {
+            // Zero-initialize the entire structure first
+            std::ptr::write_bytes(self as *mut Self, 0, 1);
+
+            // Initialize atomic fields
+            self.write_pt = AtomicU32::new(0);
+            self.read_pt = AtomicU32::new(0);
+            self.word_size = word_size;
+            self.ref_count = AtomicU32::new(1);
+
+            // Initialize semaphore in-place in shared memory
+            // This is critical - the semaphore must be initialized at its final location
+            self.posix_sem
+                .init_in_place()
+                .context("Failed to initialize semaphore")?;
+
+            // Copy header path into structure
+            let hdr_path_str = hdr_path.to_string_lossy();
+            let hdr_path_bytes = hdr_path_str.as_bytes();
+            let len = hdr_path_bytes.len().min(libc::PATH_MAX as usize - 1);
+            self.hdr_path[..len].copy_from_slice(&hdr_path_bytes[..len]);
+
+            // Copy data path into structure
+            let data_path_str = data_path.to_string_lossy();
+            let data_path_bytes = data_path_str.as_bytes();
+            let len = data_path_bytes.len().min(libc::PATH_MAX as usize - 1);
+            self.data_path[..len].copy_from_slice(&data_path_bytes[..len]);
+        }
+
+        Ok(())
+    }
+
+    /// Calculate free space in the ring buffer (in words)
+    ///
+    /// Returns the number of free words (u32 units) available for allocation.
+    /// This uses atomic loads to read the pointers safely.
+    fn space_free_words(&self) -> usize {
+        let write_pt = self.write_pt.load(Ordering::Acquire);
+        let read_pt = self.read_pt.load(Ordering::Acquire);
+        let word_size = self.word_size as usize;
+
+        if write_pt >= read_pt {
+            if write_pt == read_pt {
+                word_size // Buffer is empty, all space available
+            } else {
+                (read_pt as usize + word_size - write_pt as usize) - 1
+            }
+        } else {
+            (read_pt as usize - write_pt as usize) - 1
+        }
+    }
+
+    /// Calculate free space in bytes
+    ///
+    /// Converts the word count to bytes by multiplying by sizeof(uint32_t).
+    /// Matches libqb's qb_rb_space_free (ringbuffer.c:373).
+    fn space_free_bytes(&self) -> usize {
+        self.space_free_words() * std::mem::size_of::<u32>()
+    }
+
+    /// Check if a chunk of given size (in bytes) can fit in the buffer
+    ///
+    /// Includes chunk header overhead and alignment requirements.
+    fn chunk_fits(&self, message_size: usize, chunk_margin: usize) -> bool {
+        let required_bytes = message_size + chunk_margin;
+        self.space_free_bytes() >= required_bytes
+    }
+
+    /// Write a chunk to the ring buffer
+    ///
+    /// This performs the complete chunk write operation:
+    /// 1. Allocate space in the ring buffer
+    /// 2. Write the message data (handling wraparound)
+    /// 3. Commit the chunk (update write_pt, set magic)
+    /// 4. Post to semaphore to wake readers
+    ///
+    /// # Safety
+    /// Caller must ensure:
+    /// - shared_data points to valid ring buffer data
+    /// - There is sufficient space (checked via chunk_fits)
+    /// - No other thread is writing concurrently
+    unsafe fn write_chunk(&self, shared_data: *mut u32, message: &[u8]) -> Result<()> {
+        let msg_len = message.len();
+        let word_size = self.word_size as usize;
+
+        // Get current write pointer
+        let write_pt = self.write_pt.load(Ordering::Acquire);
+
+        // Write chunk header: [size=0][magic=ALLOC]
+        // Matches libqb's qb_rb_chunk_alloc (ringbuffer.c:439-440)
+        unsafe {
+            *shared_data.add(write_pt as usize) = 0; // Size is 0 during allocation
+            *shared_data.add((write_pt as usize + 1) % word_size) = Self::CHUNK_MAGIC_ALLOC;
+        }
+
+        // Write message data
+        let data_offset = (write_pt as usize + Self::CHUNK_HEADER_WORDS) % word_size;
+        let data_ptr = unsafe { shared_data.add(data_offset) as *mut u8 };
+
+        // Handle wraparound - calculate remaining bytes in buffer before wraparound
+        let remaining = (word_size - data_offset) * std::mem::size_of::<u32>();
+        if msg_len <= remaining {
+            // No wraparound needed
+            unsafe {
+                std::ptr::copy_nonoverlapping(message.as_ptr(), data_ptr, msg_len);
+            }
+        } else {
+            // Need to wrap around
+            unsafe {
+                std::ptr::copy_nonoverlapping(message.as_ptr(), data_ptr, remaining);
+                std::ptr::copy_nonoverlapping(
+                    message.as_ptr().add(remaining),
+                    shared_data as *mut u8,
+                    msg_len - remaining,
+                );
+            }
+        }
+
+        // Calculate new write pointer - matches libqb's qb_rb_chunk_step logic
+        let new_write_pt = self.chunk_step(write_pt, msg_len);
+
+        // Commit: write size, update write pointer, then set magic with atomic RELEASE
+        // This matches libqb's qb_rb_chunk_commit behavior (ringbuffer.c:497-504)
+        unsafe {
+            // 1. Write chunk size
+            *shared_data.add(write_pt as usize) = msg_len as u32;
+
+            // 2. Update write pointer
+            self.write_pt.store(new_write_pt, Ordering::Relaxed);
+
+            // 3. Set magic with RELEASE
+            // RELEASE ensures all previous writes (data, size, write_pt) are visible before magic
+            let magic_offset = (write_pt as usize + 1) % word_size;
+            let magic_ptr = shared_data.add(magic_offset) as *mut AtomicU32;
+            (*magic_ptr).store(Self::CHUNK_MAGIC, Ordering::Release);
+
+            // 4. Post to semaphore to wake up waiting readers
+            self.posix_sem
+                .post()
+                .context("Failed to post to semaphore")?;
+        }
+
+        tracing::debug!(
+            "Wrote chunk: {} bytes, write_pt {} -> {}",
+            msg_len,
+            write_pt,
+            new_write_pt
+        );
+
+        Ok(())
+    }
+
+    /// Read a chunk from the ring buffer
+    ///
+    /// This reads the chunk at the current read pointer, validates it,
+    /// copies the data, and reclaims the chunk.
+    ///
+    /// Returns None if the buffer is empty (read_pt == write_pt).
+    ///
+    /// # Safety
+    /// Caller must ensure:
+    /// - shared_data points to valid ring buffer data
+    /// - flow_control_ptr (if Some) points to valid i32
+    /// - No other thread is reading concurrently
+    unsafe fn read_chunk(
+        &self,
+        shared_data: *mut u32,
+        flow_control_ptr: Option<*mut i32>,
+    ) -> Result<Option<Vec<u8>>> {
+        let word_size = self.word_size as usize;
+
+        // Get current read pointer
+        let read_pt = self.read_pt.load(Ordering::Acquire);
+        let write_pt = self.write_pt.load(Ordering::Acquire);
+
+        // Check if buffer is empty
+        if read_pt == write_pt {
+            return Ok(None);
+        }
+
+        // Read chunk header with ACQUIRE to see all writes
+        let magic_offset = (read_pt as usize + 1) % word_size;
+        let magic_ptr = unsafe { shared_data.add(magic_offset) as *const AtomicU32 };
+        let chunk_magic = unsafe { (*magic_ptr).load(Ordering::Acquire) };
+
+        // Read chunk size
+        let chunk_size = unsafe { *shared_data.add(read_pt as usize) };
+
+        tracing::debug!(
+            "Reading chunk: read_pt={}, write_pt={}, size={}, magic=0x{:08x}",
+            read_pt,
+            write_pt,
+            chunk_size,
+            chunk_magic
+        );
+
+        // Verify magic
+        if chunk_magic != Self::CHUNK_MAGIC {
+            anyhow::bail!(
+                "Invalid chunk magic at read_pt={}: expected 0x{:08x}, got 0x{:08x}",
+                read_pt,
+                Self::CHUNK_MAGIC,
+                chunk_magic
+            );
+        }
+
+        // Read message data
+        let data_offset = (read_pt as usize + Self::CHUNK_HEADER_WORDS) % word_size;
+        let data_ptr = unsafe { shared_data.add(data_offset) as *const u8 };
+
+        let mut message = vec![0u8; chunk_size as usize];
+
+        // Handle wraparound - calculate remaining bytes in buffer before wraparound
+        let remaining = (word_size - data_offset) * std::mem::size_of::<u32>();
+        if chunk_size as usize <= remaining {
+            // No wraparound
+            unsafe {
+                std::ptr::copy_nonoverlapping(data_ptr, message.as_mut_ptr(), chunk_size as usize);
+            }
+        } else {
+            // Wraparound
+            unsafe {
+                std::ptr::copy_nonoverlapping(data_ptr, message.as_mut_ptr(), remaining);
+                std::ptr::copy_nonoverlapping(
+                    shared_data as *const u8,
+                    message.as_mut_ptr().add(remaining),
+                    chunk_size as usize - remaining,
+                );
+            }
+        }
+
+        // Reclaim chunk: clear header and update read pointer
+        let new_read_pt = self.chunk_step(read_pt, chunk_size as usize);
+
+        unsafe {
+            // Clear chunk size
+            *shared_data.add(read_pt as usize) = 0;
+
+            // Set magic to DEAD with RELEASE
+            let magic_ptr = shared_data.add(magic_offset) as *mut AtomicU32;
+            (*magic_ptr).store(Self::CHUNK_MAGIC_DEAD, Ordering::Release);
+
+            // Update read_pt
+            self.read_pt.store(new_read_pt, Ordering::Relaxed);
+
+            // Signal flow control - server is ready for next request
+            if let Some(fc_ptr) = flow_control_ptr {
+                let refcount = self.ref_count.load(Ordering::Acquire);
+                if refcount == 2 {
+                    let fc_atomic = fc_ptr as *mut AtomicI32;
+                    (*fc_atomic).store(0, Ordering::Relaxed);
+                }
+            }
+        }
+
+        Ok(Some(message))
+    }
+}
+
+/// Flow control mechanism for ring buffer backpressure
+///
+/// Implements libqb's flow control protocol for IPC communication.
+/// The server writes flow control values to shared memory, and clients
+/// read these values to determine if they should back off.
+///
+/// Flow control values (matching libqb's rate limiting):
+/// - `OK`: Proceed with sending (QB_IPCS_RATE_NORMAL)
+/// - `SLOW_DOWN`: Approaching capacity, reduce send rate (QB_IPCS_RATE_OFF)
+/// - `STOP`: Queue full, do not send (QB_IPCS_RATE_OFF_2)
+///
+/// ## Disabled Flow Control
+///
+/// When constructed with a null fc_ptr, flow control is disabled and all
+/// operations become no-ops. This matches libqb's behavior for response/event
+/// rings which don't need backpressure signaling.
+///
+/// Matches libqb's qb_ipc_shm_fc_get/qb_ipc_shm_fc_set (ipc_shm.c:176-195)
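+///
+/// A server-side sketch (illustrative only; `queue_depth` and the watermarks
+/// are hypothetical, not part of this crate):
+///
+/// ```ignore
+/// if queue_depth >= stop_watermark {
+///     rb.flow_control.set(FlowControl::STOP);
+/// } else if queue_depth >= slow_watermark {
+///     rb.flow_control.set(FlowControl::SLOW_DOWN);
+/// } else {
+///     rb.flow_control.set(FlowControl::OK);
+/// }
+/// ```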
+pub struct FlowControl {
+    /// Pointer to flow control field in shared memory (i32 atomic)
+    /// Located in shared_user_data area of RingBufferShared
+    /// If null, flow control is disabled (no-op mode)
+    fc_ptr: *mut i32,
+    /// Pointer to shared header for refcount checks
+    /// If null, flow control is disabled (no-op mode)
+    shared_hdr: *mut RingBufferShared,
+}
+
+impl FlowControl {
+    /// OK to send - queue has space (QB_IPCS_RATE_NORMAL)
+    pub const OK: i32 = 0;
+
+    /// Slow down - queue approaching full (QB_IPCS_RATE_OFF)
+    pub const SLOW_DOWN: i32 = 1;
+
+    /// Stop sending - queue full (QB_IPCS_RATE_OFF_2)
+    pub const STOP: i32 = 2;
+
+    /// Create a new FlowControl instance
+    ///
+    /// Pass null pointers to create a disabled (no-op) flow control instance.
+    /// This is used for response/event rings that don't need backpressure.
+    ///
+    /// # Safety
+    /// - If fc_ptr is non-null, it must point to valid shared memory for an i32
+    /// - If shared_hdr is non-null, it must point to valid RingBufferShared
+    /// - Both must remain valid for the lifetime of FlowControl (if non-null)
+    unsafe fn new(fc_ptr: *mut i32, shared_hdr: *mut RingBufferShared) -> Self {
+        // Initialize to 0 if enabled - server is ready for requests
+        // libqb clients check: if (fc > 0 && fc <= fc_enable_max) return EAGAIN
+        // So 0 means "ready to transmit", > 0 means "flow control active/blocked"
+        if !fc_ptr.is_null() {
+            let fc_atomic = fc_ptr as *mut AtomicI32;
+            unsafe {
+                (*fc_atomic).store(0, Ordering::Relaxed);
+            }
+        }
+
+        Self { fc_ptr, shared_hdr }
+    }
+
+    /// Check if flow control is enabled
+    #[inline]
+    fn is_enabled(&self) -> bool {
+        !self.fc_ptr.is_null()
+    }
+
+    /// Get the raw flow control pointer (for internal use)
+    #[inline]
+    fn fc_ptr(&self) -> *mut i32 {
+        self.fc_ptr
+    }
+
+    /// Get flow control value
+    ///
+    /// Matches libqb's qb_ipc_shm_fc_get (ipc_shm.c:185-195).
+    /// Returns:
+    /// - 0: Ready for requests (or flow control disabled)
+    /// - >0: Flow control active (client should retry)
+    /// - <0: Error (not connected)
+    ///
+    /// Note: This method is primarily for libqb clients, not used internally by server
+    #[allow(dead_code)]
+    pub fn get(&self) -> i32 {
+        if !self.is_enabled() {
+            return 0; // Disabled = always ready
+        }
+
+        // Check if both client and server are connected (refcount == 2)
+        let refcount = unsafe { (*self.shared_hdr).ref_count.load(Ordering::Acquire) };
+        if refcount != 2 {
+            return -libc::ENOTCONN;
+        }
+
+        // Read flow control value atomically
+        unsafe {
+            let fc_atomic = self.fc_ptr as *const AtomicI32;
+            (*fc_atomic).load(Ordering::Relaxed)
+        }
+    }
+
+    /// Set flow control value
+    ///
+    /// Matches libqb's qb_ipc_shm_fc_set (ipc_shm.c:176-182).
+    /// - fc_enable = 0: Ready for requests
+    /// - fc_enable > 0: Flow control active (backpressure)
+    ///
+    /// No-op if flow control is disabled.
+    pub fn set(&self, fc_enable: i32) {
+        if !self.is_enabled() {
+            return; // Disabled = no-op
+        }
+
+        tracing::trace!("Setting flow control to {}", fc_enable);
+        unsafe {
+            let fc_atomic = self.fc_ptr as *mut AtomicI32;
+            (*fc_atomic).store(fc_enable, Ordering::Relaxed);
+        }
+    }
+}
+
+// Safety: FlowControl uses atomic operations for synchronization
+unsafe impl Send for FlowControl {}
+unsafe impl Sync for FlowControl {}
+
+/// Ring buffer handle
+///
+/// Owns the mmap'd memory regions and provides async message-passing API.
+pub struct RingBuffer {
+    /// Mmap of shared header
+    _mmap_hdr: MmapMut,
+    /// Circular mmap of shared data (2x virtual mapping)
+    _mmap_data: CircularMmap,
+    /// Pointer to shared header (inside _mmap_hdr)
+    shared_hdr: *mut RingBufferShared,
+    /// Pointer to shared data array (inside _mmap_data)
+    shared_data: *mut u32,
+    /// Flow control mechanism
+    /// Always present, but may be disabled (no-op) for response/event rings
+    pub flow_control: FlowControl,
+    /// Notifier for when data becomes available (for consumers)
+    data_available: Arc<Notify>,
+    /// Notifier for when space becomes available (for producers)
+    space_available: Arc<Notify>,
+    /// Whether this instance created the ring buffer (and thus owns cleanup)
+    /// Matches libqb's QB_RB_FLAG_CREATE flag
+    is_creator: bool,
+}
+
+// Safety: RingBuffer uses atomic operations for synchronization
+unsafe impl Send for RingBuffer {}
+unsafe impl Sync for RingBuffer {}
+
+impl RingBuffer {
+    /// Chunk margin for space calculations (in bytes)
+    /// Matches libqb: sizeof(uint32_t) * (CHUNK_HEADER_WORDS + WORD_ALIGN + CACHE_LINE_WORDS)
+    /// We don't use cache line alignment, so CACHE_LINE_WORDS = 0
+    const CHUNK_MARGIN: usize = 4 * (RingBufferShared::CHUNK_HEADER_WORDS + 1);
+
+    /// Create a new ring buffer in shared memory
+    ///
+    /// Creates two files under `base_dir` (typically `/dev/shm`):
+    /// - `{base_dir}/qb-{name}-header`
+    /// - `{base_dir}/qb-{name}-data`
+    ///
+    /// # Arguments
+    /// - `base_dir`: Directory for shared memory files (typically "/dev/shm")
+    /// - `name`: Ring buffer name
+    /// - `size_bytes`: Size of ring buffer data in bytes
+    /// - `shared_user_data_size`: Extra bytes to allocate after RingBufferShared for flow control
+    ///
+    /// The header file size will be: sizeof(RingBufferShared) + shared_user_data_size
+    /// This matches libqb's behavior: sizeof(qb_ringbuffer_shared_s) + shared_user_data_size
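+    ///
+    /// Worked example for the data file: requesting `size_bytes = 4096` adds the
+    /// 13-byte chunk margin (4109 bytes), rounds up to two 4096-byte pages
+    /// (8192 bytes), giving `word_size = 2048`.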
+    pub fn new(
+        base_dir: impl AsRef<Path>,
+        name: &str,
+        size_bytes: usize,
+        shared_user_data_size: usize,
+    ) -> Result<Self> {
+        let base_dir = base_dir.as_ref();
+
+        // Match libqb's size calculation exactly:
+        // 1. Add CHUNK_MARGIN + 1 (13 bytes)
+        //    CHUNK_MARGIN = sizeof(uint32_t) * (CHUNK_HEADER_WORDS + WORD_ALIGN + CACHE_LINE_WORDS)
+        //    = 4 * (2 + 1 + 0) = 12 bytes (without cache line alignment)
+        let size = size_bytes + Self::CHUNK_MARGIN + 1;
+
+        // 2. Round up to page size (typically 4096)
+        let page_size = 4096; // Standard page size on Linux
+        let real_size = size.div_ceil(page_size) * page_size;
+
+        // 3. Calculate word_size from rounded size
+        let word_size = real_size / 4;
+
+        tracing::info!(
+            "Creating ring buffer '{}': requested {} bytes, page-aligned to {} bytes ({} words)",
+            name,
+            size_bytes,
+            real_size,
+            word_size
+        );
+
+        // Create header file
+        let hdr_filename = format!("qb-{name}-header");
+        let hdr_path = base_dir.join(&hdr_filename);
+
+        let hdr_file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(&hdr_path)
+            .context("Failed to create header file")?;
+
+        // Resize to fit RingBufferShared structure + shared_user_data
+        // This matches libqb: sizeof(qb_ringbuffer_shared_s) + shared_user_data_size
+        let hdr_size = std::mem::size_of::<RingBufferShared>() + shared_user_data_size;
+        hdr_file
+            .set_len(hdr_size as u64)
+            .context("Failed to resize header file")?;
+
+        // Mmap header
+        let mut mmap_hdr =
+            unsafe { MmapMut::map_mut(&hdr_file) }.context("Failed to mmap header")?;
+
+        // Create data file path (needed for init_in_place)
+        let data_filename = format!("qb-{name}-data");
+        let data_path = base_dir.join(&data_filename);
+
+        // Initialize shared header
+        let shared_hdr = mmap_hdr.as_mut_ptr() as *mut RingBufferShared;
+
+        unsafe {
+            (*shared_hdr).init_in_place(word_size as u32, &hdr_path, &data_path)?;
+        }
+
+        // Create data file
+        let data_file = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(&data_path)
+            .context("Failed to create data file")?;
+
+        // Create data file with real_size (NOT 2x real_size!)
+        // libqb creates the file with real_size, then uses circular mmap to map it TWICE
+        // in consecutive virtual address space. The file itself is only real_size bytes.
+        // During cleanup, libqb unmaps 2*real_size bytes (the circular mmap), but the
+        // file itself remains real_size bytes.
+        data_file
+            .set_len(real_size as u64)
+            .context("Failed to resize data file")?;
+
+        // Create circular mmap - maps the file TWICE in consecutive virtual memory
+        // This matches libqb's qb_sys_circular_mmap implementation
+        let data_fd = data_file.as_raw_fd();
+        let mut mmap_data = unsafe {
+            CircularMmap::new(data_fd, real_size).context("Failed to create circular mmap")?
+        };
+
+        // Zero-initialize the data (only need to zero first half due to circular mapping)
+        unsafe {
+            mmap_data.zero_initialize();
+        }
+
+        let shared_data = mmap_data.as_mut_ptr();
+
+        // Write sentinel value at the end of the buffer (matches libqb behavior).
+        // The write at index `word_size` lands in the second, aliased half of the
+        // circular mapping, so no explicit wraparound handling is needed.
+        unsafe {
+            *shared_data.add(word_size) = 5;
+        }
+
+        // Initialize flow control
+        // If shared_user_data_size >= sizeof(i32), flow control is enabled (for request ring)
+        // Otherwise, flow control is disabled (for response/event rings)
+        let flow_control = if shared_user_data_size >= std::mem::size_of::<i32>() {
+            unsafe {
+                // Get pointer to user_data field within the structure
+                // This matches libqb's: return rb->shared_hdr->user_data;
+                let fc_ptr = std::ptr::addr_of_mut!((*shared_hdr).user_data) as *mut i32;
+                FlowControl::new(fc_ptr, shared_hdr)
+            }
+        } else {
+            // Disabled flow control (null pointers = no-op mode)
+            unsafe { FlowControl::new(std::ptr::null_mut(), std::ptr::null_mut()) }
+        };
+
+        Ok(Self {
+            _mmap_hdr: mmap_hdr,
+            _mmap_data: mmap_data,
+            shared_hdr,
+            shared_data,
+            flow_control,
+            data_available: Arc::new(Notify::new()),
+            space_available: Arc::new(Notify::new()),
+            is_creator: true, // This instance created the ring buffer
+        })
+    }
+
+    /// Send a message into the ring buffer (async)
+    ///
+    /// Allocates a chunk, writes the message data, and commits the chunk.
+    /// Awaits if there's insufficient space.
+    pub async fn send(&mut self, message: &[u8]) -> Result<()> {
+        loop {
+            match self.try_send(message) {
+                Ok(()) => {
+                    // Notify consumers that data is available
+                    self.data_available.notify_one();
+                    return Ok(());
+                }
+                Err(e) if e.to_string().contains("Insufficient space") => {
+                    // Wait for space to become available
+                    self.space_available.notified().await;
+                    continue;
+                }
+                Err(e) => return Err(e),
+            }
+        }
+    }
+
+    /// Try to send a message without blocking
+    ///
+    /// Returns an error if there's insufficient space.
+    pub fn try_send(&mut self, message: &[u8]) -> Result<()> {
+        // Check if we have enough space
+        if !unsafe { (*self.shared_hdr).chunk_fits(message.len(), Self::CHUNK_MARGIN) } {
+            let space_free = self.space_free();
+            let required = Self::CHUNK_MARGIN + message.len();
+            anyhow::bail!(
+                "Insufficient space: need {required} bytes, have {space_free} bytes free"
+            );
+        }
+
+        // Write the chunk using RingBufferShared
+        unsafe { (*self.shared_hdr).write_chunk(self.shared_data, message)? };
+
+        Ok(())
+    }
+
+    /// Receive a message from the ring buffer (async)
+    ///
+    /// Awaits if no message is available.
+    /// After processing, the chunk is automatically reclaimed.
+    ///
+    /// ## Implementation Note
+    ///
+    /// libqb uses semaphore-based blocking (sem_timedwait) to wait for data
+    /// (see qb_rb_chunk_peek in libqb/lib/ringbuffer.c).
+    ///
+    /// We use tokio's `spawn_blocking` to wait on the POSIX semaphore without
+    /// blocking the async runtime. This provides true event-driven behavior with
+    /// zero polling overhead, while maintaining compatibility with libqb clients.
+    pub async fn recv(&mut self) -> Result<Vec<u8>> {
+        loop {
+            // Wait on POSIX semaphore asynchronously
+            // This matches libqb's timedwait_fn behavior in qb_rb_chunk_peek
+            // SAFETY: The semaphore is properly initialized in new() and remains
+            // valid for the lifetime of RingBuffer
+            unsafe { (*self.shared_hdr).posix_sem.wait().await? };
+
+            // Semaphore was decremented, data should be available
+            // Read and reclaim the chunk
+            match self.recv_after_semwait()? {
+                Some(data) => {
+                    // Notify producers that space is available
+                    self.space_available.notify_one();
+                    return Ok(data);
+                }
+                None => {
+                    // Spurious wakeup or race condition - semaphore was decremented
+                    // but no valid data found. This shouldn't happen in normal operation.
+                    tracing::warn!("Spurious semaphore wakeup detected, retrying");
+                    continue;
+                }
+            }
+        }
+    }
+
+    /// Receive a message after semaphore has been decremented
+    ///
+    /// This is called after `PosixSem::wait()` has successfully decremented
+    /// the semaphore. It reads the chunk data and reclaims the chunk.
+    ///
+    /// Returns `None` if the buffer is empty despite semaphore being decremented
+    /// (which indicates a bug or race condition).
+    fn recv_after_semwait(&mut self) -> Result<Option<Vec<u8>>> {
+        // Get fc_ptr if flow control is enabled, otherwise null
+        let fc_ptr = if self.flow_control.is_enabled() {
+            Some(self.flow_control.fc_ptr())
+        } else {
+            None
+        };
+        unsafe { (*self.shared_hdr).read_chunk(self.shared_data, fc_ptr) }
+    }
+
+    /// Calculate free space in the ring buffer (in bytes)
+    fn space_free(&self) -> usize {
+        unsafe { (*self.shared_hdr).space_free_bytes() }
+    }
+}
+
+impl Drop for RingBuffer {
+    fn drop(&mut self) {
+        // Decrement ref count
+        let ref_count = unsafe { (*self.shared_hdr).ref_count.fetch_sub(1, Ordering::AcqRel) };
+
+        tracing::debug!(
+            "Dropping ring buffer, ref_count: {} -> {}",
+            ref_count,
+            ref_count - 1
+        );
+
+        // If last reference AND we created it, clean up semaphore and files
+        // This matches libqb's behavior: only the creator (QB_RB_FLAG_CREATE) destroys the semaphore
+        if ref_count == 1 && self.is_creator {
+            unsafe {
+                // Destroy the semaphore before cleaning up the mmap
+                // Matches libqb's cleanup in qb_rb_close_helper
+                if let Err(e) = (*self.shared_hdr).posix_sem.destroy() {
+                    tracing::warn!("Failed to destroy semaphore: {}", e);
+                }
+
+                // Use .cast() instead of `as *const i8` so this also builds on
+                // targets where c_char is unsigned (e.g. aarch64).
+                let hdr_path =
+                    std::ffi::CStr::from_ptr((*self.shared_hdr).hdr_path.as_ptr().cast());
+                let data_path =
+                    std::ffi::CStr::from_ptr((*self.shared_hdr).data_path.as_ptr().cast());
+
+                if let Ok(hdr_path_str) = hdr_path.to_str()
+                    && !hdr_path_str.is_empty()
+                {
+                    let _ = std::fs::remove_file(hdr_path_str);
+                    tracing::debug!("Removed header file: {}", hdr_path_str);
+                }
+
+                if let Ok(data_path_str) = data_path.to_str()
+                    && !data_path_str.is_empty()
+                {
+                    let _ = std::fs::remove_file(data_path_str);
+                    tracing::debug!("Removed data file: {}", data_path_str);
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_ringbuffer_basic() -> Result<()> {
+        let temp_dir = tempfile::tempdir()?;
+        let mut rb = RingBuffer::new(temp_dir.path(), "test", 4096, 0)?;
+
+        // Send a message
+        rb.send(b"hello world").await?;
+
+        // Receive the message
+        let msg = rb.recv().await?;
+        assert_eq!(msg, b"hello world");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ringbuffer_multiple_messages() -> Result<()> {
+        let temp_dir = tempfile::tempdir()?;
+        let mut rb = RingBuffer::new(temp_dir.path(), "test", 4096, 0)?;
+
+        // Send multiple messages
+        rb.send(b"message 1").await?;
+        rb.send(b"message 2").await?;
+        rb.send(b"message 3").await?;
+
+        // Receive in order
+        assert_eq!(rb.recv().await?, b"message 1");
+        assert_eq!(rb.recv().await?, b"message 2");
+        assert_eq!(rb.recv().await?, b"message 3");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ringbuffer_nonblocking_send() -> Result<()> {
+        let temp_dir = tempfile::tempdir()?;
+        let mut rb = RingBuffer::new(temp_dir.path(), "test", 4096, 0)?;
+
+        // Test try_send (non-blocking send) with async recv
+        rb.try_send(b"data")?;
+        let msg = rb.recv().await?;
+        assert_eq!(msg, b"data");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ringbuffer_wraparound() -> Result<()> {
+        let temp_dir = tempfile::tempdir()?;
+        let mut rb = RingBuffer::new(temp_dir.path(), "test", 256, 0)?;
+
+        // Fill and drain to force wraparound
+        for _ in 0..10 {
+            rb.send(b"data").await?;
+            rb.recv().await?;
+        }
+
+        // Should still work
+        rb.send(b"after wrap").await?;
+        assert_eq!(rb.recv().await?, b"after wrap");
+
+        Ok(())
+    }
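+
+    // A rough capacity test sketch: not a wire-format check, just exercising the
+    // "Insufficient space" path of try_send() and recovery after a recv().
+    // The message size and iteration cap are arbitrary.
+    #[tokio::test]
+    async fn test_ringbuffer_try_send_until_full() -> Result<()> {
+        let temp_dir = tempfile::tempdir()?;
+        // 256 requested bytes still round up to a single 4096-byte page internally.
+        let mut rb = RingBuffer::new(temp_dir.path(), "test", 256, 0)?;
+
+        let msg = vec![0u8; 1000];
+        let mut sent = 0;
+        while rb.try_send(&msg).is_ok() {
+            sent += 1;
+            assert!(sent < 16, "ring buffer never reported insufficient space");
+        }
+        assert!(sent > 0, "at least one message should fit");
+
+        // Draining one chunk frees enough space to send again.
+        rb.recv().await?;
+        rb.try_send(&msg)?;
+
+        Ok(())
+    }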
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/server.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/server.rs
new file mode 100644
index 00000000..73d63de0
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/server.rs
@@ -0,0 +1,278 @@
+/// Main libqb IPC server implementation
+///
+/// This module contains the Server struct and its implementation,
+/// including connection acceptance and server lifecycle management.
+use anyhow::{Context, Result};
+use parking_lot::Mutex;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use tokio::net::UnixListener;
+use tokio_util::sync::CancellationToken;
+
+use super::connection::QbConnection;
+use super::handler::Handler;
+use super::socket::bind_abstract_socket;
+
+/// Server-level connection statistics (matches libqb qb_ipcs_stats)
+#[derive(Debug, Default)]
+pub struct ServerStats {
+    /// Number of currently active connections
+    pub active_connections: AtomicUsize,
+    /// Total number of closed connections since server start
+    pub closed_connections: AtomicUsize,
+}
+
+impl ServerStats {
+    fn new() -> Self {
+        Self {
+            active_connections: AtomicUsize::new(0),
+            closed_connections: AtomicUsize::new(0),
+        }
+    }
+
+    /// Increment active connections count (new connection established)
+    fn connection_created(&self) {
+        self.active_connections.fetch_add(1, Ordering::Relaxed);
+        tracing::debug!(
+            active = self.active_connections.load(Ordering::Relaxed),
+            closed = self.closed_connections.load(Ordering::Relaxed),
+            "Connection created"
+        );
+    }
+
+    /// Decrement active, increment closed (connection terminated)
+    fn connection_closed(&self) {
+        self.active_connections.fetch_sub(1, Ordering::Relaxed);
+        self.closed_connections.fetch_add(1, Ordering::Relaxed);
+        tracing::debug!(
+            active = self.active_connections.load(Ordering::Relaxed),
+            closed = self.closed_connections.load(Ordering::Relaxed),
+            "Connection closed"
+        );
+    }
+
+    /// Get current statistics (for monitoring/debugging)
+    pub fn get(&self) -> (usize, usize) {
+        (
+            self.active_connections.load(Ordering::Relaxed),
+            self.closed_connections.load(Ordering::Relaxed),
+        )
+    }
+}
+
+/// libqb-compatible IPC server
+pub struct Server {
+    service_name: String,
+
+    // Setup socket (SOCK_STREAM) - accepts new connections
+    setup_listener: Option<Arc<UnixListener>>,
+
+    // Per-connection state
+    connections: Arc<Mutex<HashMap<u64, QbConnection>>>,
+    next_conn_id: Arc<AtomicU64>,
+
+    // Connection statistics (matches libqb behavior)
+    stats: Arc<ServerStats>,
+
+    // Message handler (trait object, also handles authentication)
+    handler: Arc<dyn Handler>,
+
+    // Cancellation token for graceful shutdown
+    cancellation_token: CancellationToken,
+}
+
+impl Server {
+    /// Create a new libqb-compatible IPC server
+    ///
+    /// Uses Linux abstract Unix sockets for IPC (no filesystem paths needed).
+    ///
+    /// # Arguments
+    /// * `service_name` - Service name (e.g., "pve2"), used as abstract socket name
+    /// * `handler` - Handler implementing the Handler trait (handles both authentication and requests)
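+    ///
+    /// A minimal usage sketch (`MyHandler` is a hypothetical `Handler` impl):
+    ///
+    /// ```ignore
+    /// let mut server = Server::new("pve2", MyHandler::default());
+    /// server.start()?;
+    /// // ... run until shutdown, then:
+    /// server.stop();
+    /// ```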
+    pub fn new(service_name: &str, handler: impl Handler + 'static) -> Self {
+        Self {
+            service_name: service_name.to_string(),
+            setup_listener: None,
+            connections: Arc::new(Mutex::new(HashMap::new())),
+            next_conn_id: Arc::new(AtomicU64::new(1)),
+            stats: Arc::new(ServerStats::new()),
+            handler: Arc::new(handler),
+            cancellation_token: CancellationToken::new(),
+        }
+    }
+
+    /// Start the IPC server
+    ///
+    /// Creates abstract Unix socket that libqb clients can connect to
+    pub fn start(&mut self) -> Result<()> {
+        tracing::info!(
+            "Starting libqb-compatible IPC server: {}",
+            self.service_name
+        );
+
+        // Create abstract Unix socket (no filesystem paths needed)
+        let std_listener =
+            bind_abstract_socket(&self.service_name).context("Failed to bind abstract socket")?;
+
+        // Convert to tokio listener
+        std_listener.set_nonblocking(true)?;
+        let listener = UnixListener::from_std(std_listener)?;
+
+        tracing::info!("Bound abstract Unix socket: @{}", self.service_name);
+
+        let listener_arc = Arc::new(listener);
+        self.setup_listener = Some(listener_arc.clone());
+
+        // Start connection acceptor task
+        let context = AcceptorContext {
+            listener: listener_arc,
+            service_name: self.service_name.clone(),
+            connections: self.connections.clone(),
+            next_conn_id: self.next_conn_id.clone(),
+            stats: self.stats.clone(),
+            handler: self.handler.clone(),
+            cancellation_token: self.cancellation_token.child_token(),
+        };
+
+        tokio::spawn(async move {
+            context.run().await;
+        });
+
+        tracing::info!("libqb IPC server started: {}", self.service_name);
+        Ok(())
+    }
+
+    /// Stop the IPC server
+    pub fn stop(&mut self) {
+        tracing::info!("Stopping libqb IPC server: {}", self.service_name);
+
+        // Signal all tasks to stop
+        self.cancellation_token.cancel();
+
+        // Close all connections
+        let connections = std::mem::take(&mut *self.connections.lock());
+        let num_connections = connections.len();
+
+        for (_id, conn) in connections {
+            // Clean up ring buffer files
+            for rb_path in &conn.ring_buffer_paths {
+                if let Err(e) = std::fs::remove_file(rb_path) {
+                    tracing::debug!(
+                        "Failed to remove ring buffer file {} (may already be cleaned up): {}",
+                        rb_path.display(),
+                        e
+                    );
+                }
+            }
+
+            // Update statistics
+            self.stats.connection_closed();
+
+            // Task handles will be aborted when dropped
+        }
+
+        // Final stats
+        if num_connections > 0 {
+            let (active, closed) = self.stats.get();
+            tracing::info!(
+                "Closed {} connections (final stats: active={}, closed={})",
+                num_connections,
+                active,
+                closed
+            );
+        }
+
+        self.setup_listener = None;
+
+        tracing::info!("libqb IPC server stopped");
+    }
+}
+
+impl Drop for Server {
+    fn drop(&mut self) {
+        self.stop();
+    }
+}
+
+/// Context for the connection acceptor task
+///
+/// Bundles all the state needed by the acceptor loop to avoid passing many parameters.
+struct AcceptorContext {
+    listener: Arc<UnixListener>,
+    service_name: String,
+    connections: Arc<Mutex<HashMap<u64, QbConnection>>>,
+    next_conn_id: Arc<AtomicU64>,
+    stats: Arc<ServerStats>,
+    handler: Arc<dyn Handler>,
+    cancellation_token: CancellationToken,
+}
+
+impl AcceptorContext {
+    /// Run the connection acceptor loop
+    ///
+    /// Accepts new connections and spawns handler tasks for each.
+    async fn run(self) {
+        tracing::debug!("libqb IPC connection acceptor started");
+
+        loop {
+            // Accept new connection with cancellation support
+            let accept_result = tokio::select! {
+                _ = self.cancellation_token.cancelled() => {
+                    tracing::debug!("Connection acceptor cancelled");
+                    break;
+                }
+                result = self.listener.accept() => result,
+            };
+
+            let (stream, _addr) = match accept_result {
+                Ok((stream, addr)) => (stream, addr),
+                Err(e) => {
+                    if !self.cancellation_token.is_cancelled() {
+                        tracing::error!("Error accepting connection: {}", e);
+                    }
+                    break;
+                }
+            };
+
+            tracing::debug!("Accepted new setup connection");
+
+            // Handle connection
+            let conn_id = self.next_conn_id.fetch_add(1, Ordering::SeqCst);
+            match QbConnection::accept(
+                stream,
+                conn_id,
+                &self.service_name,
+                self.handler.clone(),
+                self.cancellation_token.child_token(),
+            )
+            .await
+            {
+                Ok(conn) => {
+                    self.connections.lock().insert(conn_id, conn);
+                    // Update statistics
+                    self.stats.connection_created();
+                }
+                Err(e) => {
+                    tracing::error!("Failed to accept connection {}: {}", conn_id, e);
+                }
+            }
+        }
+
+        tracing::debug!("libqb IPC connection acceptor finished");
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::protocol::*;
+
+    #[test]
+    fn test_header_sizes() {
+        // Verify C struct compatibility
+        assert_eq!(std::mem::size_of::<RequestHeader>(), 16);
+        assert_eq!(std::mem::align_of::<RequestHeader>(), 8);
+        assert_eq!(std::mem::size_of::<ResponseHeader>(), 24);
+        assert_eq!(std::mem::align_of::<ResponseHeader>(), 8);
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs b/src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs
new file mode 100644
index 00000000..5831b329
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/src/socket.rs
@@ -0,0 +1,84 @@
+/// Abstract Unix socket utilities
+///
+/// This module provides functions for working with Linux abstract Unix sockets,
+/// which are used by libqb for IPC communication.
+use anyhow::Result;
+use std::os::unix::io::FromRawFd;
+use std::os::unix::net::UnixListener;
+
+/// Bind to an abstract Unix socket (Linux-specific)
+///
+/// Abstract sockets are identified by a name in the kernel's socket namespace,
+/// not a filesystem path. They are automatically removed when all references are closed.
+///
+/// libqb clients create abstract sockets with FULL 108-byte sun_path (null-padded).
+/// Linux abstract sockets are length-sensitive, so we must match exactly.
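+///
+/// Schematically, the address we bind is:
+///
+/// ```text
+/// sun_family = AF_UNIX
+/// sun_path   = "\0" + name, null-padded to the full 108 bytes
+/// addrlen    = sizeof(sockaddr_un) = 110 bytes on Linux
+/// ```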
+pub(super) fn bind_abstract_socket(name: &str) -> Result<UnixListener> {
+    // Create a Unix socket using libc directly
+    let sock_fd = unsafe { libc::socket(libc::AF_UNIX, libc::SOCK_STREAM, 0) };
+    if sock_fd < 0 {
+        anyhow::bail!(
+            "Failed to create Unix socket: {}",
+            std::io::Error::last_os_error()
+        );
+    }
+
+    // RAII guard to ensure socket is closed on error
+    struct SocketGuard(i32);
+    impl Drop for SocketGuard {
+        fn drop(&mut self) {
+            unsafe { libc::close(self.0) };
+        }
+    }
+    let guard = SocketGuard(sock_fd);
+
+    // Create sockaddr_un with full 108-byte abstract address (matching libqb)
+    // libqb format: sun_path[0] = '\0', sun_path[1..] = "name\0\0..." (null-padded)
+    let mut addr: libc::sockaddr_un = unsafe { std::mem::zeroed() };
+    addr.sun_family = libc::AF_UNIX as libc::sa_family_t;
+
+    // sun_path[0] is already 0 (abstract socket marker)
+    // Copy name starting at sun_path[1]
+    let name_bytes = name.as_bytes();
+    let copy_len = name_bytes.len().min(107); // Leave room for initial \0
+    unsafe {
+        std::ptr::copy_nonoverlapping(
+            name_bytes.as_ptr(),
+            addr.sun_path.as_mut_ptr().offset(1) as *mut u8,
+            copy_len,
+        );
+    }
+
+    // Use FULL sockaddr_un length for libqb compatibility!
+    // libqb clients use the full 110-byte structure (2 + 108) when connecting,
+    // so we MUST bind with the same length. Verified via strace.
+    let addr_len = std::mem::size_of::<libc::sockaddr_un>() as libc::socklen_t;
+    let bind_res = unsafe {
+        libc::bind(
+            sock_fd,
+            &addr as *const _ as *const libc::sockaddr,
+            addr_len,
+        )
+    };
+    if bind_res < 0 {
+        anyhow::bail!(
+            "Failed to bind abstract socket: {}",
+            std::io::Error::last_os_error()
+        );
+    }
+
+    // Set socket to listen mode (backlog = 128)
+    let listen_res = unsafe { libc::listen(sock_fd, 128) };
+    if listen_res < 0 {
+        anyhow::bail!(
+            "Failed to listen on socket: {}",
+            std::io::Error::last_os_error()
+        );
+    }
+
+    // Convert raw fd to UnixListener (takes ownership, forget guard)
+    std::mem::forget(guard);
+    let listener = unsafe { UnixListener::from_raw_fd(sock_fd) };
+
+    Ok(listener)
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs b/src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs
new file mode 100644
index 00000000..f8e541b0
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/tests/auth_test.rs
@@ -0,0 +1,450 @@
+//! Authentication tests for pmxcfs-ipc
+//!
+//! These tests verify that the Handler::authenticate() mechanism works correctly
+//! for different authentication policies.
+//!
+//! Note: These tests use real Unix sockets, so they test authentication behavior
+//! from the server's perspective. The UID/GID will be the test process's credentials,
+//! so we test the Handler logic rather than OS-level credential checking.
+use async_trait::async_trait;
+use pmxcfs_ipc::{Handler, Permissions, Request, Response, Server};
+use pmxcfs_test_utils::wait_for_condition_blocking;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::thread;
+use std::time::Duration;
+
+/// Helper to create a unique service name for each test
+fn unique_service_name() -> String {
+    static COUNTER: AtomicU32 = AtomicU32::new(0);
+    format!("auth-test-{}", COUNTER.fetch_add(1, Ordering::SeqCst))
+}
+
+/// Helper that connects with a real libqb client via FFI (`qb_ipcc_connect`).
+/// Returns true if the connection succeeded, false if it was rejected.
+fn try_connect(service_name: &str) -> bool {
+    use std::ffi::CString;
+
+    #[repr(C)]
+    struct QbIpccConnection {
+        _private: [u8; 0],
+    }
+
+    #[link(name = "qb")]
+    unsafe extern "C" {
+        fn qb_ipcc_connect(name: *const libc::c_char, max_msg_size: usize)
+        -> *mut QbIpccConnection;
+        fn qb_ipcc_disconnect(conn: *mut QbIpccConnection);
+    }
+
+    let name = CString::new(service_name).expect("Invalid service name");
+    let conn = unsafe { qb_ipcc_connect(name.as_ptr(), 8192) };
+
+    let success = !conn.is_null();
+
+    if success {
+        unsafe { qb_ipcc_disconnect(conn) };
+    }
+
+    success
+}
+
+// ============================================================================
+// Test Handlers with Different Authentication Policies
+// ============================================================================
+
+/// Handler that accepts all connections with read-write access
+struct AcceptAllHandler;
+
+#[async_trait]
+impl Handler for AcceptAllHandler {
+    fn authenticate(&self, _uid: u32, _gid: u32) -> Option<Permissions> {
+        Some(Permissions::ReadWrite)
+    }
+
+    async fn handle(&self, _request: Request) -> Response {
+        Response::ok(b"test".to_vec())
+    }
+}
+
+/// Handler that rejects all connections
+struct RejectAllHandler;
+
+#[async_trait]
+impl Handler for RejectAllHandler {
+    fn authenticate(&self, _uid: u32, _gid: u32) -> Option<Permissions> {
+        None
+    }
+
+    async fn handle(&self, _request: Request) -> Response {
+        Response::ok(b"test".to_vec())
+    }
+}
+
+/// Handler that only accepts root (uid=0)
+struct RootOnlyHandler;
+
+#[async_trait]
+impl Handler for RootOnlyHandler {
+    fn authenticate(&self, uid: u32, _gid: u32) -> Option<Permissions> {
+        if uid == 0 {
+            Some(Permissions::ReadWrite)
+        } else {
+            None
+        }
+    }
+
+    async fn handle(&self, _request: Request) -> Response {
+        Response::ok(b"test".to_vec())
+    }
+}
+
+/// Handler that tracks authentication calls
+struct TrackingHandler {
+    call_count: Arc<AtomicU32>,
+    last_uid: Arc<AtomicU32>,
+    last_gid: Arc<AtomicU32>,
+}
+
+impl TrackingHandler {
+    fn new() -> (Self, Arc<AtomicU32>, Arc<AtomicU32>, Arc<AtomicU32>) {
+        let call_count = Arc::new(AtomicU32::new(0));
+        let last_uid = Arc::new(AtomicU32::new(0));
+        let last_gid = Arc::new(AtomicU32::new(0));
+
+        (
+            Self {
+                call_count: call_count.clone(),
+                last_uid: last_uid.clone(),
+                last_gid: last_gid.clone(),
+            },
+            call_count,
+            last_uid,
+            last_gid,
+        )
+    }
+}
+
+#[async_trait]
+impl Handler for TrackingHandler {
+    fn authenticate(&self, uid: u32, gid: u32) -> Option<Permissions> {
+        self.call_count.fetch_add(1, Ordering::SeqCst);
+        self.last_uid.store(uid, Ordering::SeqCst);
+        self.last_gid.store(gid, Ordering::SeqCst);
+        Some(Permissions::ReadWrite)
+    }
+
+    async fn handle(&self, _request: Request) -> Response {
+        Response::ok(b"test".to_vec())
+    }
+}
+
+/// Handler that grants read-only access to non-root
+struct ReadOnlyForNonRootHandler;
+
+#[async_trait]
+impl Handler for ReadOnlyForNonRootHandler {
+    fn authenticate(&self, uid: u32, _gid: u32) -> Option<Permissions> {
+        if uid == 0 {
+            Some(Permissions::ReadWrite)
+        } else {
+            Some(Permissions::ReadOnly)
+        }
+    }
+
+    async fn handle(&self, request: Request) -> Response {
+        // The read-only flag travels with each Request (see request.read_only);
+        // for this test we simply accept every request
+        Response::ok(format!("handled msg_id {}", request.msg_id).into_bytes())
+    }
+}
+
+// ============================================================================
+// Helper to start server in background thread
+// ============================================================================
+
+fn start_server<H: Handler + 'static>(service_name: String, handler: H) -> thread::JoinHandle<()> {
+    thread::spawn(move || {
+        let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
+        rt.block_on(async {
+            let mut server = Server::new(&service_name, handler);
+            server.start().expect("Server startup failed");
+            std::future::pending::<()>().await;
+        });
+    })
+}
+
+/// Wait for the server to be ready by polling for its qb-* files in /dev/shm
+fn wait_for_server_ready(service_name: &str) {
+    // The server's files show up as /dev/shm/qb-{service_name}-*.
+    // Poll for those instead of attempting a connection, so this also works
+    // for handlers that reject every client.
+    assert!(
+        wait_for_condition_blocking(
+            || {
+                // DirEntry::file_name() yields the bare file name, so match
+                // against the prefix without the /dev/shm/ directory
+                let file_prefix = format!("qb-{service_name}-");
+                if let Ok(entries) = std::fs::read_dir("/dev/shm") {
+                    for entry in entries.flatten() {
+                        if let Ok(name) = entry.file_name().into_string()
+                            && name.starts_with(&file_prefix)
+                        {
+                            return true;
+                        }
+                    }
+                }
+                false
+            },
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        ),
+        "Server should be ready within 5 seconds"
+    );
+}
+
+// ============================================================================
+// Tests
+// ============================================================================
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_accept_all_handler() {
+    let service_name = unique_service_name();
+    let _server = start_server(service_name.clone(), AcceptAllHandler);
+
+    wait_for_server_ready(&service_name);
+
+    assert!(
+        try_connect(&service_name),
+        "AcceptAllHandler should accept connection"
+    );
+}
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_reject_all_handler() {
+    let service_name = unique_service_name();
+    let _server = start_server(service_name.clone(), RejectAllHandler);
+
+    wait_for_server_ready(&service_name);
+
+    assert!(
+        !try_connect(&service_name),
+        "RejectAllHandler should reject connection"
+    );
+}
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_root_only_handler() {
+    let service_name = unique_service_name();
+    let _server = start_server(service_name.clone(), RootOnlyHandler);
+
+    wait_for_server_ready(&service_name);
+
+    let connected = try_connect(&service_name);
+
+    // Get current uid
+    let current_uid = unsafe { libc::getuid() };
+
+    if current_uid == 0 {
+        assert!(
+            connected,
+            "RootOnlyHandler should accept connection when running as root"
+        );
+    } else {
+        assert!(
+            !connected,
+            "RootOnlyHandler should reject connection when not running as root (uid={current_uid})"
+        );
+    }
+}
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_authentication_called_with_credentials() {
+    let service_name = unique_service_name();
+    let (handler, call_count, last_uid, last_gid) = TrackingHandler::new();
+    let _server = start_server(service_name.clone(), handler);
+
+    wait_for_server_ready(&service_name);
+
+    let current_uid = unsafe { libc::getuid() };
+    let current_gid = unsafe { libc::getgid() };
+
+    assert_eq!(
+        call_count.load(Ordering::SeqCst),
+        0,
+        "Should not be called yet"
+    );
+
+    let connected = try_connect(&service_name);
+
+    assert!(connected, "TrackingHandler should accept connection");
+    assert_eq!(
+        call_count.load(Ordering::SeqCst),
+        1,
+        "authenticate() should be called once"
+    );
+    assert_eq!(
+        last_uid.load(Ordering::SeqCst),
+        current_uid,
+        "authenticate() should receive correct uid"
+    );
+    assert_eq!(
+        last_gid.load(Ordering::SeqCst),
+        current_gid,
+        "authenticate() should receive correct gid"
+    );
+}
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_multiple_connections_call_authenticate_each_time() {
+    let service_name = unique_service_name();
+    let (handler, call_count, _, _) = TrackingHandler::new();
+    let _server = start_server(service_name.clone(), handler);
+
+    wait_for_server_ready(&service_name);
+
+    // First connection
+    assert!(try_connect(&service_name));
+    assert_eq!(call_count.load(Ordering::SeqCst), 1);
+
+    // Second connection
+    assert!(try_connect(&service_name));
+    assert_eq!(call_count.load(Ordering::SeqCst), 2);
+
+    // Third connection
+    assert!(try_connect(&service_name));
+    assert_eq!(call_count.load(Ordering::SeqCst), 3);
+}
+
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_read_only_permissions_accepted() {
+    let service_name = unique_service_name();
+    let _server = start_server(service_name.clone(), ReadOnlyForNonRootHandler);
+
+    wait_for_server_ready(&service_name);
+
+    // Connection should succeed regardless of whether we get ReadOnly or ReadWrite
+    // (both are accepted, just with different permissions)
+    assert!(
+        try_connect(&service_name),
+        "ReadOnlyForNonRootHandler should accept connections with appropriate permissions"
+    );
+}
+
+/// Test that demonstrates the authentication policy is enforced at connection time
+#[test]
+#[ignore] // Requires libqb-dev
+fn test_authentication_enforced_at_connection_time() {
+    // This test verifies that authentication happens during connection setup,
+    // not during request handling
+    let service_name = unique_service_name();
+    let _server = start_server(service_name.clone(), RejectAllHandler);
+
+    wait_for_server_ready(&service_name);
+
+    // Connection should fail immediately, before any request is sent
+    let start = std::time::Instant::now();
+    let connected = try_connect(&service_name);
+    let duration = start.elapsed();
+
+    assert!(!connected, "Connection should be rejected");
+    assert!(
+        duration < Duration::from_millis(100),
+        "Rejection should happen quickly during handshake, not during request processing"
+    );
+}
+
+#[cfg(test)]
+mod policy_examples {
+    use super::*;
+
+    /// Example: Handler that mimics Proxmox VE authentication policy
+    /// - Root (uid=0) gets read-write
+    /// - www-data (uid=33) gets read-only (for web UI)
+    /// - Others are rejected
+    struct ProxmoxStyleHandler;
+
+    #[async_trait]
+    impl Handler for ProxmoxStyleHandler {
+        fn authenticate(&self, uid: u32, _gid: u32) -> Option<Permissions> {
+            match uid {
+                0 => Some(Permissions::ReadWrite), // root
+                33 => Some(Permissions::ReadOnly), // www-data
+                _ => None,                         // reject others
+            }
+        }
+
+        async fn handle(&self, request: Request) -> Response {
+            // In real implementation, would check request.read_only
+            // to enforce read-only restrictions
+            Response::ok(format!("msg_id {}", request.msg_id).into_bytes())
+        }
+    }
+
+    #[test]
+    #[ignore] // Requires libqb-dev
+    fn test_proxmox_style_policy() {
+        let service_name = unique_service_name();
+        let _server = start_server(service_name.clone(), ProxmoxStyleHandler);
+
+        wait_for_server_ready(&service_name);
+
+        let current_uid = unsafe { libc::getuid() };
+        let connected = try_connect(&service_name);
+
+        match current_uid {
+            0 => assert!(connected, "Root should be accepted"),
+            33 => assert!(connected, "www-data should be accepted"),
+            _ => assert!(!connected, "Other users should be rejected"),
+        }
+    }
+
+    /// Example: Handler that uses group-based authentication
+    struct GroupBasedHandler {
+        allowed_gid: u32,
+    }
+
+    impl GroupBasedHandler {
+        fn new(allowed_gid: u32) -> Self {
+            Self { allowed_gid }
+        }
+    }
+
+    #[async_trait]
+    impl Handler for GroupBasedHandler {
+        fn authenticate(&self, _uid: u32, gid: u32) -> Option<Permissions> {
+            if gid == self.allowed_gid {
+                Some(Permissions::ReadWrite)
+            } else {
+                None
+            }
+        }
+
+        async fn handle(&self, _request: Request) -> Response {
+            Response::ok(b"ok".to_vec())
+        }
+    }
+
+    #[test]
+    #[ignore] // Requires libqb-dev
+    fn test_group_based_authentication() {
+        let service_name = unique_service_name();
+        let current_gid = unsafe { libc::getgid() };
+        let _server = start_server(service_name.clone(), GroupBasedHandler::new(current_gid));
+
+        wait_for_server_ready(&service_name);
+
+        assert!(
+            try_connect(&service_name),
+            "Should accept connection from same group"
+        );
+    }
+}
diff --git a/src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs b/src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs
new file mode 100644
index 00000000..8c0db962
--- /dev/null
+++ b/src/pmxcfs-rs/pmxcfs-ipc/tests/qb_wire_compat.rs
@@ -0,0 +1,413 @@
+//! Wire protocol compatibility test with libqb C clients
+//!
+//! This integration test verifies that our Rust Server is fully compatible
+//! with real libqb C clients by using libqb's client API via FFI.
+//!
+//! Run with: cargo test --package pmxcfs-ipc --test qb_wire_compat -- --ignored --nocapture
+//!
+//! Requires: libqb-dev installed
+
+use pmxcfs_test_utils::wait_for_condition_blocking;
+use std::ffi::CString;
+use std::thread;
+use std::time::Duration;
+
+// ============================================================================
+// Minimal libqb FFI bindings (client-side only)
+// ============================================================================
+
+/// libqb request header matching C's __attribute__ ((aligned(8)))
+/// Each field is i32 with 8-byte alignment, achieved via explicit padding
+#[repr(C, align(8))]
+#[derive(Debug, Copy, Clone)]
+struct QbIpcRequestHeader {
+    id: i32,    // 4 bytes
+    _pad1: u32, // 4 bytes padding
+    size: i32,  // 4 bytes
+    _pad2: u32, // 4 bytes padding
+}
+
+/// libqb response header matching C's __attribute__ ((aligned(8)))
+/// Each field is i32 with 8-byte alignment, achieved via explicit padding
+#[repr(C, align(8))]
+#[derive(Debug, Copy, Clone)]
+struct QbIpcResponseHeader {
+    id: i32,    // 4 bytes
+    _pad1: u32, // 4 bytes padding
+    size: i32,  // 4 bytes
+    _pad2: u32, // 4 bytes padding
+    error: i32, // 4 bytes
+    _pad3: u32, // 4 bytes padding
+}
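+
+// For reference, these mirror libqb's C declarations (roughly, from
+// <qb/qbipc_common.h>; shown as an illustrative sketch, not verbatim):
+//
+//   struct qb_ipc_request_header {
+//       int32_t id __attribute__ ((aligned(8)));
+//       int32_t size __attribute__ ((aligned(8)));
+//   } __attribute__ ((aligned(8)));
+//
+//   struct qb_ipc_response_header {
+//       int32_t id __attribute__ ((aligned(8)));
+//       int32_t size __attribute__ ((aligned(8)));
+//       int32_t error __attribute__ ((aligned(8)));
+//   } __attribute__ ((aligned(8)));
+//
+// The per-field aligned(8) places every i32 on an 8-byte boundary, which the
+// explicit _pad fields above reproduce.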
+
+// Opaque type for connection handle
+#[repr(C)]
+struct QbIpccConnection {
+    _private: [u8; 0],
+}
+
+#[link(name = "qb")]
+unsafe extern "C" {
+    /// Connect to a QB IPC service
+    /// Returns NULL on failure
+    fn qb_ipcc_connect(name: *const libc::c_char, max_msg_size: usize) -> *mut QbIpccConnection;
+
+    /// Send request and receive response (with iovec)
+    /// Returns number of bytes received, or negative errno on error
+    fn qb_ipcc_sendv_recv(
+        conn: *mut QbIpccConnection,
+        iov: *const libc::iovec,
+        iov_len: u32,
+        res_buf: *mut libc::c_void,
+        res_buf_size: usize,
+        timeout_ms: i32,
+    ) -> libc::ssize_t;
+
+    /// Disconnect from service
+    fn qb_ipcc_disconnect(conn: *mut QbIpccConnection);
+
+    /// Initialize libqb logging
+    fn qb_log_init(name: *const libc::c_char, facility: i32, priority: i32);
+
+    /// Control log targets
+    fn qb_log_ctl(target: i32, conf: i32, arg: i32) -> i32;
+
+    /// Filter control
+    fn qb_log_filter_ctl(
+        target: i32,
+        op: i32,
+        type_: i32,
+        text: *const libc::c_char,
+        priority: i32,
+    ) -> i32;
+}
+
+// Log targets
+const QB_LOG_STDERR: i32 = 2;
+
+// Log control operations
+const QB_LOG_CONF_ENABLED: i32 = 1;
+
+// Log filter operations
+const QB_LOG_FILTER_ADD: i32 = 0;
+const QB_LOG_FILTER_FILE: i32 = 1;
+
+// Log levels (from syslog.h)
+const LOG_TRACE: i32 = 8; // LOG_DEBUG + 1
+
+// ============================================================================
+// Safe Rust wrapper around libqb client
+// ============================================================================
+
+struct QbIpcClient {
+    conn: *mut QbIpccConnection,
+}
+
+impl QbIpcClient {
+    fn connect(service_name: &str, max_msg_size: usize) -> Result<Self, String> {
+        let name = CString::new(service_name).map_err(|e| format!("Invalid service name: {e}"))?;
+
+        let conn = unsafe { qb_ipcc_connect(name.as_ptr(), max_msg_size) };
+
+        if conn.is_null() {
+            let errno = unsafe { *libc::__errno_location() };
+            let error_str = unsafe {
+                let err_ptr = libc::strerror(errno);
+                std::ffi::CStr::from_ptr(err_ptr)
+                    .to_string_lossy()
+                    .to_string()
+            };
+            Err(format!(
+                "qb_ipcc_connect returned NULL (errno={errno}: {error_str})"
+            ))
+        } else {
+            Ok(Self { conn })
+        }
+    }
+
+    fn send_recv(
+        &self,
+        request_id: i32,
+        request_data: &[u8],
+        timeout_ms: i32,
+    ) -> Result<(i32, Vec<u8>), String> {
+        // Build request
+        let req_header = QbIpcRequestHeader {
+            id: request_id,
+            _pad1: 0,
+            size: (std::mem::size_of::<QbIpcRequestHeader>() + request_data.len()) as i32,
+            _pad2: 0,
+        };
+
+        // Setup iovec
+        let mut iov = vec![libc::iovec {
+            iov_base: &req_header as *const _ as *mut libc::c_void,
+            iov_len: std::mem::size_of::<QbIpcRequestHeader>(),
+        }];
+
+        if !request_data.is_empty() {
+            iov.push(libc::iovec {
+                iov_base: request_data.as_ptr() as *mut libc::c_void,
+                iov_len: request_data.len(),
+            });
+        }
+
+        // Response buffer
+        const MAX_RESPONSE: usize = 8192 * 128;
+        let mut resp_buf = vec![0u8; MAX_RESPONSE];
+
+        // Send and receive
+        let result = unsafe {
+            qb_ipcc_sendv_recv(
+                self.conn,
+                iov.as_ptr(),
+                iov.len() as u32,
+                resp_buf.as_mut_ptr() as *mut libc::c_void,
+                resp_buf.len(),
+                timeout_ms,
+            )
+        };
+
+        if result < 0 {
+            return Err(format!("qb_ipcc_sendv_recv failed: {}", -result));
+        }
+
+        let bytes_received = result as usize;
+
+        // Parse response header
+        if bytes_received < std::mem::size_of::<QbIpcResponseHeader>() {
+            return Err("Response too short".to_string());
+        }
+
+        let resp_header = unsafe { *(resp_buf.as_ptr() as *const QbIpcResponseHeader) };
+
+        // Verify response ID matches request
+        if resp_header.id != request_id {
+            return Err(format!(
+                "Response ID mismatch: expected {}, got {}",
+                request_id, resp_header.id
+            ));
+        }
+
+        // Extract data
+        let data_start = std::mem::size_of::<QbIpcResponseHeader>();
+        let data = resp_buf[data_start..bytes_received].to_vec();
+
+        Ok((resp_header.error, data))
+    }
+}
+
+impl Drop for QbIpcClient {
+    fn drop(&mut self) {
+        unsafe {
+            qb_ipcc_disconnect(self.conn);
+        }
+    }
+}
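+
+// Typical use of the wrapper (sketch only; msg_id 1 maps to GET_FS_VERSION in
+// the test handler below):
+//
+//   let client = QbIpcClient::connect("pve2", 8192)?;
+//   let (error, data) = client.send_recv(1, &[], 5000)?;
+//   assert_eq!(error, 0);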
+
+// ============================================================================
+// Integration Test
+// ============================================================================
+
+#[test]
+#[ignore] // Run with: cargo test -- --ignored
+fn test_libqb_wire_protocol_compatibility() {
+    eprintln!("🧪 Starting wire protocol compatibility test");
+
+    // Check if libqb is available
+    eprintln!("🔍 Checking if libqb is available...");
+    if !check_libqb_available() {
+        eprintln!("⏭️  SKIP: libqb not installed");
+        eprintln!("   Install with: sudo apt-get install libqb-dev");
+        return;
+    }
+    eprintln!("✓ libqb is available");
+
+    // Start test server
+    eprintln!("🚀 Starting test server...");
+    let server_handle = start_test_server();
+    eprintln!("✓ Server thread spawned");
+
+    // Wait for server to be ready
+    eprintln!("⏳ Waiting for server initialization...");
+    wait_for_server_ready("pve2");
+    eprintln!("✓ Server is ready");
+
+    // Run tests
+    eprintln!("🧪 Running client tests...");
+    let test_result = run_client_tests();
+
+    // Cleanup
+    drop(server_handle);
+
+    // Assert results
+    assert!(
+        test_result.is_ok(),
+        "Client tests failed: {:?}",
+        test_result.err()
+    );
+}
+
+fn check_libqb_available() -> bool {
+    std::process::Command::new("pkg-config")
+        .args(["--exists", "libqb"])
+        .status()
+        .map(|s| s.success())
+        .unwrap_or(false)
+}
+
+fn start_test_server() -> thread::JoinHandle<()> {
+    use async_trait::async_trait;
+    use pmxcfs_ipc::{Handler, Request, Response, Server};
+
+    // Create test handler
+    struct TestHandler;
+
+    #[async_trait]
+    impl Handler for TestHandler {
+        fn authenticate(&self, _uid: u32, _gid: u32) -> Option<pmxcfs_ipc::Permissions> {
+            // Accept all connections with read-write access for testing
+            Some(pmxcfs_ipc::Permissions::ReadWrite)
+        }
+
+        async fn handle(&self, request: Request) -> Response {
+            match request.msg_id {
+                1 => {
+                    // CFS_IPC_GET_FS_VERSION
+                    let response_str = r#"{"version":1,"protocol":1}"#;
+                    Response::ok(response_str.as_bytes().to_vec())
+                }
+                2 => {
+                    // CFS_IPC_GET_CLUSTER_INFO
+                    let response_str = r#"{"nodes":[],"quorate":false}"#;
+                    Response::ok(response_str.as_bytes().to_vec())
+                }
+                3 => {
+                    // CFS_IPC_GET_GUEST_LIST
+                    let response_str = r#"{"data":[]}"#;
+                    Response::ok(response_str.as_bytes().to_vec())
+                }
+                _ => Response::err(-libc::EINVAL),
+            }
+        }
+    }
+
+    // Spawn server thread with tokio runtime
+    thread::spawn(move || {
+        // Initialize tracing for server (WARN level - silent on success)
+        tracing_subscriber::fmt()
+            .with_max_level(tracing::Level::WARN)
+            .with_target(false)
+            .init();
+
+        // Create tokio runtime for async server
+        let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
+
+        rt.block_on(async {
+            let mut server = Server::new("pve2", TestHandler);
+
+            // Server uses abstract Unix socket (Linux-specific)
+            if let Err(e) = server.start() {
+                eprintln!("Server startup failed: {e}");
+                eprintln!("Error details: {e:?}");
+                panic!("Server startup failed");
+            }
+
+            // Give tokio a chance to start the acceptor task
+            tokio::task::yield_now().await;
+
+            // Block forever to keep server alive
+            std::future::pending::<()>().await;
+        });
+    })
+}
+
+/// Wait for the server to be ready by polling for its qb-* files in /dev/shm
+fn wait_for_server_ready(service_name: &str) {
+    assert!(
+        wait_for_condition_blocking(
+            || {
+                // DirEntry::file_name() yields the bare file name, so match
+                // against the prefix without the /dev/shm/ directory
+                let file_prefix = format!("qb-{service_name}-");
+                if let Ok(entries) = std::fs::read_dir("/dev/shm") {
+                    for entry in entries.flatten() {
+                        if let Ok(name) = entry.file_name().into_string()
+                            && name.starts_with(&file_prefix)
+                        {
+                            return true;
+                        }
+                    }
+                }
+                false
+            },
+            Duration::from_secs(5),
+            Duration::from_millis(10),
+        ),
+        "Server should be ready within 5 seconds"
+    );
+}
+
+fn run_client_tests() -> Result<(), String> {
+    // Enable libqb trace logging so protocol details are visible under --nocapture
+    eprintln!("🔧 Enabling libqb debug logging...");
+    unsafe {
+        let name = CString::new("qb_test").unwrap();
+        qb_log_init(name.as_ptr(), libc::LOG_USER, LOG_TRACE);
+        qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, 1);
+        // Enable all log messages from all files at TRACE level
+        let all_files = CString::new("*").unwrap();
+        qb_log_filter_ctl(
+            QB_LOG_STDERR,
+            QB_LOG_FILTER_ADD,
+            QB_LOG_FILTER_FILE,
+            all_files.as_ptr(),
+            LOG_TRACE,
+        );
+    }
+    eprintln!("✓ libqb logging enabled (TRACE level)");
+
+    eprintln!("📡 Connecting to server...");
+    // Connect to the abstract socket "pve2" with a generous 8 MiB max message
+    // size so buffer space cannot be the limiting factor
+    let client = QbIpcClient::connect("pve2", 8192 * 1024)?;
+    eprintln!("✓ Connected successfully");
+
+    eprintln!("🧪 Test 1: GET_FS_VERSION");
+    // Test 1: GET_FS_VERSION
+    let (error, data) = client.send_recv(1, &[], 5000)?;
+    eprintln!("✓ Got response: error={}, data_len={}", error, data.len());
+    if error == 0 {
+        let response = String::from_utf8_lossy(&data);
+        eprintln!("  Response: {response}");
+        assert!(
+            response.contains("version"),
+            "Response should contain version field"
+        );
+    }
+
+    eprintln!("🧪 Test 2: GET_CLUSTER_INFO");
+    // Test 2: GET_CLUSTER_INFO
+    let (error, data) = client.send_recv(2, &[], 5000)?;
+    eprintln!("✓ Got response: error={}, data_len={}", error, data.len());
+    if error == 0 {
+        let response = String::from_utf8_lossy(&data);
+        eprintln!("  Response: {response}");
+        assert!(
+            response.contains("nodes"),
+            "Response should contain nodes field"
+        );
+    }
+
+    eprintln!("🧪 Test 3: Request with data payload");
+    // Test 3: Request with data payload
+    let test_payload = b"test_payload_data";
+    let (_error, _data) = client.send_recv(1, test_payload, 5000)?;
+    eprintln!("✓ Request with payload succeeded");
+
+    eprintln!("🧪 Test 4: GET_GUEST_LIST");
+    // Test 4: GET_GUEST_LIST
+    let (_error, _data) = client.send_recv(3, &[], 5000)?;
+    eprintln!("✓ GET_GUEST_LIST succeeded");
+
+    Ok(())
+}
-- 
2.47.3




