[pve-devel] [PATCH proxmox-rrd-migration-tool v4 1/3] create proxmox-rrd-migration-tool
Aaron Lauterer
a.lauterer at proxmox.com
Sat Jul 26 03:05:56 CEST 2025
This tool is intended to migrate the Proxmox VE (PVE) RRD data files to
the new schema.
Up until PVE8 the schema had been the same for a long time. With PVE9 we
introduced new columns for guests (vm) and nodes. We also switched all
types (vm, node, storage) to the same aggregation schemas that we use in
PBS.
The result of both changes is a much finer resolution for long time
spans, but also larger RRD files:
* node: 79K -> 1.4M
* vm: 66K -> 1.3M
* storage: 14K -> 156K
The old RRD files are located in `/var/lib/rrdcached/db/` in the
following subdirectories:
* nodes: `pve2-node`
* guests (VM/CT): `pve2-vm`
* storage: `pve2-storage`
With this change we also introduce a new key schema that makes it easier
to introduce new ones in the future. Instead of `pve{version}-{type}` we
switch to `pve-{type}-{version}`.
This enables us to add new columns with a new version, without breaking
nodes that are not yet updated. We are NOT allowed to remove or re-use
existing columns. That would be a breaking change.
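For illustration (this helper is hypothetical and not part of the patch):

    fn rrd_subdir(ty: &str, version: &str) -> String {
        // old: pve{version}-{type}, e.g. "pve2-vm"
        // new: pve-{type}-{version}, e.g. "pve-vm-9.0"
        format!("pve-{ty}-{version}")
    }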
We are currently at version 9.0, but in the future this tool can be
adapted to handle other migrations too, for example {old, 9.0} -> 9.2,
should that become necessary.
The actual migration is handled by `librrd`, to which we pass the paths
to the old and new files as well as the new RRD definitions. The
`rrd_create_r2` call then does the hard work of migrating and converting
existing data into the new file and aggregation schema.
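Condensed, the call in `do_rrd_migration()` below has roughly this shape
(variable names simplified, error handling omitted):

    unsafe {
        rrd_get_context();
        rrd_clear_error();
        let res = rrd_create_r2(
            target_path.as_ptr(),      // path of the new RRD file
            RRD_STEP_SIZE as u64,      // step size in seconds (60)
            0,                         // last_up, left at the default
            0,                         // no_overwrite disabled
            sources.as_mut_ptr(),      // NULL-terminated list holding the old file
            std::ptr::null(),          // no template
            rrd_def.len() as i32,      // number of DS:/RRA: definition strings
            rrd_def_ptrs.as_mut_ptr(), // the definitions themselves
        );
    }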
This can take some time. Quick tests on a Ryzen 7900X with the following
files:
* 1 node RRD file
* 10k vm RRD files
* 1 storage RRD file
showed the following results:
* 1 thread: 179.61s user 14.82s system 100% cpu 3:14.17 total
* 4 threads: 187.57s user 16.98s system 399% cpu 51.198 total
This is why we do not migrate inline, but have it as a separate step
during package upgrades.
Behavior: By default nothing will be changed and only a dry run will
happen. The actual migration is only performed if the `--migrate`
parameter is given.
For each RRD source file found, the tool checks whether a matching
target file already exists. By default such files are skipped, so that
target files which might already contain newer data are not overwritten.
This can be overridden with the `--force` parameter.
That means one can run the tool multiple times (without `--force`) and
it will pick up where it left off, for example if the migration was
interrupted for some reason.
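For example (invocations follow the built-in usage text):

    # dry run, nothing is changed:
    proxmox-rrd-migration
    # actual migration; re-running it resumes after an interruption:
    proxmox-rrd-migration --migrate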
Once a source file has been processed, it is renamed with a `.old`
suffix. This excludes it from future runs, as we only consider files
without an extension.
The tool uses a simple heuristic to determine how many threads to use:
roughly one thread per 8 CPUs reported by `nproc`, within a range of 1
to 4 threads. The `--threads` parameter allows a manual override.
Signed-off-by: Aaron Lauterer <a.lauterer at proxmox.com>
---
.cargo/config.toml | 5 +
.gitignore | 9 +
Cargo.toml | 20 ++
build.rs | 29 ++
src/lib.rs | 5 +
src/main.rs | 567 ++++++++++++++++++++++++++++++++++++++++
src/parallel_handler.rs | 160 ++++++++++++
wrapper.h | 1 +
8 files changed, 796 insertions(+)
create mode 100644 .cargo/config.toml
create mode 100644 .gitignore
create mode 100644 Cargo.toml
create mode 100644 build.rs
create mode 100644 src/lib.rs
create mode 100644 src/main.rs
create mode 100644 src/parallel_handler.rs
create mode 100644 wrapper.h
diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 0000000..3b5b6e4
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,5 @@
+[source]
+[source.debian-packages]
+directory = "/usr/share/cargo/registry"
+[source.crates-io]
+replace-with = "debian-packages"
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..06ac1a1
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,9 @@
+*.build
+*.buildinfo
+*.changes
+*.deb
+*.dsc
+*.tar*
+target/
+/Cargo.lock
+/proxmox-rrd-migration-tool-[0-9]*/
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..d3523f3
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "proxmox_rrd_migration_8-9"
+version = "0.1.0"
+edition = "2021"
+authors = [
+ "Aaron Lauterer <a.lauterer at proxmox.com>",
+ "Proxmox Support Team <support at proxmox.com>",
+]
+license = "AGPL-3"
+homepage = "https://www.proxmox.com"
+
+[dependencies]
+anyhow = "1.0.86"
+pico-args = "0.5.0"
+proxmox-async = "0.4"
+crossbeam-channel = "0.5"
+
+[build-dependencies]
+bindgen = "0.66.1"
+pkg-config = "0.3"
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..56d07cc
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,29 @@
+use std::env;
+use std::path::PathBuf;
+
+fn main() {
+ println!("cargo:rustc-link-lib=rrd");
+
+ println!("cargo:rerun-if-changed=wrapper.h");
+ // The bindgen::Builder is the main entry point
+ // to bindgen, and lets you build up options for
+ // the resulting bindings.
+
+ let bindings = bindgen::Builder::default()
+ // The input header we would like to generate
+ // bindings for.
+ .header("wrapper.h")
+ // Tell cargo to invalidate the built crate whenever any of the
+ // included header files changed.
+ .parse_callbacks(Box::new(bindgen::CargoCallbacks))
+ // Finish the builder and generate the bindings.
+ .generate()
+ // Unwrap the Result and panic on failure.
+ .expect("Unable to generate bindings");
+
+ // Write the bindings to the $OUT_DIR/bindings.rs file.
+ let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
+ bindings
+ .write_to_file(out_path.join("bindings.rs"))
+ .expect("Couldn't write bindings!");
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..a38a13a
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,5 @@
+#![allow(non_upper_case_globals)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+
+include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..5e6418c
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,567 @@
+use anyhow::{bail, Error, Result};
+use std::{
+ ffi::{CStr, CString, OsString},
+ fs,
+ os::unix::{ffi::OsStrExt, fs::PermissionsExt},
+ path::{Path, PathBuf},
+ sync::Arc,
+};
+
+use proxmox_rrd_migration_tool::{rrd_clear_error, rrd_create_r2, rrd_get_context, rrd_get_error};
+
+use crate::parallel_handler::ParallelHandler;
+
+pub mod parallel_handler;
+
+const BASE_DIR: &str = "/var/lib/rrdcached/db";
+const SOURCE_SUBDIR_NODE: &str = "pve2-node";
+const SOURCE_SUBDIR_GUEST: &str = "pve2-vm";
+const SOURCE_SUBDIR_STORAGE: &str = "pve2-storage";
+const TARGET_SUBDIR_NODE: &str = "pve-node-9.0";
+const TARGET_SUBDIR_GUEST: &str = "pve-vm-9.0";
+const TARGET_SUBDIR_STORAGE: &str = "pve-storage-9.0";
+const RESOURCE_BASE_DIR: &str = "/etc/pve";
+const MAX_THREADS: usize = 4;
+const RRD_STEP_SIZE: usize = 60;
+
+type File = (CString, OsString);
+
+// RRAs are defined in the following way:
+//
+// RRA:CF:xff:step:rows
+// CF: AVERAGE or MAX
+// xff: 0.5
+// steps: how many primary steps are aggregated into one row; the step size is defined at RRD file creation.
+// Example with a 60 second step size: 1 => 60 sec, 30 => 1800 sec (30 min)
+// rows: how many aggregated rows are kept, as in how far back in time we store data
+//
+// how many seconds are aggregated per RRA: steps * stepsize * rows
+// how many hours are aggregated per RRA: steps * stepsize * rows / 3600
+// how many days are aggregated per RRA: steps * stepsize * rows / 3600 / 24
+// https://oss.oetiker.ch/rrdtool/tut/rrd-beginners.en.html#Understanding_by_an_example
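+// Worked example: RRA:AVERAGE:0.5:10080:570 with a 60 s step covers 10080 * 60 * 570 s = 3990 days, ~10.9 years.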
+
+const RRD_VM_DEF: [&CStr; 25] = [
+ c"DS:maxcpu:GAUGE:120:0:U",
+ c"DS:cpu:GAUGE:120:0:U",
+ c"DS:maxmem:GAUGE:120:0:U",
+ c"DS:mem:GAUGE:120:0:U",
+ c"DS:maxdisk:GAUGE:120:0:U",
+ c"DS:disk:GAUGE:120:0:U",
+ c"DS:netin:DERIVE:120:0:U",
+ c"DS:netout:DERIVE:120:0:U",
+ c"DS:diskread:DERIVE:120:0:U",
+ c"DS:diskwrite:DERIVE:120:0:U",
+ c"DS:memhost:GAUGE:120:0:U",
+ c"DS:pressurecpusome:GAUGE:120:0:U",
+ c"DS:pressurecpufull:GAUGE:120:0:U",
+ c"DS:pressureiosome:GAUGE:120:0:U",
+ c"DS:pressureiofull:GAUGE:120:0:U",
+ c"DS:pressurememorysome:GAUGE:120:0:U",
+ c"DS:pressurememoryfull:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const RRD_NODE_DEF: [&CStr; 27] = [
+ c"DS:loadavg:GAUGE:120:0:U",
+ c"DS:maxcpu:GAUGE:120:0:U",
+ c"DS:cpu:GAUGE:120:0:U",
+ c"DS:iowait:GAUGE:120:0:U",
+ c"DS:memtotal:GAUGE:120:0:U",
+ c"DS:memused:GAUGE:120:0:U",
+ c"DS:swaptotal:GAUGE:120:0:U",
+ c"DS:swapused:GAUGE:120:0:U",
+ c"DS:roottotal:GAUGE:120:0:U",
+ c"DS:rootused:GAUGE:120:0:U",
+ c"DS:netin:DERIVE:120:0:U",
+ c"DS:netout:DERIVE:120:0:U",
+ c"DS:memfree:GAUGE:120:0:U",
+ c"DS:arcsize:GAUGE:120:0:U",
+ c"DS:pressurecpusome:GAUGE:120:0:U",
+ c"DS:pressureiosome:GAUGE:120:0:U",
+ c"DS:pressureiofull:GAUGE:120:0:U",
+ c"DS:pressurememorysome:GAUGE:120:0:U",
+ c"DS:pressurememoryfull:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const RRD_STORAGE_DEF: [&CStr; 10] = [
+ c"DS:total:GAUGE:120:0:U",
+ c"DS:used:GAUGE:120:0:U",
+ c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
+ c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
+ c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
+ c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
+ c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
+];
+
+const HELP: &str = "\
+proxmox-rrd-migration tool
+
+Migrates existing RRD graph data to the new format.
+
+Use this only in the process of upgrading from Proxmox VE 8 to 9 according to the upgrade guide!
+
+USAGE:
+ proxmox-rrd-migration [OPTIONS]
+
+ FLAGS:
+ -h, --help Prints this help information
+
+ OPTIONS:
+ --migrate Start the migration. Without it, only a dry run will be done.
+
+ --force Migrate, even if the target already exists.
+ This will overwrite any migrated RRD files!
+
+ --threads THREADS Number of parallel threads.
+
+ --source <SOURCE DIR> Source base directory. Mainly for tests!
+ Default: /var/lib/rrdcached/db
+
+ --target <TARGET DIR> Target base directory. Mainly for tests!
+ Default: /var/lib/rrdcached/db
+
+ --resources <DIR> Directory that contains .vmlist and .members files. Mainly for tests!
+ Default: /etc/pve
+
+";
+
+#[derive(Debug)]
+struct Args {
+ migrate: bool,
+ force: bool,
+ threads: Option<usize>,
+ source: Option<String>,
+ target: Option<String>,
+ resources: Option<String>,
+}
+
+fn parse_args() -> Result<Args, Error> {
+ let mut pargs = pico_args::Arguments::from_env();
+
+ // Help has a higher priority and should be handled separately.
+ if pargs.contains(["-h", "--help"]) {
+ print!("{}", HELP);
+ std::process::exit(0);
+ }
+
+ let mut args = Args {
+ migrate: false,
+ threads: pargs
+ .opt_value_from_str("--threads")
+ .expect("Could not parse --threads parameter"),
+ force: false,
+ source: pargs
+ .opt_value_from_str("--source")
+ .expect("Could not parse --source parameter"),
+ target: pargs
+ .opt_value_from_str("--target")
+ .expect("Could not parse --target parameter"),
+ resources: pargs
+ .opt_value_from_str("--resources")
+ .expect("Could not parse --resources parameter"),
+ };
+
+ if pargs.contains("--migrate") {
+ args.migrate = true;
+ }
+ if pargs.contains("--force") {
+ args.force = true;
+ }
+
+ // Fail if any unexpected arguments are left over.
+ let remaining = pargs.finish();
+ if !remaining.is_empty() {
+ bail!("unused arguments left: {:?}", remaining);
+ }
+
+ Ok(args)
+}
+
+fn main() {
+ let args = match parse_args() {
+ Ok(v) => v,
+ Err(e) => {
+ eprintln!("Error: {}.", e);
+ std::process::exit(1);
+ }
+ };
+
+ let source_base_dir = match args.source {
+ Some(ref v) => v.as_str(),
+ None => BASE_DIR,
+ };
+
+ let target_base_dir = match args.target {
+ Some(ref v) => v.as_str(),
+ None => BASE_DIR,
+ };
+
+ let resource_base_dir = match args.resources {
+ Some(ref v) => v.as_str(),
+ None => RESOURCE_BASE_DIR,
+ };
+
+ let source_dir_guests: PathBuf = [source_base_dir, SOURCE_SUBDIR_GUEST].iter().collect();
+ let target_dir_guests: PathBuf = [target_base_dir, TARGET_SUBDIR_GUEST].iter().collect();
+ let source_dir_nodes: PathBuf = [source_base_dir, SOURCE_SUBDIR_NODE].iter().collect();
+ let target_dir_nodes: PathBuf = [target_base_dir, TARGET_SUBDIR_NODE].iter().collect();
+ let source_dir_storage: PathBuf = [source_base_dir, SOURCE_SUBDIR_STORAGE].iter().collect();
+ let target_dir_storage: PathBuf = [target_base_dir, TARGET_SUBDIR_STORAGE].iter().collect();
+
+ if !args.migrate {
+ println!("DRYRUN! Use the --migrate parameter to start the migration.");
+ }
+ if args.force {
+ println!("Force mode! Will overwrite existing target RRD files!");
+ }
+
+ if let Err(e) = migrate_nodes(
+ source_dir_nodes,
+ target_dir_nodes,
+ resource_base_dir,
+ args.migrate,
+ args.force,
+ ) {
+ eprintln!("Error migrating nodes: {}", e);
+ std::process::exit(1);
+ }
+ if let Err(e) = migrate_storage(
+ source_dir_storage,
+ target_dir_storage,
+ args.migrate,
+ args.force,
+ ) {
+ eprintln!("Error migrating storage: {}", e);
+ std::process::exit(1);
+ }
+ if let Err(e) = migrate_guests(
+ source_dir_guests,
+ target_dir_guests,
+ resource_base_dir,
+ set_threads(&args),
+ args.migrate,
+ args.force,
+ ) {
+ eprintln!("Error migrating guests: {}", e);
+ std::process::exit(1);
+ }
+}
+
+/// Determine the number of threads to use
+///
+/// Either the value given via `--threads`, or between 1 and 4 threads
+/// depending on the number of CPUs available in the system.
+fn set_threads(args: &Args) -> usize {
+ if let Some(threads) = args.threads {
+ return threads;
+ }
+
+ // check for a way to get physical cores and not threads?
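+ // Heuristic: roughly one thread per 8 CPUs reported by nproc, at least 1 and at most MAX_THREADS.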
+ let cpus: usize = String::from_utf8_lossy(
+ std::process::Command::new("nproc")
+ .output()
+ .expect("Error running nproc")
+ .stdout
+ .as_slice()
+ .trim_ascii(),
+ )
+ .parse::<usize>()
+ .expect("Could not parse nproc output");
+
+ if cpus < 32 {
+ let threads = cpus / 8;
+ if threads == 0 {
+ return 1;
+ }
+ return threads;
+ }
+ MAX_THREADS
+}
+
+/// Check if a VMID is currently configured
+fn resource_present(path: &str, resource: &str) -> Result<bool> {
+ let resourcelist = fs::read_to_string(path)?;
+ Ok(resourcelist.contains(format!("\"{resource}\"").as_str()))
+}
+
+/// Rename a processed or orphaned RRD file by appending the `.old` suffix
+fn mv_old(file: &str) -> Result<()> {
+ let old = format!("{}.old", file);
+ fs::rename(file, old)?;
+ Ok(())
+}
+
+/// Collect all RRD files in the provided directory
+fn collect_rrd_files(location: &PathBuf) -> Result<Vec<(CString, OsString)>> {
+ let mut files: Vec<(CString, OsString)> = Vec::new();
+
+ fs::read_dir(location)?
+ .filter(|f| f.is_ok())
+ .map(|f| f.unwrap().path())
+ .filter(|f| f.is_file() && f.extension().is_none())
+ .for_each(|file| {
+ let path = CString::new(file.as_path().as_os_str().as_bytes())
+ .expect("Could not convert path to CString.");
+ let fname = file
+ .file_name()
+ .map(|v| v.to_os_string())
+ .expect("Could not convert fname to OsString.");
+ files.push((path, fname))
+ });
+ Ok(files)
+}
+
+/// Does the actual migration for the given file
+fn do_rrd_migration(
+ file: File,
+ target_location: &Path,
+ rrd_def: &[&CStr],
+ migrate: bool,
+ force: bool,
+) -> Result<()> {
+ if !migrate {
+ println!("would migrate but in dry run mode");
+ }
+
+ let resource = file.1;
+ let mut target_path = target_location.to_path_buf();
+ target_path.push(resource);
+
+ if target_path.exists() && !force {
+ println!(
+ "already migrated, use --force to overwrite target file: {}",
+ target_path.display()
+ );
+ }
+
+ if !migrate || (target_path.exists() && !force) {
+ bail!("skipping");
+ }
+
+ let mut source: [*const i8; 2] = [std::ptr::null(); 2];
+ source[0] = file.0.as_ptr();
+
+ let target_path = CString::new(target_path.to_str().unwrap()).unwrap();
+
+ unsafe {
+ rrd_get_context();
+ rrd_clear_error();
+ let res = rrd_create_r2(
+ target_path.as_ptr(),
+ RRD_STEP_SIZE as u64,
+ 0,
+ 0,
+ source.as_mut_ptr(),
+ std::ptr::null(),
+ rrd_def.len() as i32,
+ rrd_def
+ .iter()
+ .map(|v| v.as_ptr())
+ .collect::<Vec<_>>()
+ .as_mut_ptr(),
+ );
+ if res != 0 {
+ bail!(
+ "RRD create Error: {}",
+ CStr::from_ptr(rrd_get_error()).to_string_lossy()
+ );
+ }
+ }
+ Ok(())
+}
+
+/// Migrate guest RRD files
+///
+/// In parallel to speed up the process as most time is spent on converting the
+/// data to the new format.
+fn migrate_guests(
+ source_dir_guests: PathBuf,
+ target_dir_guests: PathBuf,
+ resources: &str,
+ threads: usize,
+ migrate: bool,
+ force: bool,
+) -> Result<(), Error> {
+ println!("Migrating RRD data for guests…");
+ println!("Using {} thread(s)", threads);
+
+ let guest_source_files = collect_rrd_files(&source_dir_guests)?;
+
+ if !target_dir_guests.exists() && migrate {
+ println!("Creating new directory: '{}'", target_dir_guests.display());
+ std::fs::create_dir(&target_dir_guests)?;
+ }
+
+ let total_guests = guest_source_files.len();
+ let guests = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let guests2 = guests.clone();
+ let start_time = std::time::SystemTime::now();
+
+ let migration_pool = ParallelHandler::new(
+ "guest rrd migration",
+ threads,
+ move |file: (CString, OsString)| {
+ let full_path = file.0.clone().into_string().unwrap();
+
+ if let Ok(()) = do_rrd_migration(
+ file,
+ &target_dir_guests,
+ RRD_VM_DEF.as_slice(),
+ migrate,
+ force,
+ ) {
+ mv_old(full_path.as_str())?;
+ let current_guests = guests2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ if current_guests > 0 && current_guests % 200 == 0 {
+ println!("Migrated {} of {} guests", current_guests, total_guests);
+ }
+ }
+ Ok(())
+ },
+ );
+ let migration_channel = migration_pool.channel();
+
+ for file in guest_source_files {
+ let node = file.1.clone().into_string().unwrap();
+ if !resource_present(format!("{resources}/.vmlist").as_str(), node.as_str())? {
+ println!("VMID: '{node}' not present. Skip and mark as old.");
+ mv_old(format!("{}", file.0.to_string_lossy()).as_str())?;
+ }
+ let migration_channel = migration_channel.clone();
+ migration_channel.send(file)?;
+ }
+
+ drop(migration_channel);
+ migration_pool.complete()?;
+
+ let elapsed = start_time.elapsed()?.as_secs_f64();
+ let guests = guests.load(std::sync::atomic::Ordering::SeqCst);
+ println!("Migrated {} guests", guests);
+ println!("It took {:.2}s", elapsed);
+
+ Ok(())
+}
+
+/// Migrate node RRD files
+///
+/// In serial as the number of nodes will not be high.
+fn migrate_nodes(
+ source_dir_nodes: PathBuf,
+ target_dir_nodes: PathBuf,
+ resources: &str,
+ migrate: bool,
+ force: bool,
+) -> Result<(), Error> {
+ println!("Migrating RRD data for nodes…");
+
+ if !target_dir_nodes.exists() && migrate {
+ println!("Creating new directory: '{}'", target_dir_nodes.display());
+ std::fs::create_dir(&target_dir_nodes)?;
+ }
+
+ let node_source_files = collect_rrd_files(&source_dir_nodes)?;
+
+ for file in node_source_files {
+ let node = file.1.clone().into_string().unwrap();
+ let full_path = file.0.clone().into_string().unwrap();
+ println!("Node: '{node}'");
+ if !resource_present(format!("{resources}/.members").as_str(), node.as_str())? {
+ println!("Node: '{node}' not present. Skip and mark as old.");
+ mv_old(format!("{}/{}", file.0.to_string_lossy(), node).as_str())?;
+ }
+ if let Ok(()) = do_rrd_migration(
+ file,
+ &target_dir_nodes,
+ RRD_NODE_DEF.as_slice(),
+ migrate,
+ force,
+ ) {
+ mv_old(full_path.as_str())?;
+ }
+ }
+ println!("Migrated all nodes");
+
+ Ok(())
+}
+
+/// Migrate storage RRD files
+///
+/// In serial as the number of storages will not be that high.
+fn migrate_storage(
+ source_dir_storage: PathBuf,
+ target_dir_storage: PathBuf,
+ migrate: bool,
+ force: bool,
+) -> Result<(), Error> {
+ println!("Migrating RRD data for storages…");
+
+ if !target_dir_storage.exists() && migrate {
+ println!("Creating new directory: '{}'", target_dir_storage.display());
+ std::fs::create_dir(&target_dir_storage)?;
+ }
+
+ // storage has another layer of directories per node over which we need to iterate
+ fs::read_dir(&source_dir_storage)?
+ .filter(|f| f.is_ok())
+ .map(|f| f.unwrap().path())
+ .filter(|f| f.is_dir())
+ .try_for_each(|node| {
+ let mut source_storage_subdir = source_dir_storage.clone();
+ source_storage_subdir.push(node.file_name().unwrap());
+
+ let mut target_storage_subdir = target_dir_storage.clone();
+ target_storage_subdir.push(node.file_name().unwrap());
+
+ if !target_storage_subdir.exists() && migrate {
+ fs::create_dir(target_storage_subdir.as_path())?;
+ let metadata = target_storage_subdir.metadata()?;
+ let mut permissions = metadata.permissions();
+ permissions.set_mode(0o755);
+ // set_mode() only changes the in-memory struct, apply it to the directory
+ fs::set_permissions(target_storage_subdir.as_path(), permissions)?;
+ }
+
+ let storage_source_files = collect_rrd_files(&source_storage_subdir)?;
+
+ for file in storage_source_files {
+ println!(
+ "Storage: '{}/{}'",
+ node.file_name()
+ .expect("no file name present")
+ .to_string_lossy(),
+ PathBuf::from(file.1.clone()).display()
+ );
+
+ let full_path = file.0.clone().into_string().unwrap();
+ if let Ok(()) = do_rrd_migration(
+ file,
+ &target_storage_subdir,
+ RRD_STORAGE_DEF.as_slice(),
+ migrate,
+ force,
+ ) {
+ mv_old(full_path.as_str())?;
+ }
+ }
+ Ok::<(), Error>(())
+ })?;
+ println!("Migrated all storages");
+
+ Ok(())
+}
diff --git a/src/parallel_handler.rs b/src/parallel_handler.rs
new file mode 100644
index 0000000..d8ee3c7
--- /dev/null
+++ b/src/parallel_handler.rs
@@ -0,0 +1,160 @@
+//! A thread pool which runs a closure in parallel.
+
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+
+use anyhow::{bail, format_err, Error};
+use crossbeam_channel::{bounded, Sender};
+
+/// A handle to send data to the worker threads (implements Clone)
+pub struct SendHandle<I> {
+ input: Sender<I>,
+ abort: Arc<Mutex<Option<String>>>,
+}
+
+/// Returns the first error that happened, if any
+pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
+ let guard = abort.lock().unwrap();
+ if let Some(err_msg) = &*guard {
+ return Err(format_err!("{}", err_msg));
+ }
+ Ok(())
+}
+
+impl<I: Send> SendHandle<I> {
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ check_abort(&self.abort)?;
+ match self.input.send(input) {
+ Ok(()) => Ok(()),
+ Err(_) => bail!("send failed - channel closed"),
+ }
+ }
+}
+
+/// A thread pool which runs the supplied closure
+///
+/// The send command sends data to the worker threads. If one handler
+/// returns an error, we mark the channel as failed and it is no
+/// longer possible to send data.
+///
+/// When done, the 'complete()' method needs to be called to check for
+/// outstanding errors.
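+///
+/// Example (a hypothetical sketch, not part of this patch):
+///
+/// ```ignore
+/// let pool = ParallelHandler::new("example", 4, |item: usize| {
+///     println!("processing {item}");
+///     Ok(())
+/// });
+/// pool.send(1)?;
+/// pool.complete()?;
+/// ```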
+pub struct ParallelHandler<I> {
+ handles: Vec<JoinHandle<()>>,
+ name: String,
+ input: Option<SendHandle<I>>,
+}
+
+impl<I> Clone for SendHandle<I> {
+ fn clone(&self) -> Self {
+ Self {
+ input: self.input.clone(),
+ abort: Arc::clone(&self.abort),
+ }
+ }
+}
+
+impl<I: Send + 'static> ParallelHandler<I> {
+ /// Create a new thread pool, each thread processing incoming data
+ /// with 'handler_fn'.
+ pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+ where
+ F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
+ {
+ let mut handles = Vec::new();
+ let (input_tx, input_rx) = bounded::<I>(threads);
+
+ let abort = Arc::new(Mutex::new(None));
+
+ for i in 0..threads {
+ let input_rx = input_rx.clone();
+ let abort = Arc::clone(&abort);
+ let handler_fn = handler_fn.clone();
+
+ handles.push(
+ std::thread::Builder::new()
+ .name(format!("{} ({})", name, i))
+ .spawn(move || loop {
+ let data = match input_rx.recv() {
+ Ok(data) => data,
+ Err(_) => return,
+ };
+ if let Err(err) = (handler_fn)(data) {
+ let mut guard = abort.lock().unwrap();
+ if guard.is_none() {
+ *guard = Some(err.to_string());
+ }
+ }
+ })
+ .unwrap(),
+ );
+ }
+ Self {
+ handles,
+ name: name.to_string(),
+ input: Some(SendHandle {
+ input: input_tx,
+ abort,
+ }),
+ }
+ }
+
+ /// Returns a cloneable channel to send data to the worker threads
+ pub fn channel(&self) -> SendHandle<I> {
+ self.input.as_ref().unwrap().clone()
+ }
+
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ self.input.as_ref().unwrap().send(input)?;
+ Ok(())
+ }
+
+ /// Wait for worker threads to complete and check for errors
+ pub fn complete(mut self) -> Result<(), Error> {
+ let input = self.input.take().unwrap();
+ let abort = Arc::clone(&input.abort);
+ check_abort(&abort)?;
+ drop(input);
+
+ let msg_list = self.join_threads();
+
+ // an error might be encountered while waiting for the join
+ check_abort(&abort)?;
+
+ if msg_list.is_empty() {
+ return Ok(());
+ }
+ Err(format_err!("{}", msg_list.join("\n")))
+ }
+
+ fn join_threads(&mut self) -> Vec<String> {
+ let mut msg_list = Vec::new();
+
+ let mut i = 0;
+ while let Some(handle) = self.handles.pop() {
+ if let Err(panic) = handle.join() {
+ if let Some(panic_msg) = panic.downcast_ref::<&str>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else {
+ msg_list.push(format!("thread {} ({i}) panicked", self.name));
+ }
+ }
+ i += 1;
+ }
+ msg_list
+ }
+}
+
+// Note: We make sure that all threads will be joined
+impl<I> Drop for ParallelHandler<I> {
+ fn drop(&mut self) {
+ drop(self.input.take());
+ while let Some(handle) = self.handles.pop() {
+ let _ = handle.join();
+ }
+ }
+}
diff --git a/wrapper.h b/wrapper.h
new file mode 100644
index 0000000..64d0aa6
--- /dev/null
+++ b/wrapper.h
@@ -0,0 +1 @@
+#include <rrd.h>
--
2.39.5