[pve-devel] [PATCH pve9-rrd-migration-tool 1/1] introduce rrd migration tool for pve8 -> pve9
Aaron Lauterer
a.lauterer at proxmox.com
Fri May 23 18:00:13 CEST 2025
Signed-off-by: Aaron Lauterer <a.lauterer at proxmox.com>
---
.cargo/config.toml | 8 +
.gitignore | 5 +
Cargo.toml | 20 ++
build.rs | 29 +++
src/lib.rs | 5 +
src/main.rs | 504 ++++++++++++++++++++++++++++++++++++++++
src/parallel_handler.rs | 162 +++++++++++++
wrapper.h | 1 +
8 files changed, 734 insertions(+)
create mode 100644 .cargo/config.toml
create mode 100644 .gitignore
create mode 100644 Cargo.toml
create mode 100644 build.rs
create mode 100644 src/lib.rs
create mode 100644 src/main.rs
create mode 100644 src/parallel_handler.rs
create mode 100644 wrapper.h
diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 0000000..a439c97
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,8 @@
+[source]
+[source.debian-packages]
+directory = "/usr/share/cargo/registry"
+[source.crates-io]
+replace-with = "debian-packages"
+
+[profile.release]
+debug=true
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7741e63
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+/target
+/build
+
+Cargo.lock
+
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..d3523f3
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "proxmox_rrd_migration_8-9"
+version = "0.1.0"
+edition = "2021"
+authors = [
+ "Aaron Lauterer <a.lauterer at proxmox.com>",
+ "Proxmox Support Team <support at proxmox.com>",
+]
+license = "AGPL-3"
+homepage = "https://www.proxmox.com"
+
+[dependencies]
+anyhow = "1.0.86"
+pico-args = "0.5.0"
+proxmox-async = "0.4"
+crossbeam-channel = "0.5"
+
+[build-dependencies]
+bindgen = "0.66.1"
+pkg-config = "0.3"
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..56d07cc
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,29 @@
+use std::env;
+use std::path::PathBuf;
+
+/// Build script: generate the Rust bindings for librrd with bindgen.
+///
+/// The generated code is written to `$OUT_DIR/bindings.rs`.
+fn main() {
+    // Ask cargo to link against the `rrd` library.
+    println!("cargo:rustc-link-lib=rrd");
+
+    // Run this script again whenever the wrapper header itself is modified.
+    println!("cargo:rerun-if-changed=wrapper.h");
+
+    // `wrapper.h` is the input header bindings are generated for. The cargo callbacks tell
+    // cargo to invalidate the built crate whenever any of the included headers changed.
+    let builder = bindgen::Builder::default()
+        .header("wrapper.h")
+        .parse_callbacks(Box::new(bindgen::CargoCallbacks));
+
+    // There is nothing sensible to do without bindings, so panic if generating them fails.
+    let generated = builder.generate().expect("Unable to generate bindings");
+
+    // cargo passes the output directory to build scripts in the OUT_DIR variable.
+    let out_dir = env::var("OUT_DIR").unwrap();
+    let bindings_file = PathBuf::from(out_dir).join("bindings.rs");
+    generated
+        .write_to_file(bindings_file)
+        .expect("Couldn't write bindings!");
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..a38a13a
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,5 @@
+#![allow(non_upper_case_globals)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+
+include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..43f181c
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,504 @@
+use anyhow::{bail, Error, Result};
+use proxmox_rrd_migration_8_9::{rrd_clear_error, rrd_create_r2, rrd_get_context, rrd_get_error};
+use std::ffi::{CStr, CString, OsString};
+use std::fs;
+use std::os::unix::ffi::OsStrExt;
+use std::os::unix::fs::PermissionsExt;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use crate::parallel_handler::ParallelHandler;
+
+pub mod parallel_handler;
+
+const BASE_DIR: &str = "/var/lib/rrdcached/db";
+const SOURCE_SUBDIR_NODE: &str = "pve2-node";
+const SOURCE_SUBDIR_GUEST: &str = "pve2-vm";
+const SOURCE_SUBDIR_STORAGE: &str = "pve2-storage";
+const TARGET_SUBDIR_NODE: &str = "pve9-node";
+const TARGET_SUBDIR_GUEST: &str = "pve9-vm";
+const TARGET_SUBDIR_STORAGE: &str = "pve9-storage";
+const MAX_THREADS: usize = 4;
+const RRD_STEP_SIZE: usize = 60;
+
+// RRAs are defined in the following way:
+//
+// RRA:CF:xff:step:rows
+// CF: AVERAGE or MAX
+// xff: 0.5
+// steps: stepsize is defined on rrd file creation! example: with 60 seconds step size:
+// e.g. 1 => 60 sec, 30 => 1800 seconds or 30 min
+// rows: how many aggregated rows are kept, as in how far back in time we store data
+//
+// how many seconds are aggregated per RRA: steps * stepsize * rows
+// how many hours are aggregated per RRA: steps * stepsize * rows / 3600
+// how many days are aggregated per RRA: steps * stepsize * rows / 3600 / 24
+// https://oss.oetiker.ch/rrdtool/tut/rrd-beginners.en.html#Understanding_by_an_example
+
+const RRD_VM_DEF: [&CStr; 25] = [
+ c"DS:maxcpu:GAUGE:120:0:U",
+ c"DS:cpu:GAUGE:120:0:U",
+ c"DS:maxmem:GAUGE:120:0:U",
+ c"DS:mem:GAUGE:120:0:U",
+ c"DS:maxdisk:GAUGE:120:0:U",
+ c"DS:disk:GAUGE:120:0:U",
+ c"DS:netin:DERIVE:120:0:U",
+ c"DS:netout:DERIVE:120:0:U",
+ c"DS:diskread:DERIVE:120:0:U",
+ c"DS:diskwrite:DERIVE:120:0:U",
+ c"DS:memhost:GAUGE:120:0:U",
+ c"DS:pressurecpusome:GAUGE:120:0:U",
+ c"DS:pressurecpufull:GAUGE:120:0:U",
+ c"DS:pressureiosome:GAUGE:120:0:U",
+ c"DS:pressureiofull:GAUGE:120:0:U",
+ c"DS:pressurememorysome:GAUGE:120:0:U",
+ c"DS:pressurememoryfull:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const RRD_NODE_DEF: [&CStr; 29] = [
+ c"DS:loadavg:GAUGE:120:0:U",
+ c"DS:maxcpu:GAUGE:120:0:U",
+ c"DS:cpu:GAUGE:120:0:U",
+ c"DS:iowait:GAUGE:120:0:U",
+ c"DS:memtotal:GAUGE:120:0:U",
+ c"DS:memused:GAUGE:120:0:U",
+ c"DS:swaptotal:GAUGE:120:0:U",
+ c"DS:swapused:GAUGE:120:0:U",
+ c"DS:roottotal:GAUGE:120:0:U",
+ c"DS:rootused:GAUGE:120:0:U",
+ c"DS:netin:DERIVE:120:0:U",
+ c"DS:netout:DERIVE:120:0:U",
+ c"DS:memfree:GAUGE:120:0:U",
+ c"DS:membuffers:GAUGE:120:0:U",
+ c"DS:memcached:GAUGE:120:0:U",
+ c"DS:arcsize:GAUGE:120:0:U",
+ c"DS:pressurecpusome:GAUGE:120:0:U",
+ c"DS:pressureiosome:GAUGE:120:0:U",
+ c"DS:pressureiofull:GAUGE:120:0:U",
+ c"DS:pressurememorysome:GAUGE:120:0:U",
+ c"DS:pressurememoryfull:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const RRD_STORAGE_DEF: [&CStr; 10] = [
+ c"DS:total:GAUGE:120:0:U",
+ c"DS:used:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const HELP: &str = "\
+proxmox-rrd-migration tool
+
+Migrates existing RRD graph data to the new format.
+
+Use this only in the process of upgrading from Proxmox VE 8 to 9 according to the upgrade guide!
+
+USAGE:
+ proxmox-rrd-migration [OPTIONS]
+
+ FLAGS:
+ -h, --help Prints this help information
+
+ OPTIONS:
+ --force Migrate, even if the target already exists.
+ This will overwrite any migrated RRD files!
+
+ --threads THREADS Number of parallel threads. Default from 1 to 4.
+
+ --test For internal use only.
+ Tests parallel guest migration only!
+ --source For internal use only. Source directory.
+ --target For internal use only. Target directory.
+ ";
+
+#[derive(Debug)]
+struct Args {
+ force: bool,
+ threads: Option<usize>,
+ test: bool,
+ source: Option<PathBuf>,
+ target: Option<PathBuf>,
+}
+
+fn parse_args() -> Result<Args, Error> {
+    let mut pargs = pico_args::Arguments::from_env();
+
+    // Help has a higher priority and should be handled separately.
+    if pargs.contains(["-h", "--help"]) {
+        print!("{}", HELP);
+        std::process::exit(0);
+    }
+
+    // Options taking a value are parsed before the boolean flags. Values that are missing or
+    // cannot be parsed are returned as errors instead of causing a panic.
+    let args = Args {
+        threads: pargs.opt_value_from_str("--threads")?,
+        source: pargs.opt_value_from_str("--source")?,
+        target: pargs.opt_value_from_str("--target")?,
+        test: pargs.contains("--test"),
+        force: pargs.contains("--force"),
+    };
+
+    // The test mode has no default paths, both directories must be passed explicitly.
+    if args.test && (args.source.is_none() || args.target.is_none()) {
+        bail!("'--test' requires both '--source' and '--target'");
+    }
+
+    // Anything left over at this point is an argument we do not know.
+    let remaining = pargs.finish();
+    if !remaining.is_empty() {
+        bail!("Warning: unused arguments left: {:?}", remaining);
+    }
+
+    Ok(args)
+}
+
+/// Migrate node and storage RRD files (skipped with `--test`) and finally the guest RRD files.
+fn main() {
+    let args = match parse_args() {
+        Ok(v) => v,
+        Err(e) => {
+            eprintln!("Error: {}.", e);
+            std::process::exit(1);
+        }
+    };
+
+    let mut source_dir_guests: PathBuf = [BASE_DIR, SOURCE_SUBDIR_GUEST].iter().collect();
+    let mut target_dir_guests: PathBuf = [BASE_DIR, TARGET_SUBDIR_GUEST].iter().collect();
+    let source_dir_nodes: PathBuf = [BASE_DIR, SOURCE_SUBDIR_NODE].iter().collect();
+    let target_dir_nodes: PathBuf = [BASE_DIR, TARGET_SUBDIR_NODE].iter().collect();
+    let source_dir_storage: PathBuf = [BASE_DIR, SOURCE_SUBDIR_STORAGE].iter().collect();
+    let target_dir_storage: PathBuf = [BASE_DIR, TARGET_SUBDIR_STORAGE].iter().collect();
+    // The test mode needs explicit directories; report missing ones instead of panicking.
+    if args.test {
+        let Some((source, target)) = args.source.clone().zip(args.target.clone()) else {
+            eprintln!("Error: '--test' requires both '--source' and '--target'.");
+            std::process::exit(1);
+        };
+        source_dir_guests = source;
+        target_dir_guests = target;
+    }
+
+    // Never overwrite already migrated data unless '--force' was passed explicitly.
+    for (kind, target_dir) in [
+        ("guests", &target_dir_guests),
+        ("nodes", &target_dir_nodes),
+        ("storages", &target_dir_storage),
+    ] {
+        if !args.force && target_dir.exists() {
+            eprintln!(
+                "Aborting! Target path for {kind} already exists. Use '--force' to still migrate. It will overwrite existing files!"
+            );
+            std::process::exit(1);
+        }
+    }
+
+    if !args.test {
+        if let Err(e) = migrate_nodes(source_dir_nodes, target_dir_nodes) {
+            eprintln!("Error migrating nodes: {}", e);
+            std::process::exit(1);
+        }
+        if let Err(e) = migrate_storage(source_dir_storage, target_dir_storage) {
+            eprintln!("Error migrating storage: {}", e);
+            std::process::exit(1);
+        }
+    }
+    if let Err(e) = migrate_guests(source_dir_guests, target_dir_guests, set_threads(&args)) {
+        eprintln!("Error migrating guests: {}", e);
+        std::process::exit(1);
+    }
+}
+
+/// Set number of threads
+///
+/// Either a fixed parameter or determining a range between 1 to 4 threads
+/// based on the number of CPU cores available in the system.
+///
+/// * `--threads` given: that value is used, but never less than 1, as a pool without any
+///   worker thread cannot process a single file.
+/// * otherwise: one thread per 8 CPUs, limited to the range from 1 to `MAX_THREADS`.
+///
+/// This function neither fails nor panics, see the comments below for the fallbacks.
+fn set_threads(args: &Args) -> usize {
+    if let Some(threads) = args.threads {
+        return threads.max(1);
+    }
+
+    // check for a way to get physical cores and not threads?
+    //
+    // Ask the standard library instead of spawning `nproc`: this avoids the dependency on an
+    // external binary and the panics if it is missing or prints something unexpected.
+    let cpus = match std::thread::available_parallelism() {
+        Ok(cpus) => cpus.get(),
+        // Should the number of CPUs not be available, assume the minimum.
+        Err(_) => 1,
+    };
+
+    // Integer division: fewer than 8 CPUs result in 0 and are raised to 1 thread,
+    // 32 or more CPUs are capped at MAX_THREADS.
+    (cpus / 8).clamp(1, MAX_THREADS)
+}
+
+/// Migrate guest RRD files
+///
+/// In parallel to speed up the process as most time is spent on converting the
+/// data to the new format.
+fn migrate_guests(
+    source_dir_guests: PathBuf,
+    target_dir_guests: PathBuf,
+    threads: usize,
+) -> Result<(), Error> {
+    println!("Migrating RRD data for guests…");
+    println!("Using {} thread(s)", threads);
+
+    // (full source path, file name) of every regular file in the source directory
+    let mut guest_source_files: Vec<(CString, OsString)> = Vec::new();
+    for file in fs::read_dir(&source_dir_guests)?
+        .filter_map(|entry| entry.ok())
+        .map(|entry| entry.path())
+        .filter(|path| path.is_file())
+    {
+        let Some(fname) = file.file_name() else {
+            continue;
+        };
+        let path = CString::new(file.as_os_str().as_bytes())?;
+        guest_source_files.push((path, fname.to_os_string()));
+    }
+
+    if !target_dir_guests.exists() {
+        println!("Creating new directory: '{}'", target_dir_guests.display());
+        std::fs::create_dir(&target_dir_guests)?;
+    }
+
+    let total_guests = guest_source_files.len();
+    let guests = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+    let guests2 = guests.clone();
+    // Use the monotonic clock: the elapsed time of a `SystemTime` is an error if the system
+    // time was set back in between, which would fail an otherwise successful migration.
+    let start_time = std::time::Instant::now();
+
+    let migration_pool = ParallelHandler::new(
+        "guest rrd migration",
+        threads,
+        move |(path, fname): (CString, OsString)| {
+            // Paths are passed on as raw bytes, so non UTF-8 names do not cause a panic.
+            let target_path =
+                CString::new(target_dir_guests.join(fname).as_os_str().as_bytes())?;
+            // The second entry stays NULL and marks the end of the list of source files.
+            let mut source: [*const std::ffi::c_char; 2] = [path.as_ptr(), std::ptr::null()];
+            let mut rrd_def = RRD_VM_DEF.map(|v| v.as_ptr());
+            // SAFETY: all pointers reference NUL terminated strings, or arrays of such
+            // pointers, which are kept alive by the local variables until the call returned.
+            unsafe {
+                rrd_get_context();
+                rrd_clear_error();
+                let res = rrd_create_r2(
+                    target_path.as_ptr(),
+                    RRD_STEP_SIZE as u64,
+                    0,
+                    0,
+                    source.as_mut_ptr(),
+                    std::ptr::null(),
+                    RRD_VM_DEF.len() as i32,
+                    rrd_def.as_mut_ptr(),
+                );
+                if res != 0 {
+                    bail!(
+                        "RRD create Error: {}",
+                        CStr::from_ptr(rrd_get_error()).to_string_lossy()
+                    );
+                }
+            }
+            // `fetch_add` returns the previous value, so add the file that was just migrated.
+            let current_guests = guests2.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
+            if current_guests % 200 == 0 {
+                println!("Migrated {} of {} guests", current_guests, total_guests);
+            }
+            Ok(())
+        },
+    );
+    let migration_channel = migration_pool.channel();
+
+    for file in guest_source_files {
+        migration_channel.send(file)?;
+    }
+
+    drop(migration_channel);
+    migration_pool.complete()?;
+
+    let elapsed = start_time.elapsed().as_secs_f64();
+    let guests = guests.load(std::sync::atomic::Ordering::SeqCst);
+    println!("Migrated {} guests in {:.2}s", guests, elapsed,);
+
+    Ok(())
+}
+
+/// Migrate node RRD files
+///
+/// In serial as the number of nodes will not be high.
+///
+/// Every regular file in `source_dir_nodes` is converted into a file of the same name in
+/// `target_dir_nodes`, which is created if it does not exist yet.
+///
+/// # Errors
+///
+/// Fails if a directory cannot be read or created, and at the first file that fails to convert.
+fn migrate_nodes(source_dir_nodes: PathBuf, target_dir_nodes: PathBuf) -> Result<(), Error> {
+    println!("Migrating RRD data for nodes…");
+
+    if !target_dir_nodes.exists() {
+        println!("Creating new directory: '{}'", target_dir_nodes.display());
+        std::fs::create_dir(&target_dir_nodes)?;
+    }
+
+    let node_source_files = fs::read_dir(&source_dir_nodes)?
+        .filter_map(|entry| entry.ok())
+        .map(|entry| entry.path())
+        .filter(|path| path.is_file());
+
+    for file in node_source_files {
+        let Some(node_name) = file.file_name() else {
+            continue;
+        };
+        println!("Node: '{}'", node_name.to_string_lossy());
+
+        // Paths are passed on as raw bytes, so non UTF-8 names do not cause a panic.
+        let source_path = CString::new(file.as_os_str().as_bytes())?;
+        let target_path = CString::new(target_dir_nodes.join(node_name).as_os_str().as_bytes())?;
+        // The second entry stays NULL and marks the end of the list of source files.
+        let mut source: [*const std::ffi::c_char; 2] = [source_path.as_ptr(), std::ptr::null()];
+        let mut rrd_def = RRD_NODE_DEF.map(|v| v.as_ptr());
+
+        // SAFETY: all pointers reference NUL terminated strings, or arrays of such
+        // pointers, which are kept alive by the local variables until the call returned.
+        unsafe {
+            rrd_get_context();
+            rrd_clear_error();
+            let res = rrd_create_r2(
+                target_path.as_ptr(),
+                RRD_STEP_SIZE as u64,
+                0,
+                0,
+                source.as_mut_ptr(),
+                std::ptr::null(),
+                RRD_NODE_DEF.len() as i32,
+                rrd_def.as_mut_ptr(),
+            );
+            if res != 0 {
+                bail!(
+                    "RRD create Error: {}",
+                    CStr::from_ptr(rrd_get_error()).to_string_lossy()
+                );
+            }
+        }
+    }
+    println!("Migrated all nodes");
+
+    Ok(())
+}
+
+/// Migrate storage RRD files
+///
+/// In serial as the number of storage will not be that high.
+///
+/// The source directory contains one sub directory per node, each holding one RRD file per
+/// storage. The same layout is created below `target_dir_storage`.
+fn migrate_storage(source_dir_storage: PathBuf, target_dir_storage: PathBuf) -> Result<(), Error> {
+    println!("Migrating RRD data for storages…");
+
+    if !target_dir_storage.exists() {
+        println!("Creating new directory: '{}'", target_dir_storage.display());
+        std::fs::create_dir(&target_dir_storage)?;
+    }
+
+    // storage has another layer of directories per node over which we need to iterate
+    let node_dirs = fs::read_dir(&source_dir_storage)?
+        .filter_map(|entry| entry.ok())
+        .map(|entry| entry.path())
+        .filter(|path| path.is_dir());
+
+    for source_node_subdir in node_dirs {
+        let Some(node_name) = source_node_subdir.file_name() else {
+            continue;
+        };
+        let target_node_subdir = target_dir_storage.join(node_name);
+
+        // With '--force' the directory may already exist from a previous run.
+        if !target_node_subdir.exists() {
+            fs::create_dir(&target_node_subdir)?;
+        }
+        // Changing the mode on the `Permissions` value alone has no effect on the directory,
+        // it has to be written back explicitly.
+        let mut permissions = target_node_subdir.metadata()?.permissions();
+        permissions.set_mode(0o755);
+        fs::set_permissions(&target_node_subdir, permissions)?;
+
+        let storage_source_files = fs::read_dir(&source_node_subdir)?
+            .filter_map(|entry| entry.ok())
+            .map(|entry| entry.path())
+            .filter(|path| path.is_file());
+
+        for file in storage_source_files {
+            let Some(storage_name) = file.file_name() else {
+                continue;
+            };
+            println!("Storage: '{}'", storage_name.to_string_lossy());
+            // Paths are passed on as raw bytes, so non UTF-8 names do not cause a panic.
+            let source_path = CString::new(file.as_os_str().as_bytes())?;
+            let target_path =
+                CString::new(target_node_subdir.join(storage_name).as_os_str().as_bytes())?;
+            // The second entry stays NULL and marks the end of the list of source files.
+            let mut source: [*const std::ffi::c_char; 2] =
+                [source_path.as_ptr(), std::ptr::null()];
+            let mut rrd_def = RRD_STORAGE_DEF.map(|v| v.as_ptr());
+
+            // SAFETY: all pointers reference NUL terminated strings, or arrays of such
+            // pointers, which are kept alive by the local variables until the call returned.
+            unsafe {
+                rrd_get_context();
+                rrd_clear_error();
+                let res = rrd_create_r2(
+                    target_path.as_ptr(),
+                    RRD_STEP_SIZE as u64,
+                    0,
+                    0,
+                    source.as_mut_ptr(),
+                    std::ptr::null(),
+                    RRD_STORAGE_DEF.len() as i32,
+                    rrd_def.as_mut_ptr(),
+                );
+                if res != 0 {
+                    bail!(
+                        "RRD create Error: {}",
+                        CStr::from_ptr(rrd_get_error()).to_string_lossy()
+                    );
+                }
+            }
+        }
+    }
+    println!("Migrated all storages");
+
+    Ok(())
+}
diff --git a/src/parallel_handler.rs b/src/parallel_handler.rs
new file mode 100644
index 0000000..787742a
--- /dev/null
+++ b/src/parallel_handler.rs
@@ -0,0 +1,162 @@
+//! A thread pool which runs a closure in parallel.
+
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+
+use anyhow::{Error, bail, format_err};
+use crossbeam_channel::{Sender, bounded};
+
+/// A handle to send data to the worker thread (implements clone)
+pub struct SendHandle<I> {
+ input: Sender<I>,
+ abort: Arc<Mutex<Option<String>>>,
+}
+
+/// Returns the first error happened, if any
+pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
+ let guard = abort.lock().unwrap();
+ if let Some(err_msg) = &*guard {
+ return Err(format_err!("{}", err_msg));
+ }
+ Ok(())
+}
+
+impl<I: Send> SendHandle<I> {
+    /// Send data to the worker threads
+    /// (fails once an error was recorded or if the channel is closed)
+    pub fn send(&self, input: I) -> Result<(), Error> {
+        check_abort(&self.abort)?;
+        self.input
+            .send(input)
+            .map_err(|_| format_err!("send failed - channel closed"))
+    }
+}
+
+/// A thread pool which runs the supplied closure
+///
+/// The send command sends data to the worker threads. If one handler
+/// returns an error, we mark the channel as failed and it is no
+/// longer possible to send data.
+///
+/// When done, the 'complete()' method needs to be called to check for
+/// outstanding errors.
+pub struct ParallelHandler<I> {
+ handles: Vec<JoinHandle<()>>,
+ name: String,
+ input: Option<SendHandle<I>>,
+}
+
+impl<I> Clone for SendHandle<I> {
+ fn clone(&self) -> Self {
+ Self {
+ input: self.input.clone(),
+ abort: Arc::clone(&self.abort),
+ }
+ }
+}
+
+impl<I: Send + 'static> ParallelHandler<I> {
+    /// Create a new thread pool, each thread processing incoming data
+    /// with 'handler_fn'.
+    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+    where
+        F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
+    {
+        let mut handles = Vec::new();
+        let (input_tx, input_rx) = bounded::<I>(threads);
+
+        let abort = Arc::new(Mutex::new(None));
+
+        for i in 0..threads {
+            let input_rx = input_rx.clone();
+            let abort = Arc::clone(&abort);
+            let handler_fn = handler_fn.clone();
+
+            handles.push(
+                std::thread::Builder::new()
+                    .name(format!("{} ({})", name, i))
+                    .spawn(move || {
+                        loop {
+                            let data = match input_rx.recv() {
+                                Ok(data) => data,
+                                Err(_) => return,
+                            };
+                            if let Err(err) = (handler_fn)(data) {
+                                let mut guard = abort.lock().unwrap();
+                                if guard.is_none() {
+                                    *guard = Some(err.to_string());
+                                }
+                            }
+                        }
+                    })
+                    .unwrap(),
+            );
+        }
+        Self {
+            handles,
+            name: name.to_string(),
+            input: Some(SendHandle {
+                input: input_tx,
+                abort,
+            }),
+        }
+    }
+
+    /// Returns a cloneable channel to send data to the worker threads
+    pub fn channel(&self) -> SendHandle<I> {
+        self.input.as_ref().unwrap().clone()
+    }
+
+    /// Send data to the worker threads
+    pub fn send(&self, input: I) -> Result<(), Error> {
+        self.input.as_ref().unwrap().send(input)?;
+        Ok(())
+    }
+
+    /// Wait for worker threads to complete and check for errors
+    pub fn complete(mut self) -> Result<(), Error> {
+        let input = self.input.take().unwrap();
+        let abort = Arc::clone(&input.abort);
+        check_abort(&abort)?;
+        drop(input);
+
+        let msg_list = self.join_threads();
+
+        // an error might be encountered while waiting for the join
+        check_abort(&abort)?;
+
+        if msg_list.is_empty() {
+            return Ok(());
+        }
+        Err(format_err!("{}", msg_list.join("\n")))
+    }
+
+    fn join_threads(&mut self) -> Vec<String> {
+        let mut msg_list = Vec::new();
+        // Join in spawn order, so that `i` is the same index that is part of the thread name.
+        for (i, handle) in self.handles.drain(..).enumerate() {
+            let Err(panic) = handle.join() else {
+                continue;
+            };
+            let panic_msg = panic
+                .downcast_ref::<&str>()
+                .map(|msg| msg.to_string())
+                .or_else(|| panic.downcast_ref::<String>().cloned());
+            msg_list.push(match panic_msg {
+                Some(panic_msg) => format!("thread {} ({i}) panicked: {panic_msg}", self.name),
+                None => format!("thread {} ({i}) panicked", self.name),
+            });
+        }
+        msg_list
+    }
+}
+
+// Note: We make sure that all threads will be joined
+impl<I> Drop for ParallelHandler<I> {
+ fn drop(&mut self) {
+ drop(self.input.take());
+ while let Some(handle) = self.handles.pop() {
+ let _ = handle.join();
+ }
+ }
+}
diff --git a/wrapper.h b/wrapper.h
new file mode 100644
index 0000000..64d0aa6
--- /dev/null
+++ b/wrapper.h
@@ -0,0 +1 @@
+#include <rrd.h>
--
2.39.5
More information about the pve-devel
mailing list