[pve-devel] [PATCH proxmox-rrd-migration-tool v4 1/3] create proxmox-rrd-migration-tool
Lukas Wagner
l.wagner at proxmox.com
Mon Jul 28 16:25:15 CEST 2025
Hey Aaron,
some comments inline.
On Sat Jul 26, 2025 at 3:05 AM CEST, Aaron Lauterer wrote:
> This tool is intended to migrate the Proxmox VE (PVE) RRD data files to
> the new schema.
>
> Up until PVE8 the schema has been the same for a long time. With PVE9 we
> introduced new columns to guests (vm) and nodes. We also switched all
> types (vm, node, storage) to the same aggregation schemas as we do in
> PBS.
> The result of both is a much finer resolution for long time spans, but
> also larger RRD files.
>
> * node: 79K -> 1.4M
> * vm: 66K -> 1.3M
> * storage: 14K -> 156K
>
> The old directories for VMs used to be in `/var/lib/rrdcached/db/` with
> the following sub directories:
>
> * nodes: `pve2-node`
> * guests (VM/CT): `pve2-vm`
> * storage: `pve2-storage`
>
> With this change we also introduce a new key schema that makes it
> easier in the future to introduce new ones. Instead of the
> `pve{version}-{type}` we are switching to `pve-{type}-{version}`.
>
> This enables us to add new columns with a new version, without breaking
> nodes that are not yet updated. We are NOT allowed to remove or re-use
> existing columns. That would be a breaking change.
> We are currently at version 9.0. But in the future, if needed, this tool
> can be adapted to do other migrations too.
> For example, {old, 9.0} -> 9.2, should that be necessary.
>
> The actual migration is handled by `librrd` to which we pass the path to
> the old and new files, and the new RRD definitions. The `rrd_create_r2`
> call then does the hard work of migrating and converting existing data
> into the new file and aggregation schema.
>
> This can take some time. Quick tests on a Ryzen 7900X with the following
> files:
> * 1 node RRD file
> * 10k vm RRD files
> * 1 storage RRD file
>
> showed the following results:
>
> * 1 thread: 179.61s user 14.82s system 100% cpu 3:14.17 total
> * 4 threads: 187.57s user 16.98s system 399% cpu 51.198 total
>
> This is why we do not migrate inline, but have it as a separate step
> during package upgrades.
>
> Behavior: By default nothing will be changed and a dry or test run will
> happen.
> Only if the `--migrate` parameter is added will the actual migration be
> done.
>
> For each found RRD source file, the tool checks if a matching target
> file already exists. By default, those will be skipped to not overwrite
> target files that might already store newer data.
> With the `--force` parameter this can be changed.
>
> That means, one can run the tool multiple times (without --force) and it
> will pick up where it might have left off, for example if the migration
> was interrupted for some reason.
>
> Once a source file has been processed it will be renamed with the `.old`
> suffix. It will be excluded from future runs as we check for files
> without an extension.
>
> The tool has a simple heuristic to determine how many threads should
> be used. By default the range is between 1 and 4 threads, but the
> `--threads` parameter allows a manual override.
>
> Signed-off-by: Aaron Lauterer <a.lauterer at proxmox.com>
> ---
> .cargo/config.toml | 5 +
> .gitignore | 9 +
> Cargo.toml | 20 ++
> build.rs | 29 ++
> src/lib.rs | 5 +
> src/main.rs | 567 ++++++++++++++++++++++++++++++++++++++++
> src/parallel_handler.rs | 160 ++++++++++++
> wrapper.h | 1 +
> 8 files changed, 796 insertions(+)
> create mode 100644 .cargo/config.toml
> create mode 100644 .gitignore
> create mode 100644 Cargo.toml
> create mode 100644 build.rs
> create mode 100644 src/lib.rs
> create mode 100644 src/main.rs
> create mode 100644 src/parallel_handler.rs
> create mode 100644 wrapper.h
>
> diff --git a/.cargo/config.toml b/.cargo/config.toml
> new file mode 100644
> index 0000000..3b5b6e4
> --- /dev/null
> +++ b/.cargo/config.toml
> @@ -0,0 +1,5 @@
> +[source]
> +[source.debian-packages]
> +directory = "/usr/share/cargo/registry"
> +[source.crates-io]
> +replace-with = "debian-packages"
> diff --git a/.gitignore b/.gitignore
> new file mode 100644
> index 0000000..06ac1a1
> --- /dev/null
> +++ b/.gitignore
> @@ -0,0 +1,9 @@
> +*.build
> +*.buildinfo
> +*.changes
> +*.deb
> +*.dsc
> +*.tar*
> +target/
> +/Cargo.lock
> +/proxmox-rrd-migration-tool-[0-9]*/
> diff --git a/Cargo.toml b/Cargo.toml
> new file mode 100644
> index 0000000..d3523f3
> --- /dev/null
> +++ b/Cargo.toml
> @@ -0,0 +1,20 @@
> +[package]
> +name = "proxmox_rrd_migration_8-9"
> +version = "0.1.0"
> +edition = "2021"
> +authors = [
> + "Aaron Lauterer <a.lauterer at proxmox.com>",
> + "Proxmox Support Team <support at proxmox.com>",
> +]
> +license = "AGPL-3"
> +homepage = "https://www.proxmox.com"
> +
> +[dependencies]
> +anyhow = "1.0.86"
> +pico-args = "0.5.0"
> +proxmox-async = "0.4"
> +crossbeam-channel = "0.5"
> +
> +[build-dependencies]
> +bindgen = "0.66.1"
> +pkg-config = "0.3"
> diff --git a/build.rs b/build.rs
> new file mode 100644
> index 0000000..56d07cc
> --- /dev/null
> +++ b/build.rs
> @@ -0,0 +1,29 @@
> +use std::env;
> +use std::path::PathBuf;
> +
> +fn main() {
> + println!("cargo:rustc-link-lib=rrd");
> +
> + println!("cargo:rerun-if-changed=wrapper.h");
> + // The bindgen::Builder is the main entry point
> + // to bindgen, and lets you build up options for
> + // the resulting bindings.
> +
> + let bindings = bindgen::Builder::default()
> + // The input header we would like to generate
> + // bindings for.
> + .header("wrapper.h")
> + // Tell cargo to invalidate the built crate whenever any of the
> + // included header files changed.
> + .parse_callbacks(Box::new(bindgen::CargoCallbacks))
> + // Finish the builder and generate the bindings.
> + .generate()
> + // Unwrap the Result and panic on failure.
> + .expect("Unable to generate bindings");
> +
> + // Write the bindings to the $OUT_DIR/bindings.rs file.
> + let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
> + bindings
> + .write_to_file(out_path.join("bindings.rs"))
> + .expect("Couldn't write bindings!");
> +}
> diff --git a/src/lib.rs b/src/lib.rs
> new file mode 100644
> index 0000000..a38a13a
> --- /dev/null
> +++ b/src/lib.rs
> @@ -0,0 +1,5 @@
> +#![allow(non_upper_case_globals)]
> +#![allow(non_camel_case_types)]
> +#![allow(non_snake_case)]
> +
> +include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
> diff --git a/src/main.rs b/src/main.rs
> new file mode 100644
> index 0000000..5e6418c
> --- /dev/null
> +++ b/src/main.rs
> @@ -0,0 +1,567 @@
> +use anyhow::{bail, Error, Result};
> +use std::{
> + ffi::{CStr, CString, OsString},
> + fs,
> + os::unix::{ffi::OsStrExt, fs::PermissionsExt},
> + path::{Path, PathBuf},
> + sync::Arc,
> +};
> +
> +use proxmox_rrd_migration_tool::{rrd_clear_error, rrd_create_r2, rrd_get_context, rrd_get_error};
> +
> +use crate::parallel_handler::ParallelHandler;
> +
> +pub mod parallel_handler;
> +
> +const BASE_DIR: &str = "/var/lib/rrdcached/db";
> +const SOURCE_SUBDIR_NODE: &str = "pve2-node";
> +const SOURCE_SUBDIR_GUEST: &str = "pve2-vm";
> +const SOURCE_SUBDIR_STORAGE: &str = "pve2-storage";
> +const TARGET_SUBDIR_NODE: &str = "pve-node-9.0";
> +const TARGET_SUBDIR_GUEST: &str = "pve-vm-9.0";
> +const TARGET_SUBDIR_STORAGE: &str = "pve-storage-9.0";
> +const RESOURCE_BASE_DIR: &str = "/etc/pve";
> +const MAX_THREADS: usize = 4;
> +const RRD_STEP_SIZE: usize = 60;
> +
> +type File = (CString, OsString);
Maybe use a different name here in order to avoid confusion with
std::fs::File? e.g. RRDFile
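For example (untested, naming is of course up to you):

type RrdFile = (CString, OsString);

and then refer to RrdFile wherever the tuple type is currently spelled out.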
> +
> +// RRAs are defined in the following way:
> +//
> +// RRA:CF:xff:step:rows
> +// CF: AVERAGE or MAX
> +// xff: 0.5
> +// steps: stepsize is defined on rrd file creation! example: with 60 seconds step size:
> +// e.g. 1 => 60 sec, 30 => 1800 seconds or 30 min
> +// rows: how many aggregated rows are kept, as in how far back in time we store data
> +//
> +// how many seconds are aggregated per RRA: steps * stepsize * rows
> +// how many hours are aggregated per RRA: steps * stepsize * rows / 3600
> +// how many days are aggregated per RRA: steps * stepsize * rows / 3600 / 24
> +// https://oss.oetiker.ch/rrdtool/tut/rrd-beginners.en.html#Understanding_by_an_example
> +
> +const RRD_VM_DEF: [&CStr; 25] = [
> + c"DS:maxcpu:GAUGE:120:0:U",
> + c"DS:cpu:GAUGE:120:0:U",
> + c"DS:maxmem:GAUGE:120:0:U",
> + c"DS:mem:GAUGE:120:0:U",
> + c"DS:maxdisk:GAUGE:120:0:U",
> + c"DS:disk:GAUGE:120:0:U",
> + c"DS:netin:DERIVE:120:0:U",
> + c"DS:netout:DERIVE:120:0:U",
> + c"DS:diskread:DERIVE:120:0:U",
> + c"DS:diskwrite:DERIVE:120:0:U",
> + c"DS:memhost:GAUGE:120:0:U",
> + c"DS:pressurecpusome:GAUGE:120:0:U",
> + c"DS:pressurecpufull:GAUGE:120:0:U",
> + c"DS:pressureiosome:GAUGE:120:0:U",
> + c"DS:pressureiofull:GAUGE:120:0:U",
> + c"DS:pressurememorysome:GAUGE:120:0:U",
> + c"DS:pressurememoryfull:GAUGE:120:0:U",
> + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
> + c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
> +];
> +
> +const RRD_NODE_DEF: [&CStr; 27] = [
> + c"DS:loadavg:GAUGE:120:0:U",
> + c"DS:maxcpu:GAUGE:120:0:U",
> + c"DS:cpu:GAUGE:120:0:U",
> + c"DS:iowait:GAUGE:120:0:U",
> + c"DS:memtotal:GAUGE:120:0:U",
> + c"DS:memused:GAUGE:120:0:U",
> + c"DS:swaptotal:GAUGE:120:0:U",
> + c"DS:swapused:GAUGE:120:0:U",
> + c"DS:roottotal:GAUGE:120:0:U",
> + c"DS:rootused:GAUGE:120:0:U",
> + c"DS:netin:DERIVE:120:0:U",
> + c"DS:netout:DERIVE:120:0:U",
> + c"DS:memfree:GAUGE:120:0:U",
> + c"DS:arcsize:GAUGE:120:0:U",
> + c"DS:pressurecpusome:GAUGE:120:0:U",
> + c"DS:pressureiosome:GAUGE:120:0:U",
> + c"DS:pressureiofull:GAUGE:120:0:U",
> + c"DS:pressurememorysome:GAUGE:120:0:U",
> + c"DS:pressurememoryfull:GAUGE:120:0:U",
> + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
> + c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
> +];
> +
> +const RRD_STORAGE_DEF: [&CStr; 10] = [
> + c"DS:total:GAUGE:120:0:U",
> + c"DS:used:GAUGE:120:0:U",
> + c"RRA:AVERAGE:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:AVERAGE:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:AVERAGE:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
> + c"RRA:MAX:0.5:1:1440", // 1 min * 1440 => 1 day
> + c"RRA:MAX:0.5:30:1440", // 30 min * 1440 => 30 day
> + c"RRA:MAX:0.5:360:1440", // 6 hours * 1440 => 360 day ~1 year
> + c"RRA:MAX:0.5:10080:570", // 1 week * 570 => ~10 years
> +];
> +
> +const HELP: &str = "\
> +proxmox-rrd-migration tool
> +
> +Migrates existing RRD graph data to the new format.
> +
> +Use this only in the process of upgrading from Proxmox VE 8 to 9 according to the upgrade guide!
> +
> +USAGE:
> + proxmox-rrd-migration [OPTIONS]
> +
> + FLAGS:
> + -h, --help Prints this help information
> +
> + OPTIONS:
> + --migrate Start the migration. Without it, only a dry run will be done.
> +
> + --force Migrate, even if the target already exists.
> + This will overwrite any migrated RRD files!
> +
> + --threads THREADS Number of paralell threads.
> +
> + --source <SOURCE DIR> Source base directory. Mainly for tests!
> + Default: /var/lib/rrdcached/db
> +
> + --target <TARGET DIR> Target base directory. Mainly for tests!
> + Default: /var/lib/rrdcached/db
> +
> + --resources <DIR> Directory that contains .vmlist and .member files. Mainly for tests!
> + Default: /etc/pve
> +
> +";
> +
> +#[derive(Debug)]
> +struct Args {
> + migrate: bool,
> + force: bool,
> + threads: Option<usize>,
> + source: Option<String>,
> + target: Option<String>,
> + resources: Option<String>,
> +}
> +
> +fn parse_args() -> Result<Args, Error> {
> + let mut pargs = pico_args::Arguments::from_env();
> +
> + // Help has a higher priority and should be handled separately.
> + if pargs.contains(["-h", "--help"]) {
> + print!("{}", HELP);
> + std::process::exit(0);
> + }
> +
> + let mut args = Args {
> + migrate: false,
> + threads: pargs
> + .opt_value_from_str("--threads")
> + .expect("Could not parse --threads parameter"),
> + force: false,
> + source: pargs
> + .opt_value_from_str("--source")
> + .expect("Could not parse --source parameter"),
> + target: pargs
> + .opt_value_from_str("--target")
> + .expect("Could not parse --target parameter"),
> + resources: pargs
> + .opt_value_from_str("--resources")
> + .expect("Could not parse --resources parameter"),
> + };
> +
> + if pargs.contains("--migrate") {
> + args.migrate = true;
> + }
> + if pargs.contains("--force") {
> + args.force = true;
> + }
> +
> + // It's up to the caller what to do with the remaining arguments.
> + let remaining = pargs.finish();
> + if !remaining.is_empty() {
> + bail!(format!("Warning: unused arguments left: {:?}", remaining));
No need to use format! here, bail! supports formatting natively:
bail!("Warning: .... {remaining:?}");
> + }
> +
> + Ok(args)
> +}
> +
> +fn main() {
> + let args = match parse_args() {
> + Ok(v) => v,
> + Err(e) => {
> + eprintln!("Error: {}.", e);
> + std::process::exit(1);
> + }
> + };
> +
> + let source_base_dir = match args.source {
> + Some(ref v) => v.as_str(),
> + None => BASE_DIR,
> + };
you can use this instead, it's shorter and a bit nicer to read IMO:
let source_base_dir = args.source.as_deref().unwrap_or(BASE_DIR);
> +
> + let target_base_dir = match args.target {
> + Some(ref v) => v.as_str(),
> + None => BASE_DIR,
> + };
> +
> + let resource_base_dir = match args.resources {
> + Some(ref v) => v.as_str(),
> + None => RESOURCE_BASE_DIR,
> + };
same for the previous two
> +
> + let source_dir_guests: PathBuf = [source_base_dir, SOURCE_SUBDIR_GUEST].iter().collect();
> + let target_dir_guests: PathBuf = [target_base_dir, TARGET_SUBDIR_GUEST].iter().collect();
> + let source_dir_nodes: PathBuf = [source_base_dir, SOURCE_SUBDIR_NODE].iter().collect();
> + let target_dir_nodes: PathBuf = [target_base_dir, TARGET_SUBDIR_NODE].iter().collect();
> + let source_dir_storage: PathBuf = [source_base_dir, SOURCE_SUBDIR_STORAGE].iter().collect();
> + let target_dir_storage: PathBuf = [target_base_dir, TARGET_SUBDIR_STORAGE].iter().collect();
What do you think about:
let source_base_dir = Path::new(args.source.as_deref().unwrap_or(BASE_DIR));
let target_base_dir = Path::new(args.target.as_deref().unwrap_or(BASE_DIR));
let source_dir_guests = source_base_dir.join(SOURCE_SUBDIR_GUEST);
let target_dir_guests = target_base_dir.join(TARGET_SUBDIR_GUEST);
> +
> + if !args.migrate {
> + println!("DRYRUN! Use the --migrate parameter to start the migration.");
> + }
> + if args.force {
> + println!("Force mode! Will overwrite existing target RRD files!");
> + }
> +
> + if let Err(e) = migrate_nodes(
> + source_dir_nodes,
> + target_dir_nodes,
> + resource_base_dir,
> + args.migrate,
> + args.force,
> + ) {
> + eprintln!("Error migrating nodes: {}", e);
> + std::process::exit(1);
> + }
> + if let Err(e) = migrate_storage(
> + source_dir_storage,
> + target_dir_storage,
> + args.migrate,
> + args.force,
> + ) {
> + eprintln!("Error migrating storage: {}", e);
> + std::process::exit(1);
> + }
> + if let Err(e) = migrate_guests(
> + source_dir_guests,
> + target_dir_guests,
> + resource_base_dir,
> + set_threads(&args),
> + args.migrate,
> + args.force,
> + ) {
> + eprintln!("Error migrating guests: {}", e);
> + std::process::exit(1);
> + }
Error handling in this function could be a bit cleaner if broken out
into a separate function and by using anyhow's .context/.with_context:
fn do_main() -> Result<(), Error> {
let args = parse_args(...).context("Could not parse args")?;
...
migrate_guests(...).context("Error migrating guests")?;
Ok(())
}
fn main() {
if let Err(e) = do_main() {
eprintln!("{e}");
std::process::exit(1);
}
}
What do you think?
> +}
> +
> +/// Set number of threads
> +///
> +/// Either a fixed parameter or determining a range between 1 to 4 threads
> +/// based on the number of CPU cores available in the system.
> +fn set_threads(args: &Args) -> usize {
I think the name 'set_threads' is rather confusing for something that
*returns* the number of threads to use. Maybe call it
'threads_from_core_count' or something like that? (Under the assumption that
you remove the let Some(...) as suggested below; if you keep it there,
'get_threads' might be an ok choice.)
> + if let Some(threads) = args.threads {
> + return threads;
> + }
^ Personally I'd keep this part outside of the helper, but no hard
feelings.
fn do_main() -> Result<(), Error> {
...
let threads = args.threads.unwrap_or_else(threads_from_core_count);
migrate_guests(..., threads)?;
Ok(())
}
fn threads_from_core_count() -> usize {
...
}
> +
> + // check for a way to get physical cores and not threads?
> + let cpus: usize = String::from_utf8_lossy(
> + std::process::Command::new("nproc")
> + .output()
> + .expect("Error running nproc")
> + .stdout
> + .as_slice()
> + .trim_ascii(),
> + )
> + .parse::<usize>()
> + .expect("Could not parse nproc output");
> +
> + if cpus < 32 {
> + let threads = cpus / 8;
> + if threads == 0 {
> + return 1;
> + }
> + return threads;
> + }
> + MAX_THREADS
> +}
> +
> +/// Check if a VMID is currently configured
> +fn resource_present(path: &str, resource: &str) -> Result<bool> {
> + let resourcelist = fs::read_to_string(path)?;
> + Ok(resourcelist.contains(format!("\"{resource}\"").as_str()))
> +}
> +
> +/// Rename file to old, when migrated or resource not present at all -> old RRD file
> +fn mv_old(file: &str) -> Result<()> {
> + let old = format!("{}.old", file);
> + fs::rename(file, old)?;
> + Ok(())
> +}
> +
> +/// Colllect all RRD files in the provided directory
> +fn collect_rrd_files(location: &PathBuf) -> Result<Vec<(CString, OsString)>> {
^
Maybe use the type you've defined here? `File`, although I'd
prefer a different name to avoid confusion with std::fs::File.
> + let mut files: Vec<(CString, OsString)> = Vec::new();
> +
> + fs::read_dir(location)?
> + .filter(|f| f.is_ok())
> + .map(|f| f.unwrap().path())
You can use filter_map here, maybe like this:
fs::read_dir(location)?
.filter_map(|f| match f {
Ok(a) => Some(a.path()),
Err(e) => {
eprintln!("could not read dir entry: {e}");
None
}
})
or, if you don't want to log the error:
fs::read_dir(location)?
.filter_map(Result::ok)
.map(|entry| entry.path())
(untested, but you get the idea)
> + .filter(|f| f.is_file() && f.extension().is_none())
> + .for_each(|file| {
> + let path = CString::new(file.as_path().as_os_str().as_bytes())
> + .expect("Could not convert path to CString.");
^
> + let fname = file
> + .file_name()
> + .map(|v| v.to_os_string())
Reading the docs for the CString::new function, it should only fail if
there is a NUL byte in the string, which should AFAIK be impossible
here since the string came from the file name. Maybe express that in
some comment here?
v
> + .expect("Could not convert fname to OsString.");
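Something like this maybe, for the CString case (untested, just to
illustrate what I mean):

// Unix file names can never contain NUL bytes, so CString::new()
// cannot fail here.
let path = CString::new(file.as_path().as_os_str().as_bytes())
    .expect("file name contains a NUL byte");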
> + files.push((path, fname))
> + });
> + Ok(files)
> +}
> +
> +/// Does the actual migration for the given file
> +fn do_rrd_migration(
> + file: File,
> + target_location: &Path,
> + rrd_def: &[&CStr],
> + migrate: bool,
> + force: bool,
> +) -> Result<()> {
> + if !migrate {
> + println!("would migrate but in dry run mode");
> + }
> +
> + let resource = file.1;
> + let mut target_path = target_location.to_path_buf();
Since the first thing you do with target_location is to convert it to
a PathBuf, I'd suggest just passing it as a PathBuf and letting the caller
take care of the allocation.
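Rough idea (untested):

fn do_rrd_migration(
    file: File,
    mut target_path: PathBuf, // callers hand over an owned PathBuf
    rrd_def: &[&CStr],
    migrate: bool,
    force: bool,
) -> Result<()> {
    target_path.push(file.1);
    // ... rest unchanged ...
    Ok(())
}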
> + target_path.push(resource);
> +
> + if target_path.exists() && !force {
> + println!(
> + "already migrated, use --force to overwrite target file: {}",
> + target_path.display()
> + );
> + }
> +
> + if !migrate || (target_path.exists() && !force) {
> + bail!("skipping");
> + }
you could pull out the 'target_path.exists() && !force' into a variable so
that you don't have to evaluate the same thing twice
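Untested, but roughly:

let already_migrated = target_path.exists() && !force;
if already_migrated {
    println!(
        "already migrated, use --force to overwrite target file: {}",
        target_path.display()
    );
}
if !migrate || already_migrated {
    bail!("skipping");
}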
> +
> + let mut source: [*const i8; 2] = [std::ptr::null(); 2];
> + source[0] = file.0.as_ptr();
> +
> + let target_path = CString::new(target_path.to_str().unwrap()).unwrap();
> +
> + unsafe {
> + rrd_get_context();
> + rrd_clear_error();
> + let res = rrd_create_r2(
> + target_path.as_ptr(),
> + RRD_STEP_SIZE as u64,
> + 0,
> + 0,
> + source.as_mut_ptr(),
> + std::ptr::null(),
> + rrd_def.len() as i32,
> + rrd_def
> + .iter()
> + .map(|v| v.as_ptr())
> + .collect::<Vec<_>>()
> + .as_mut_ptr(),
> + );
> + if res != 0 {
> + bail!(
> + "RRD create Error: {}",
> + CStr::from_ptr(rrd_get_error()).to_string_lossy()
> + );
> + }
> + }
> + Ok(())
> +}
> +
> +/// Migrate guest RRD files
> +///
> +/// In parallel to speed up the process as most time is spent on converting the
> +/// data to the new format.
> +fn migrate_guests(
> + source_dir_guests: PathBuf,
> + target_dir_guests: PathBuf,
> + resources: &str,
> + threads: usize,
> + migrate: bool,
> + force: bool,
> +) -> Result<(), Error> {
> + println!("Migrating RRD data for guests…");
> + println!("Using {} thread(s)", threads);
> +
> + let guest_source_files = collect_rrd_files(&source_dir_guests)?;
> +
> + if !target_dir_guests.exists() && migrate {
> + println!("Creating new directory: '{}'", target_dir_guests.display());
> + std::fs::create_dir(&target_dir_guests)?;
> + }
> +
> + let total_guests = guest_source_files.len();
> + let guests = Arc::new(std::sync::atomic::AtomicUsize::new(0));
> + let guests2 = guests.clone();
Just FYI, when cloning an Arc it's better to use Arc::clone(&guests),
because this makes it clearer *what* you are actually cloning:
the Arc vs. the content of the Arc.
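i.e.:

let guests2 = Arc::clone(&guests);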
> + let start_time = std::time::SystemTime::now();
Since you only measure the elapsed time it might be more idiomatic to
use std::time::Instant here, but no hard feelings.
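e.g. (untested), which would also get rid of the fallible elapsed() call
further down:

let start_time = std::time::Instant::now();
// ...
let elapsed = start_time.elapsed().as_secs_f64();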
> +
> + let migration_pool = ParallelHandler::new(
> + "guest rrd migration",
> + threads,
> + move |file: (CString, OsString)| {
Please add some comment regarding the .unwrap here.
> + let full_path = file.0.clone().into_string().unwrap();
> +
> + if let Ok(()) = do_rrd_migration(
> + file,
> + &target_dir_guests,
> + RRD_VM_DEF.as_slice(),
> + migrate,
> + force,
> + ) {
Since do_rrd_migration does not return any data in the Ok variant, you
could just use
if do_rrd_migration(....).is_ok() {
....
}
> + mv_old(full_path.as_str())?;
> + let current_guests = guests2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
> + if current_guests > 0 && current_guests % 200 == 0 {
> + println!("Migrated {} of {} guests", current_guests, total_guests);
> + }
> + }
> + Ok(())
> + },
> + );
> + let migration_channel = migration_pool.channel();
> +
> + for file in guest_source_files {
> + let node = file.1.clone().into_string().unwrap();
> + if !resource_present(format!("{resources}/.vmlist").as_str(), node.as_str())? {
> + println!("VMID: '{node}' not present. Skip and mark as old.");
> + mv_old(format!("{}", file.0.to_string_lossy()).as_str())?;
> + }
> + let migration_channel = migration_channel.clone();
Is this clone here needed? Seems to compile fine without it here...
> + migration_channel.send(file)?;
> + }
> +
> + drop(migration_channel);
> + migration_pool.complete()?;
> +
> + let elapsed = start_time.elapsed()?.as_secs_f64();
> + let guests = guests.load(std::sync::atomic::Ordering::SeqCst);
> + println!("Migrated {} guests", guests);
> + println!("It took {:.2}s", elapsed);
> +
> + Ok(())
> +}
> +
> +/// Migrate node RRD files
> +///
> +/// In serial as the number of nodes will not be high.
> +fn migrate_nodes(
> + source_dir_nodes: PathBuf,
> + target_dir_nodes: PathBuf,
> + resources: &str,
Any reason why this one is a &str instead of a PathBuf? As far as I can
tell it is also a path (/etc/pve by default). Also the
name of the variable somehow makes it not really clear that this is
supposed to be a path, I only deduced it from RESOURCE_BASE_DIR.
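With a Path you could then also build the lookup path without format!,
something like (untested, assumes resource_present() is adapted to take a
path as well):

if !resource_present(&resources.join(".members"), &node)? {
    // ...
}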
> + migrate: bool,
> + force: bool,
> +) -> Result<(), Error> {
> + println!("Migrating RRD data for nodes…");
> +
> + if !target_dir_nodes.exists() && migrate {
> + println!("Creating new directory: '{}'", target_dir_nodes.display());
> + std::fs::create_dir(&target_dir_nodes)?;
> + }
> +
> + let node_source_files = collect_rrd_files(&source_dir_nodes)?;
> +
> + for file in node_source_files {
> + let node = file.1.clone().into_string().unwrap();
> + let full_path = file.0.clone().into_string().unwrap();
Please add some comment why it is okay to .unwrap here (or just return
or ignore the error, if that makes more sense).
> + println!("Node: '{node}'");
> + if !resource_present(format!("{resources}/.members").as_str(), node.as_str())? {
You can just use &format!... and &node instead of the .as_str() calls
(a bit nicer to read and more idiomatic, but no hard feelings).
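i.e.:

if !resource_present(&format!("{resources}/.members"), &node)? {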
> + println!("Node: '{node}' not present. Skip and mark as old.");
> + mv_old(format!("{}/{}", file.0.to_string_lossy(), node).as_str())?;
> + }
> + if let Ok(()) = do_rrd_migration(
> + file,
> + &target_dir_nodes,
> + RRD_NODE_DEF.as_slice(),
> + migrate,
> + force,
> + ) {
Since do_rrd_migration does not return any data in the Ok variant, you
could just use
if do_rrd_migration(....).is_ok() {
....
}
> + mv_old(full_path.as_str())?;
> + }
> + }
> + println!("Migrated all nodes");
> +
> + Ok(())
> +}
> +
> +/// Migrate storage RRD files
> +///
> +/// In serial as the number of storage will not be that high.
> +fn migrate_storage(
> + source_dir_storage: PathBuf,
> + target_dir_storage: PathBuf,
> + migrate: bool,
> + force: bool,
> +) -> Result<(), Error> {
> + println!("Migrating RRD data for storages…");
> +
> + if !target_dir_storage.exists() && migrate {
> + println!("Creating new directory: '{}'", target_dir_storage.display());
> + std::fs::create_dir(&target_dir_storage)?;
> + }
> +
> + // storage has another layer of directories per node over which we need to iterate
> + fs::read_dir(&source_dir_storage)?
> + .filter(|f| f.is_ok())
> + .map(|f| f.unwrap().path())
> + .filter(|f| f.is_dir())
you can use filter_map here, as explained in collect_rrd_files
> + .try_for_each(|node| {
> + let mut source_storage_subdir = source_dir_storage.clone();
> + source_storage_subdir.push(node.file_name().unwrap());
> +
> + let mut target_storage_subdir = target_dir_storage.clone();
> + target_storage_subdir.push(node.file_name().unwrap());
> +
> + if !target_storage_subdir.exists() && migrate {
> + fs::create_dir(target_storage_subdir.as_path())?;
You can use & here instead of .as_path() :)
> + let metadata = target_storage_subdir.metadata()?;
> + let mut permissions = metadata.permissions();
> + permissions.set_mode(0o755);
You need to actually apply the permissions to the dir; here you only set
the permission bits in the Permissions data type.
std::fs::set_permissions(...)
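i.e. something like (untested; PermissionsExt is already imported in this
file):

fs::create_dir(&target_storage_subdir)?;
fs::set_permissions(
    &target_storage_subdir,
    fs::Permissions::from_mode(0o755),
)?;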
> + }
> +
> + let storage_source_files = collect_rrd_files(&source_storage_subdir)?;
> +
> + for file in storage_source_files {
> + println!(
> + "Storage: '{}/{}'",
> + node.file_name()
> + .expect("no file name present")
Same thing here regarding the potential panic
> + .to_string_lossy(),
> + PathBuf::from(file.1.clone()).display()
Starting with rustc 1.87, you can directly call file.1.display() on the underlying OsStr(ing).
> + );
> +
> + let full_path = file.0.clone().into_string().unwrap();
> + if let Ok(()) = do_rrd_migration(
> + file,
> + &target_storage_subdir,
> + RRD_STORAGE_DEF.as_slice(),
> + migrate,
> + force,
> + ) {
> + mv_old(full_path.as_str())?;
> + }
Since do_rrd_migration does not return any data in the Ok variant, you
could just use
if do_rrd_migration(....).is_ok() {
....
}
> + }
> + Ok::<(), Error>(())
> + })?;
> + println!("Migrated all storages");
> +
> + Ok(())
> +}
> diff --git a/src/parallel_handler.rs b/src/parallel_handler.rs
> new file mode 100644
> index 0000000..d8ee3c7
> --- /dev/null
> +++ b/src/parallel_handler.rs
> @@ -0,0 +1,160 @@
> +//! A thread pool which run a closure in parallel.
> +
> +use std::sync::{Arc, Mutex};
> +use std::thread::JoinHandle;
> +
> +use anyhow::{bail, format_err, Error};
> +use crossbeam_channel::{bounded, Sender};
> +
> +/// A handle to send data to the worker thread (implements clone)
> +pub struct SendHandle<I> {
> + input: Sender<I>,
> + abort: Arc<Mutex<Option<String>>>,
> +}
> +
> +/// Returns the first error happened, if any
> +pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
> + let guard = abort.lock().unwrap();
> + if let Some(err_msg) = &*guard {
> + return Err(format_err!("{}", err_msg));
> + }
> + Ok(())
> +}
> +
> +impl<I: Send> SendHandle<I> {
> + /// Send data to the worker threads
> + pub fn send(&self, input: I) -> Result<(), Error> {
> + check_abort(&self.abort)?;
> + match self.input.send(input) {
> + Ok(()) => Ok(()),
> + Err(_) => bail!("send failed - channel closed"),
> + }
might be more idiomatic to use .map_err here
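e.g. (untested):

self.input
    .send(input)
    .map_err(|_| format_err!("send failed - channel closed"))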
> + }
> +}
> +
> +/// A thread pool which run the supplied closure
> +///
> +/// The send command sends data to the worker threads. If one handler
> +/// returns an error, we mark the channel as failed and it is no
> +/// longer possible to send data.
> +///
> +/// When done, the 'complete()' method needs to be called to check for
> +/// outstanding errors.
> +pub struct ParallelHandler<I> {
> + handles: Vec<JoinHandle<()>>,
> + name: String,
> + input: Option<SendHandle<I>>,
> +}
> +
> +impl<I> Clone for SendHandle<I> {
> + fn clone(&self) -> Self {
> + Self {
> + input: self.input.clone(),
> + abort: Arc::clone(&self.abort),
> + }
> + }
> +}
> +
> +impl<I: Send + 'static> ParallelHandler<I> {
> + /// Create a new thread pool, each thread processing incoming data
> + /// with 'handler_fn'.
> + pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
> + where
> + F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
> + {
> + let mut handles = Vec::new();
> + let (input_tx, input_rx) = bounded::<I>(threads);
> +
> + let abort = Arc::new(Mutex::new(None));
> +
> + for i in 0..threads {
> + let input_rx = input_rx.clone();
> + let abort = Arc::clone(&abort);
> + let handler_fn = handler_fn.clone();
> +
> + handles.push(
> + std::thread::Builder::new()
> + .name(format!("{} ({})", name, i))
> + .spawn(move || loop {
> + let data = match input_rx.recv() {
> + Ok(data) => data,
> + Err(_) => return,
> + };
> + if let Err(err) = (handler_fn)(data) {
> + let mut guard = abort.lock().unwrap();
.unwrap on Mutex::lock is fine IMO, but should have a comment explaining
that it only panics if the mutex is poisoned.
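e.g.:

// Locking can only fail if another thread panicked while holding the
// lock (poisoned mutex) - in that case panicking here as well is fine.
let mut guard = abort.lock().unwrap();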
> + if guard.is_none() {
> + *guard = Some(err.to_string());
> + }
> + }
> + })
> + .unwrap(),
This shouldn't .unwrap() IMO, rather return an error from this function.
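Rough sketch of what I mean (untested; callers would then have to handle
the Result):

pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Result<Self, Error>
where
    F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
{
    // ...
    let handle = std::thread::Builder::new()
        .name(format!("{name} ({i})"))
        .spawn(move || {
            // ... worker loop as before ...
        })
        .map_err(|err| format_err!("failed to spawn worker thread: {err}"))?;
    handles.push(handle);
    // ...
    Ok(Self {
        handles,
        name: name.to_string(),
        input: Some(SendHandle {
            input: input_tx,
            abort,
        }),
    })
}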
> + );
> + }
> + Self {
> + handles,
> + name: name.to_string(),
> + input: Some(SendHandle {
> + input: input_tx,
> + abort,
> + }),
> + }
> + }
> +
> + /// Returns a cloneable channel to send data to the worker threads
> + pub fn channel(&self) -> SendHandle<I> {
> + self.input.as_ref().unwrap().clone()
Please add a comment why .unwrap is okay here or bubble some error up
> + }
> +
> + /// Send data to the worker threads
> + pub fn send(&self, input: I) -> Result<(), Error> {
> + self.input.as_ref().unwrap().send(input)?;
Please add a comment why .unwrap is okay here or bubble some error up
> + Ok(())
> + }
> +
> + /// Wait for worker threads to complete and check for errors
> + pub fn complete(mut self) -> Result<(), Error> {
> + let input = self.input.take().unwrap();
> + let abort = Arc::clone(&input.abort);
> + check_abort(&abort)?;
> + drop(input);
> +
> + let msg_list = self.join_threads();
> +
> + // an error might be encountered while waiting for the join
> + check_abort(&abort)?;
> +
> + if msg_list.is_empty() {
> + return Ok(());
> + }
> + Err(format_err!("{}", msg_list.join("\n")))
I'd rather
if !msg_list.is_empty() {
bail!("{}", msg_list.join("\n"));
}
Ok(())
> + }
> +
> + fn join_threads(&mut self) -> Vec<String> {
> + let mut msg_list = Vec::new();
> +
> + let mut i = 0;
> + while let Some(handle) = self.handles.pop() {
> + if let Err(panic) = handle.join() {
> + if let Some(panic_msg) = panic.downcast_ref::<&str>() {
> + msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
> + } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
> + msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
> + } else {
> + msg_list.push(format!("thread {} ({i}) panicked", self.name));
> + }
> + }
> + i += 1;
> + }
> + msg_list
> + }
> +}
> +
> +// Note: We make sure that all threads will be joined
> +impl<I> Drop for ParallelHandler<I> {
> + fn drop(&mut self) {
> + drop(self.input.take());
> + while let Some(handle) = self.handles.pop() {
> + let _ = handle.join();
> + }
> + }
> +}
> diff --git a/wrapper.h b/wrapper.h
> new file mode 100644
> index 0000000..64d0aa6
> --- /dev/null
> +++ b/wrapper.h
> @@ -0,0 +1 @@
> +#include <rrd.h>