[pve-devel] [PATCH proxmox-rrd-migration-tool v4 1/3] create proxmox-rrd-migration-tool

Lukas Wagner l.wagner at proxmox.com
Mon Jul 28 16:25:15 CEST 2025


Hey Aaron,

some comments inline.


On Sat Jul 26, 2025 at 3:05 AM CEST, Aaron Lauterer wrote:
> This tool is intended to migrate the Proxmox VE (PVE) RRD data files to
> the new schema.
>
> Up until PVE8 the schema has been the same for a long time. With PVE9 we
> introduced new columns to guests (vm) and nodes. We also switched all
> types (vm, node, storate) to the same aggregation schemas as we do it in
> PBS.
> The result of both are a much finer resolution for long time spans, but
> also larger RRD files.
>
> * node: 79K -> 1.4M
> * vm: 66K -> 1.3m
> * storage: 14K -> 156K
>
> The old directories for VMs used to be in `/var/lib/rrdcached/db/` with
> the following sub directories:
>
> * nodes: `pve2-node`
> * guests (VM/CT): `pve2-vm`
> * storage: `pve2-storage`
>
> With this change we also introduce a new key schema, that makes it
> easier in the future to introduce new ones. Instead of the
> `pve{version}-{type}` we are switching to `pve-{type}-{version}`.
>
> This enables us to add new columns with a new version, without breaking
> nodes that are not yet updated. We are NOT allowed to remove or re-use
> existing columns. That would be a breaking change.
> We are currently at version 9.0. But in the future, if needed, this tool
> can be adapted to do other migrations too.
> For example, {old, 9.0} -> 9.2, should that be necessary.
>
> The actual migration is handled by `librrd` to which we pass the path to
> the old and new files, and the new RRD definitions. The `rrd_create_r2`
> call then does the hard work of migrating and converting exisiting data
> into the new file and aggregation schema.
>
> This can take some time. Quick tests on a Ryzen 7900X with the following
> files:
> * 1 node RRD file
> * 10k vm RRD files
> * 1 storage RRD file
>
> showed the folling results:
>
> * 1 thread:  179.61s user 14.82s system 100% cpu 3:14.17 total
> * 4 threads: 187.57s user 16.98s system 399% cpu 51.198 total
>
> This is why we do not migrate inline, but have it as a separate step
> during package upgrades.
>
> Behavior: By default nothing will be changed and a dry or test run will
> happen.
> Only if the `--migrate` parameter is added will the actual migration be
> done.
>
> For each found RRD source file, the tool checks if a matching target
> file already exists. By default, those will be skipped to not overwrite
> target files that might already store newer data.
> With the `--force` parameter this can be changed.
>
> That means, one can run the tool multiple times (without --force) and it
> will pick up where it might have left off. For example it the migration
> was interrupted for some reason.
>
> Once a source file has been processed it will be renamed with the `.old`
> appendix. It will be excluded from future runs as we check for files
> without an extension.
>
> The tool has some simple heuristic to determine how many threads should
> be used. Be default the range is between 1 to 4 threads. But the
> `--threads` parameter allows a manual override.
>
> Signed-off-by: Aaron Lauterer <a.lauterer at proxmox.com>
> ---
>  .cargo/config.toml      |   5 +
>  .gitignore              |   9 +
>  Cargo.toml              |  20 ++
>  build.rs                |  29 ++
>  src/lib.rs              |   5 +
>  src/main.rs             | 567 ++++++++++++++++++++++++++++++++++++++++
>  src/parallel_handler.rs | 160 ++++++++++++
>  wrapper.h               |   1 +
>  8 files changed, 796 insertions(+)
>  create mode 100644 .cargo/config.toml
>  create mode 100644 .gitignore
>  create mode 100644 Cargo.toml
>  create mode 100644 build.rs
>  create mode 100644 src/lib.rs
>  create mode 100644 src/main.rs
>  create mode 100644 src/parallel_handler.rs
>  create mode 100644 wrapper.h
>
> diff --git a/.cargo/config.toml b/.cargo/config.toml
> new file mode 100644
> index 0000000..3b5b6e4
> --- /dev/null
> +++ b/.cargo/config.toml
> @@ -0,0 +1,5 @@
> +[source]
> +[source.debian-packages]
> +directory = "/usr/share/cargo/registry"
> +[source.crates-io]
> +replace-with = "debian-packages"
> diff --git a/.gitignore b/.gitignore
> new file mode 100644
> index 0000000..06ac1a1
> --- /dev/null
> +++ b/.gitignore
> @@ -0,0 +1,9 @@
> +*.build
> +*.buildinfo
> +*.changes
> +*.deb
> +*.dsc
> +*.tar*
> +target/
> +/Cargo.lock
> +/proxmox-rrd-migration-tool-[0-9]*/
> diff --git a/Cargo.toml b/Cargo.toml
> new file mode 100644
> index 0000000..d3523f3
> --- /dev/null
> +++ b/Cargo.toml
> @@ -0,0 +1,20 @@
> +[package]
> +name = "proxmox_rrd_migration_8-9"
> +version = "0.1.0"
> +edition = "2021"
> +authors = [
> +    "Aaron Lauterer <a.lauterer at proxmox.com>",
> +    "Proxmox Support Team <support at proxmox.com>",
> +]
> +license = "AGPL-3"
> +homepage = "https://www.proxmox.com"
> +
> +[dependencies]
> +anyhow = "1.0.86"
> +pico-args = "0.5.0"
> +proxmox-async = "0.4"
> +crossbeam-channel = "0.5"
> +
> +[build-dependencies]
> +bindgen = "0.66.1"
> +pkg-config = "0.3"
> diff --git a/build.rs b/build.rs
> new file mode 100644
> index 0000000..56d07cc
> --- /dev/null
> +++ b/build.rs
> @@ -0,0 +1,29 @@
> +use std::env;
> +use std::path::PathBuf;
> +
> +fn main() {
> +    println!("cargo:rustc-link-lib=rrd");
> +
> +    println!("cargo:rerun-if-changed=wrapper.h");
> +    // The bindgen::Builder is the main entry point
> +    // to bindgen, and lets you build up options for
> +    // the resulting bindings.
> +
> +    let bindings = bindgen::Builder::default()
> +        // The input header we would like to generate
> +        // bindings for.
> +        .header("wrapper.h")
> +        // Tell cargo to invalidate the built crate whenever any of the
> +        // included header files changed.
> +        .parse_callbacks(Box::new(bindgen::CargoCallbacks))
> +        // Finish the builder and generate the bindings.
> +        .generate()
> +        // Unwrap the Result and panic on failure.
> +        .expect("Unable to generate bindings");
> +
> +    // Write the bindings to the $OUT_DIR/bindings.rs file.
> +    let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
> +    bindings
> +        .write_to_file(out_path.join("bindings.rs"))
> +        .expect("Couldn't write bindings!");
> +}
> diff --git a/src/lib.rs b/src/lib.rs
> new file mode 100644
> index 0000000..a38a13a
> --- /dev/null
> +++ b/src/lib.rs
> @@ -0,0 +1,5 @@
> +#![allow(non_upper_case_globals)]
> +#![allow(non_camel_case_types)]
> +#![allow(non_snake_case)]
> +
> +include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
> diff --git a/src/main.rs b/src/main.rs
> new file mode 100644
> index 0000000..5e6418c
> --- /dev/null
> +++ b/src/main.rs
> @@ -0,0 +1,567 @@
> +use anyhow::{bail, Error, Result};
> +use std::{
> +    ffi::{CStr, CString, OsString},
> +    fs,
> +    os::unix::{ffi::OsStrExt, fs::PermissionsExt},
> +    path::{Path, PathBuf},
> +    sync::Arc,
> +};
> +
> +use proxmox_rrd_migration_tool::{rrd_clear_error, rrd_create_r2, rrd_get_context, rrd_get_error};
> +
> +use crate::parallel_handler::ParallelHandler;
> +
> +pub mod parallel_handler;
> +
> +const BASE_DIR: &str = "/var/lib/rrdcached/db";
> +const SOURCE_SUBDIR_NODE: &str = "pve2-node";
> +const SOURCE_SUBDIR_GUEST: &str = "pve2-vm";
> +const SOURCE_SUBDIR_STORAGE: &str = "pve2-storage";
> +const TARGET_SUBDIR_NODE: &str = "pve-node-9.0";
> +const TARGET_SUBDIR_GUEST: &str = "pve-vm-9.0";
> +const TARGET_SUBDIR_STORAGE: &str = "pve-storage-9.0";
> +const RESOURCE_BASE_DIR: &str = "/etc/pve";
> +const MAX_THREADS: usize = 4;
> +const RRD_STEP_SIZE: usize = 60;
> +
> +type File = (CString, OsString);

Maybe use some different name here in order to avoid confusion with
std::fs::File? e.g. RRDFile

> +
> +// RRAs are defined in the following way:
> +//
> +// RRA:CF:xff:step:rows
> +// CF: AVERAGE or MAX
> +// xff: 0.5
> +// steps: stepsize is defined on rrd file creation! example: with 60 seconds step size:
> +//	e.g. 1 => 60 sec, 30 => 1800 seconds or 30 min
> +// rows: how many aggregated rows are kept, as in how far back in time we store data
> +//
> +// how many seconds are aggregated per RRA: steps * stepsize * rows
> +// how many hours are aggregated per RRA: steps * stepsize * rows / 3600
> +// how many days are aggregated per RRA: steps * stepsize * rows / 3600 / 24
> +// https://oss.oetiker.ch/rrdtool/tut/rrd-beginners.en.html#Understanding_by_an_example
> +
> +const RRD_VM_DEF: [&CStr; 25] = [
> +    c"DS:maxcpu:GAUGE:120:0:U",
> +    c"DS:cpu:GAUGE:120:0:U",
> +    c"DS:maxmem:GAUGE:120:0:U",
> +    c"DS:mem:GAUGE:120:0:U",
> +    c"DS:maxdisk:GAUGE:120:0:U",
> +    c"DS:disk:GAUGE:120:0:U",
> +    c"DS:netin:DERIVE:120:0:U",
> +    c"DS:netout:DERIVE:120:0:U",
> +    c"DS:diskread:DERIVE:120:0:U",
> +    c"DS:diskwrite:DERIVE:120:0:U",
> +    c"DS:memhost:GAUGE:120:0:U",
> +    c"DS:pressurecpusome:GAUGE:120:0:U",
> +    c"DS:pressurecpufull:GAUGE:120:0:U",
> +    c"DS:pressureiosome:GAUGE:120:0:U",
> +    c"DS:pressureiofull:GAUGE:120:0:U",
> +    c"DS:pressurememorysome:GAUGE:120:0:U",
> +    c"DS:pressurememoryfull:GAUGE:120:0:U",
> +    c"RRA:AVERAGE:0.5:1:1440",    // 1 min * 1440 => 1 day
> +    c"RRA:AVERAGE:0.5:30:1440",   // 30 min * 1440 => 30 day
> +    c"RRA:AVERAGE:0.5:360:1440",  // 6 hours * 1440 => 360 day ~1 year
> +    c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
> +    c"RRA:MAX:0.5:1:1440",        // 1 min * 1440 => 1 day
> +    c"RRA:MAX:0.5:30:1440",       // 30 min * 1440 => 30 day
> +    c"RRA:MAX:0.5:360:1440",      // 6 hours * 1440 => 360 day ~1 year
> +    c"RRA:MAX:0.5:10080:570",     // 1 week * 570 => ~10 years
> +];
> +
> +const RRD_NODE_DEF: [&CStr; 27] = [
> +    c"DS:loadavg:GAUGE:120:0:U",
> +    c"DS:maxcpu:GAUGE:120:0:U",
> +    c"DS:cpu:GAUGE:120:0:U",
> +    c"DS:iowait:GAUGE:120:0:U",
> +    c"DS:memtotal:GAUGE:120:0:U",
> +    c"DS:memused:GAUGE:120:0:U",
> +    c"DS:swaptotal:GAUGE:120:0:U",
> +    c"DS:swapused:GAUGE:120:0:U",
> +    c"DS:roottotal:GAUGE:120:0:U",
> +    c"DS:rootused:GAUGE:120:0:U",
> +    c"DS:netin:DERIVE:120:0:U",
> +    c"DS:netout:DERIVE:120:0:U",
> +    c"DS:memfree:GAUGE:120:0:U",
> +    c"DS:arcsize:GAUGE:120:0:U",
> +    c"DS:pressurecpusome:GAUGE:120:0:U",
> +    c"DS:pressureiosome:GAUGE:120:0:U",
> +    c"DS:pressureiofull:GAUGE:120:0:U",
> +    c"DS:pressurememorysome:GAUGE:120:0:U",
> +    c"DS:pressurememoryfull:GAUGE:120:0:U",
> +    c"RRA:AVERAGE:0.5:1:1440",    // 1 min * 1440 => 1 day
> +    c"RRA:AVERAGE:0.5:30:1440",   // 30 min * 1440 => 30 day
> +    c"RRA:AVERAGE:0.5:360:1440",  // 6 hours * 1440 => 360 day ~1 year
> +    c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
> +    c"RRA:MAX:0.5:1:1440",        // 1 min * 1440 => 1 day
> +    c"RRA:MAX:0.5:30:1440",       // 30 min * 1440 => 30 day
> +    c"RRA:MAX:0.5:360:1440",      // 6 hours * 1440 => 360 day ~1 year
> +    c"RRA:MAX:0.5:10080:570",     // 1 week * 570 => ~10 years
> +];
> +
> +const RRD_STORAGE_DEF: [&CStr; 10] = [
> +    c"DS:total:GAUGE:120:0:U",
> +    c"DS:used:GAUGE:120:0:U",
> +    c"RRA:AVERAGE:0.5:1:1440",    // 1 min * 1440 => 1 day
> +    c"RRA:AVERAGE:0.5:30:1440",   // 30 min * 1440 => 30 day
> +    c"RRA:AVERAGE:0.5:360:1440",  // 6 hours * 1440 => 360 day ~1 year
> +    c"RRA:AVERAGE:0.5:10080:570", // 1 week * 570 => ~10 years
> +    c"RRA:MAX:0.5:1:1440",        // 1 min * 1440 => 1 day
> +    c"RRA:MAX:0.5:30:1440",       // 30 min * 1440 => 30 day
> +    c"RRA:MAX:0.5:360:1440",      // 6 hours * 1440 => 360 day ~1 year
> +    c"RRA:MAX:0.5:10080:570",     // 1 week * 570 => ~10 years
> +];
> +
> +const HELP: &str = "\
> +proxmox-rrd-migration tool
> +
> +Migrates existing RRD graph data to the new format.
> +
> +Use this only in the process of upgrading from Proxmox VE 8 to 9 according to the upgrade guide!
> +
> +USAGE:
> +    proxmox-rrd-migration [OPTIONS]
> +
> +    FLAGS:
> +        -h, --help              Prints this help information
> +
> +    OPTIONS:
> +        --migrate               Start the migration. Without it, only a dry run will be done.
> +
> +        --force                 Migrate, even if the target already exists.
> +                                This will overwrite any migrated RRD files!
> +
> +        --threads THREADS       Number of paralell threads.
> +
> +        --source <SOURCE DIR>   Source base directory. Mainly for tests!
> +                                Default: /var/lib/rrdcached/db
> +
> +        --target <TARGET DIR>   Target base directory. Mainly for tests!
> +                                Default: /var/lib/rrdcached/db
> +
> +        --resources <DIR>       Directory that contains .vmlist and .member files. Mainly for tests!
> +                                Default: /etc/pve
> +
> +";
> +
> +#[derive(Debug)]
> +struct Args {
> +    migrate: bool,
> +    force: bool,
> +    threads: Option<usize>,
> +    source: Option<String>,
> +    target: Option<String>,
> +    resources: Option<String>,
> +}
> +
> +fn parse_args() -> Result<Args, Error> {
> +    let mut pargs = pico_args::Arguments::from_env();
> +
> +    // Help has a higher priority and should be handled separately.
> +    if pargs.contains(["-h", "--help"]) {
> +        print!("{}", HELP);
> +        std::process::exit(0);
> +    }
> +
> +    let mut args = Args {
> +        migrate: false,
> +        threads: pargs
> +            .opt_value_from_str("--threads")
> +            .expect("Could not parse --threads parameter"),
> +        force: false,
> +        source: pargs
> +            .opt_value_from_str("--source")
> +            .expect("Could not parse --source parameter"),
> +        target: pargs
> +            .opt_value_from_str("--target")
> +            .expect("Could not parse --target parameter"),
> +        resources: pargs
> +            .opt_value_from_str("--resources")
> +            .expect("Could not parse --resources parameter"),
> +    };
> +
> +    if pargs.contains("--migrate") {
> +        args.migrate = true;
> +    }
> +    if pargs.contains("--force") {
> +        args.force = true;
> +    }
> +
> +    // It's up to the caller what to do with the remaining arguments.
> +    let remaining = pargs.finish();
> +    if !remaining.is_empty() {
> +        bail!(format!("Warning: unused arguments left: {:?}", remaining));

No need to use format! here, bail! supports formatting natively:

	bail!("Warning: .... {remaining:?}");

> +    }
> +
> +    Ok(args)
> +}
> +
> +fn main() {
> +    let args = match parse_args() {
> +        Ok(v) => v,
> +        Err(e) => {
> +            eprintln!("Error: {}.", e);
> +            std::process::exit(1);
> +        }
> +    };
> +
> +    let source_base_dir = match args.source {
> +        Some(ref v) => v.as_str(),
> +        None => BASE_DIR,
> +    };

You can use this instead; it's shorter and a bit nicer to read IMO:
       let source_base_dir = args.source.as_deref().unwrap_or(BASE_DIR);

> +
> +    let target_base_dir = match args.target {
> +        Some(ref v) => v.as_str(),
> +        None => BASE_DIR,
> +    };
> +
> +    let resource_base_dir = match args.resources {
> +        Some(ref v) => v.as_str(),
> +        None => RESOURCE_BASE_DIR,
> +    };

same for the previous two

> +
> +    let source_dir_guests: PathBuf = [source_base_dir, SOURCE_SUBDIR_GUEST].iter().collect();
> +    let target_dir_guests: PathBuf = [target_base_dir, TARGET_SUBDIR_GUEST].iter().collect();
> +    let source_dir_nodes: PathBuf = [source_base_dir, SOURCE_SUBDIR_NODE].iter().collect();
> +    let target_dir_nodes: PathBuf = [target_base_dir, TARGET_SUBDIR_NODE].iter().collect();
> +    let source_dir_storage: PathBuf = [source_base_dir, SOURCE_SUBDIR_STORAGE].iter().collect();
> +    let target_dir_storage: PathBuf = [target_base_dir, TARGET_SUBDIR_STORAGE].iter().collect();


What do you think about:

let source_base_dir = Path::new(args.source.as_deref().unwrap_or(BASE_DIR));
let target_base_dir = Path::new(args.target.as_deref().unwrap_or(BASE_DIR));

let source_dir_guests = source_base_dir.join(SOURCE_SUBDIR_GUEST);
let target_dir_guests = target_base_dir.join(TARGET_SUBDIR_GUEST);
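
To spell out the idea as a small, self-contained sketch: the constants are copied from the patch, while `guest_dirs` is a hypothetical helper that exists only for illustration and is not part of the tool.

```rust
use std::path::{Path, PathBuf};

const BASE_DIR: &str = "/var/lib/rrdcached/db";
const SOURCE_SUBDIR_GUEST: &str = "pve2-vm";
const TARGET_SUBDIR_GUEST: &str = "pve-vm-9.0";

/// Hypothetical helper: resolve the guest source/target directories from
/// optional command line overrides, falling back to BASE_DIR.
fn guest_dirs(source: Option<&str>, target: Option<&str>) -> (PathBuf, PathBuf) {
    let source_base_dir = Path::new(source.unwrap_or(BASE_DIR));
    let target_base_dir = Path::new(target.unwrap_or(BASE_DIR));
    (
        source_base_dir.join(SOURCE_SUBDIR_GUEST),
        target_base_dir.join(TARGET_SUBDIR_GUEST),
    )
}

fn main() {
    // Stand-ins for args.source / args.target (both Option<String>).
    let args_source: Option<String> = None;
    let args_target: Option<String> = Some("/tmp/rrd-test".to_string());

    // as_deref() turns Option<String> into Option<&str> without cloning.
    let (src, dst) = guest_dirs(args_source.as_deref(), args_target.as_deref());
    println!("{} -> {}", src.display(), dst.display());
}
```

The same pattern would apply to the node and storage directories.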



> +
> +    if !args.migrate {
> +        println!("DRYRUN! Use the --migrate parameter to start the migration.");
> +    }
> +    if args.force {
> +        println!("Force mode! Will overwrite existing target RRD files!");
> +    }
> +
> +    if let Err(e) = migrate_nodes(
> +        source_dir_nodes,
> +        target_dir_nodes,
> +        resource_base_dir,
> +        args.migrate,
> +        args.force,
> +    ) {
> +        eprintln!("Error migrating nodes: {}", e);
> +        std::process::exit(1);
> +    }
> +    if let Err(e) = migrate_storage(
> +        source_dir_storage,
> +        target_dir_storage,
> +        args.migrate,
> +        args.force,
> +    ) {
> +        eprintln!("Error migrating storage: {}", e);
> +        std::process::exit(1);
> +    }
> +    if let Err(e) = migrate_guests(
> +        source_dir_guests,
> +        target_dir_guests,
> +        resource_base_dir,
> +        set_threads(&args),
> +        args.migrate,
> +        args.force,
> +    ) {
> +        eprintln!("Error migrating guests: {}", e);
> +        std::process::exit(1);
> +    }

Error handling in this function could be a bit cleaner if broken out
into a separate function and by using anyhow's .context/.with_context:

fn do_main() -> Result<(), Error> {
    let args = parse_args(...).context("Could not parse args")?;
    ...
    migrate_guests(...).context("Error migrating guests")?;

    Ok(())
}

fn main() {
    if let Err(e) = do_main() {
        eprintln!("{e}");
        std::process::exit(1);
    }
}

What do you think?


> +}
> +
> +/// Set number of threads
> +///
> +/// Either a fixed parameter or determining a range between 1 to 4 threads
> +///  based on the number of CPU cores available in the system.
> +fn set_threads(args: &Args) -> usize {

I think the name 'set_threads' is rather confusing for something that
*returns* the number of threads to use. Maybe call it
'threads_from_core_count' or something alike? (This is under the
assumption that you remove the `if let Some(...)` as suggested below. If
you keep it there, 'get_threads' might be an ok choice.)



> +    if let Some(threads) = args.threads {
> +        return threads;
> +    }

^ Personally I'd keep this part outside of the helper, but no hard
feelings.

fn do_main() -> Result<(), Error> {
    ...

    let threads = args.threads.unwrap_or_else(threads_from_core_count);
    migrate_guests(..., threads)?;

    Ok(())
}


fn threads_from_core_count() -> usize {
  ...
}
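
A possible shape for that helper, as a rough sketch only. Note that it swaps the `nproc` subprocess for `std::thread::available_parallelism`, which is my assumption and not what the patch does; like `nproc`, it reports logical CPUs, not physical cores. `threads_for_cpus` is a hypothetical name, split out so the heuristic can be checked without depending on the host.

```rust
use std::thread;

const MAX_THREADS: usize = 4;

/// Same heuristic as in the patch: one thread per 8 CPUs,
/// but at least 1 and at most MAX_THREADS.
fn threads_for_cpus(cpus: usize) -> usize {
    (cpus / 8).clamp(1, MAX_THREADS)
}

/// Determine the thread count from the CPUs available to this process.
/// Falls back to a single thread if the count cannot be determined.
fn threads_from_core_count() -> usize {
    let cpus = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    threads_for_cpus(cpus)
}

fn main() {
    // Stand-in for args.threads; None means "no --threads given".
    let requested: Option<usize> = None;
    let threads = requested.unwrap_or_else(threads_from_core_count);
    println!("Using {threads} thread(s)");
}
```

This also gets rid of the two `.expect()` calls around the `nproc` invocation.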



> +
> +    // check for a way to get physical cores and not threads?
> +    let cpus: usize = String::from_utf8_lossy(
> +        std::process::Command::new("nproc")
> +            .output()
> +            .expect("Error running nproc")
> +            .stdout
> +            .as_slice()
> +            .trim_ascii(),
> +    )
> +    .parse::<usize>()
> +    .expect("Could not parse nproc output");
> +
> +    if cpus < 32 {
> +        let threads = cpus / 8;
> +        if threads == 0 {
> +            return 1;
> +        }
> +        return threads;
> +    }
> +    MAX_THREADS
> +}
> +
> +/// Check if a VMID is currently configured
> +fn resource_present(path: &str, resource: &str) -> Result<bool> {
> +    let resourcelist = fs::read_to_string(path)?;
> +    Ok(resourcelist.contains(format!("\"{resource}\"").as_str()))
> +}
> +
> +/// Rename file to old, when migrated or resource not present at all -> old RRD file
> +fn mv_old(file: &str) -> Result<()> {
> +    let old = format!("{}.old", file);
> +    fs::rename(file, old)?;
> +    Ok(())
> +}
> +
> +/// Colllect all RRD files in the provided directory
> +fn collect_rrd_files(location: &PathBuf) -> Result<Vec<(CString, OsString)>> {
                                                             ^
Maybe use the type you've defined here? `File`, although I'd 
prefer a different name to avoid confusion with std::fs::File.

> +    let mut files: Vec<(CString, OsString)> = Vec::new();
> +
> +    fs::read_dir(location)?
> +        .filter(|f| f.is_ok())
> +        .map(|f| f.unwrap().path())

You can use filter_map here, maybe like this:

fs::read_dir(location)?
    .filter_map(|f| match f {
        Ok(a) => Some(a.path()),
        Err(e) => {
            eprintln!("could not read dir entry: {e}");
            None
        }
    })

or, if you don't want to log the error:

fs::read_dir(location)?
    .filter_map(Result::ok)
    .map(|entry| entry.path())

(untested, but you get the idea)
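
For completeness, a self-contained sketch of the logging variant that can be run against a scratch directory. `files_without_extension` is a hypothetical name, and it returns plain `PathBuf`s instead of the `(CString, OsString)` tuples from the patch to keep the example short.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Collect all regular files without an extension in `location`.
/// Unreadable directory entries are logged and skipped.
fn files_without_extension(location: &Path) -> io::Result<Vec<PathBuf>> {
    let files = fs::read_dir(location)?
        .filter_map(|entry| match entry {
            Ok(entry) => Some(entry.path()),
            Err(e) => {
                eprintln!("could not read dir entry: {e}");
                None
            }
        })
        .filter(|path| path.is_file() && path.extension().is_none())
        .collect();
    Ok(files)
}

fn main() -> io::Result<()> {
    // Scratch directory with one unmigrated and one already renamed file.
    let dir = std::env::temp_dir().join(format!("rrd-filter-map-{}", std::process::id()));
    fs::create_dir_all(&dir)?;
    fs::write(dir.join("100"), b"")?;
    fs::write(dir.join("101.old"), b"")?;

    let files = files_without_extension(&dir)?;
    println!("{files:?}");

    fs::remove_dir_all(&dir)?;
    Ok(())
}
```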

> +        .filter(|f| f.is_file() && f.extension().is_none())
> +        .for_each(|file| {
> +            let path = CString::new(file.as_path().as_os_str().as_bytes())
> +                .expect("Could not convert path to CString.");
                      ^

> +            let fname = file
> +                .file_name()
> +                .map(|v| v.to_os_string())

Reading the docs for the CString::new function, it should only fail if
there is a NUL byte in the string, which should AFAIK be impossible
here since the string came from the file name. Maybe express that in
some comment here?
                       v

> +                .expect("Could not convert fname to OsString.");
> +            files.push((path, fname))
> +        });
> +    Ok(files)
> +}
> +
> +/// Does the actual migration for the given file
> +fn do_rrd_migration(
> +    file: File,
> +    target_location: &Path,
> +    rrd_def: &[&CStr],
> +    migrate: bool,
> +    force: bool,
> +) -> Result<()> {
> +    if !migrate {
> +        println!("would migrate but in dry run mode");
> +    }
> +
> +    let resource = file.1;
> +    let mut target_path = target_location.to_path_buf();

Since the first thing you do with target_location is to convert it to
a PathBuf, I'd suggest just passing it as a PathBuf and let the caller
take care of the allocation.

> +    target_path.push(resource);
> +
> +    if target_path.exists() && !force {
> +        println!(
> +            "already migrated, use --force to overwrite target file: {}",
> +            target_path.display()
> +        );
> +    }
> +
> +    if !migrate || (target_path.exists() && !force) {
> +        bail!("skipping");
> +    }

you could pull out the 'target_path.exists() && !force' into a variable so
that you don't have to evaluate the same thing twice

> +
> +    let mut source: [*const i8; 2] = [std::ptr::null(); 2];
> +    source[0] = file.0.as_ptr();
> +
> +    let target_path = CString::new(target_path.to_str().unwrap()).unwrap();
> +
> +    unsafe {
> +        rrd_get_context();
> +        rrd_clear_error();
> +        let res = rrd_create_r2(
> +            target_path.as_ptr(),
> +            RRD_STEP_SIZE as u64,
> +            0,
> +            0,
> +            source.as_mut_ptr(),
> +            std::ptr::null(),
> +            rrd_def.len() as i32,
> +            rrd_def
> +                .iter()
> +                .map(|v| v.as_ptr())
> +                .collect::<Vec<_>>()
> +                .as_mut_ptr(),
> +        );
> +        if res != 0 {
> +            bail!(
> +                "RRD create Error: {}",
> +                CStr::from_ptr(rrd_get_error()).to_string_lossy()
> +            );
> +        }
> +    }
> +    Ok(())
> +}
> +
> +/// Migrate guest RRD files
> +///
> +/// In parallel to speed up the process as most time is spent on converting the
> +/// data to the new format.
> +fn migrate_guests(
> +    source_dir_guests: PathBuf,
> +    target_dir_guests: PathBuf,
> +    resources: &str,
> +    threads: usize,
> +    migrate: bool,
> +    force: bool,
> +) -> Result<(), Error> {
> +    println!("Migrating RRD data for guests…");
> +    println!("Using {} thread(s)", threads);
> +
> +    let guest_source_files = collect_rrd_files(&source_dir_guests)?;
> +
> +    if !target_dir_guests.exists() && migrate {
> +        println!("Creating new directory: '{}'", target_dir_guests.display());
> +        std::fs::create_dir(&target_dir_guests)?;
> +    }
> +
> +    let total_guests = guest_source_files.len();
> +    let guests = Arc::new(std::sync::atomic::AtomicUsize::new(0));
> +    let guests2 = guests.clone();

Just FYI, when cloning an Arc it's better to use Arc::clone(&guests),
because this makes it clearer *what* you are actually cloning:
the Arc vs. the content of the Arc.
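
A small standalone sketch of the pattern, using plain `std::thread` instead of the ParallelHandler; `count_in_threads` is a hypothetical function for illustration only.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawn `workers` threads that each bump a shared counter once,
/// then return the final counter value.
fn count_in_threads(workers: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Arc::clone makes it obvious that only the reference-counted
            // pointer is cloned, not the AtomicUsize behind it.
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }

    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("Migrated {} guests", count_in_threads(4));
}
```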

> +    let start_time = std::time::SystemTime::now();

Since you only measure the elapsed time it might be more idiomatic to
use std::time::Instant here, but no hard feelings.
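
Roughly like this. Instant is monotonic, so `elapsed()` returns a Duration directly instead of a Result and the `?` further down would go away. `timed` is a hypothetical helper for illustration only.

```rust
use std::time::{Duration, Instant};

/// Run `work` and return its result together with the elapsed wall time.
fn timed<T>(work: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let result = work();
    // No error handling needed: Instant never goes backwards.
    (result, start.elapsed())
}

fn main() {
    let (sum, elapsed) = timed(|| (0..1_000_000u64).sum::<u64>());
    println!("sum = {sum}");
    println!("It took {:.2}s", elapsed.as_secs_f64());
}
```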

> +
> +    let migration_pool = ParallelHandler::new(
> +        "guest rrd migration",
> +        threads,
> +        move |file: (CString, OsString)| {

Please add some comment regarding the .unwrap here.

> +            let full_path = file.0.clone().into_string().unwrap();
> +
> +            if let Ok(()) = do_rrd_migration(
> +                file,
> +                &target_dir_guests,
> +                RRD_VM_DEF.as_slice(),
> +                migrate,
> +                force,
> +            ) {


Since do_rrd_migration does not return any data in its Result, you could
just

if do_rrd_migration(....).is_ok() {
   ....
}

> +                mv_old(full_path.as_str())?;
> +                let current_guests = guests2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
> +                if current_guests > 0 && current_guests % 200 == 0 {
> +                    println!("Migrated {} of {} guests", current_guests, total_guests);
> +                }
> +            }
> +            Ok(())
> +        },
> +    );
> +    let migration_channel = migration_pool.channel();
> +
> +    for file in guest_source_files {
> +        let node = file.1.clone().into_string().unwrap();
> +        if !resource_present(format!("{resources}/.vmlist").as_str(), node.as_str())? {
> +            println!("VMID: '{node}' not present. Skip and mark as old.");
> +            mv_old(format!("{}", file.0.to_string_lossy()).as_str())?;
> +        }
> +        let migration_channel = migration_channel.clone();

Is this clone here needed? Seems to compile fine without it....

> +        migration_channel.send(file)?;
> +    }
> +
> +    drop(migration_channel);
> +    migration_pool.complete()?;
> +
> +    let elapsed = start_time.elapsed()?.as_secs_f64();
> +    let guests = guests.load(std::sync::atomic::Ordering::SeqCst);
> +    println!("Migrated {} guests", guests);
> +    println!("It took {:.2}s", elapsed);
> +
> +    Ok(())
> +}
> +
> +/// Migrate node RRD files
> +///
> +/// In serial as the number of nodes will not be high.
> +fn migrate_nodes(
> +    source_dir_nodes: PathBuf,
> +    target_dir_nodes: PathBuf,
> +    resources: &str,

Any reason why this one is a &str instead of a PathBuf? As far as I can
tell it is also a path (/etc/pve by default). Also, the name of the
variable does not really make it clear that this is supposed to be a
path; I only deduced it from RESOURCE_BASE_DIR.

> +    migrate: bool,
> +    force: bool,
> +) -> Result<(), Error> {
> +    println!("Migrating RRD data for nodes…");
> +
> +    if !target_dir_nodes.exists() && migrate {
> +        println!("Creating new directory: '{}'", target_dir_nodes.display());
> +        std::fs::create_dir(&target_dir_nodes)?;
> +    }
> +
> +    let node_source_files = collect_rrd_files(&source_dir_nodes)?;
> +
> +    for file in node_source_files {
> +        let node = file.1.clone().into_string().unwrap();
> +        let full_path = file.0.clone().into_string().unwrap();

Please add some comment why it is okay to .unwrap here (or just return
or ignore the error, if that makes more sense).
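
If you go the "return the error" route, it could look something like the following sketch. Both conversions fail only for names that are not valid UTF-8. The helper names are hypothetical, and the error type is a plain String here to keep the example free of external crates; in the tool itself anyhow would be the natural choice.

```rust
use std::ffi::{CString, OsString};
use std::os::unix::ffi::OsStringExt;

/// Hypothetical helper: convert a file name, reporting non-UTF-8 names
/// as an error instead of panicking.
fn name_to_string(name: OsString) -> Result<String, String> {
    name.into_string()
        .map_err(|raw| format!("file name is not valid UTF-8: {}", raw.to_string_lossy()))
}

/// Hypothetical helper: the same for the full path stored as a CString.
fn path_to_string(path: CString) -> Result<String, String> {
    path.into_string()
        .map_err(|e| format!("path is not valid UTF-8: {e}"))
}

fn main() {
    // A valid name converts cleanly.
    println!("{:?}", name_to_string(OsString::from("pve1")));

    // 0xff can never appear in valid UTF-8, so this yields an Err.
    let broken = OsString::from_vec(vec![0x66u8, 0xff]);
    println!("{:?}", name_to_string(broken));
}
```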


> +        println!("Node: '{node}'");
> +        if !resource_present(format!("{resources}/.members").as_str(), node.as_str())? {

You can just use &format!... and &node instead of the .as_str() calls
(a bit nicer to read and more idiomatic, but no hard feelings).

> +            println!("Node: '{node}' not present. Skip and mark as old.");
> +            mv_old(format!("{}/{}", file.0.to_string_lossy(), node).as_str())?;
> +        }
> +        if let Ok(()) = do_rrd_migration(
> +            file,
> +            &target_dir_nodes,
> +            RRD_NODE_DEF.as_slice(),
> +            migrate,
> +            force,
> +        ) {


Since do_rrd_migration does not return any data in its Result, you could
just

if do_rrd_migration(....).is_ok() {
   ....
}

> +            mv_old(full_path.as_str())?;
> +        }
> +    }
> +    println!("Migrated all nodes");
> +
> +    Ok(())
> +}
> +
> +/// Migrate storage RRD files
> +///
> +/// In serial as the number of storage will not be that high.
> +fn migrate_storage(
> +    source_dir_storage: PathBuf,
> +    target_dir_storage: PathBuf,
> +    migrate: bool,
> +    force: bool,
> +) -> Result<(), Error> {
> +    println!("Migrating RRD data for storages…");
> +
> +    if !target_dir_storage.exists() && migrate {
> +        println!("Creating new directory: '{}'", target_dir_storage.display());
> +        std::fs::create_dir(&target_dir_storage)?;
> +    }
> +
> +    // storage has another layer of directories per node over which we need to iterate
> +    fs::read_dir(&source_dir_storage)?
> +        .filter(|f| f.is_ok())
> +        .map(|f| f.unwrap().path())
> +        .filter(|f| f.is_dir())

you can use filter_map here, as explained in collect_rrd_files

> +        .try_for_each(|node| {
> +            let mut source_storage_subdir = source_dir_storage.clone();
> +            source_storage_subdir.push(node.file_name().unwrap());
> +
> +            let mut target_storage_subdir = target_dir_storage.clone();
> +            target_storage_subdir.push(node.file_name().unwrap());
> +
> +            if !target_storage_subdir.exists() && migrate {
> +                fs::create_dir(target_storage_subdir.as_path())?;

You can use & here instead of .as_path() :)

> +                let metadata = target_storage_subdir.metadata()?;
> +                let mut permissions = metadata.permissions();
> +                permissions.set_mode(0o755);

You need to actually apply the permissions to the dir, here you only set
the permission bits in the Permissions data type.

std::fs::set_permissions(...)
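
As a minimal sketch (Unix only): `create_dir_with_mode` is a hypothetical helper, and the directory it operates on is a scratch path under the system temp dir, not anything from the tool.

```rust
use std::fs;
use std::io;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;

/// Create `path` and then explicitly apply `mode` to it.
fn create_dir_with_mode(path: &Path, mode: u32) -> io::Result<()> {
    fs::create_dir(path)?;
    // Permissions::from_mode only builds the value in memory;
    // set_permissions is what actually changes the directory on disk.
    fs::set_permissions(path, fs::Permissions::from_mode(mode))
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join(format!("rrd-perm-{}", std::process::id()));
    create_dir_with_mode(&dir, 0o755)?;

    let mode = dir.metadata()?.permissions().mode() & 0o777;
    println!("mode: {mode:o}");

    fs::remove_dir(&dir)?;
    Ok(())
}
```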

> +            }
> +
> +            let storage_source_files = collect_rrd_files(&source_storage_subdir)?;
> +
> +            for file in storage_source_files {
> +                println!(
> +                    "Storage: '{}/{}'",
> +                    node.file_name()
> +                        .expect("no file name present")

Same thing here regarding the potential panic

> +                        .to_string_lossy(),
> +                    PathBuf::from(file.1.clone()).display()

Starting with rustc 1.87, you can directly call file.1.display() on the underlying OsStr(ing).

> +                );
> +
> +                let full_path = file.0.clone().into_string().unwrap();
> +                if let Ok(()) = do_rrd_migration(
> +                    file,
> +                    &target_storage_subdir,
> +                    RRD_STORAGE_DEF.as_slice(),
> +                    migrate,
> +                    force,
> +                ) {
> +                    mv_old(full_path.as_str())?;
> +                }

Since do_rrd_migration does not return any data in its Result, you could
just

if do_rrd_migration(....).is_ok() {
   ....
}

> +            }
> +            Ok::<(), Error>(())
> +        })?;
> +    println!("Migrated all storages");
> +
> +    Ok(())
> +}
> diff --git a/src/parallel_handler.rs b/src/parallel_handler.rs
> new file mode 100644
> index 0000000..d8ee3c7
> --- /dev/null
> +++ b/src/parallel_handler.rs
> @@ -0,0 +1,160 @@
> +//! A thread pool which run a closure in parallel.
> +
> +use std::sync::{Arc, Mutex};
> +use std::thread::JoinHandle;
> +
> +use anyhow::{bail, format_err, Error};
> +use crossbeam_channel::{bounded, Sender};
> +
> +/// A handle to send data to the worker thread (implements clone)
> +pub struct SendHandle<I> {
> +    input: Sender<I>,
> +    abort: Arc<Mutex<Option<String>>>,
> +}
> +
> +/// Returns the first error happened, if any
> +pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
> +    let guard = abort.lock().unwrap();
> +    if let Some(err_msg) = &*guard {
> +        return Err(format_err!("{}", err_msg));
> +    }
> +    Ok(())
> +}
> +
> +impl<I: Send> SendHandle<I> {
> +    /// Send data to the worker threads
> +    pub fn send(&self, input: I) -> Result<(), Error> {
> +        check_abort(&self.abort)?;
> +        match self.input.send(input) {
> +            Ok(()) => Ok(()),
> +            Err(_) => bail!("send failed - channel closed"),
> +        }

might be more idiomatic to use .map_err here
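
Something along these lines. To keep the sketch self-contained it uses
std's mpsc SyncSender and a String error as stand-ins for the
crossbeam Sender and anyhow::Error from the patch; `send_checked` is a
made-up name:

```rust
use std::sync::mpsc::SyncSender;

/// Stand-in for SendHandle::send: map the channel error instead of
/// matching on both arms.
fn send_checked<I>(tx: &SyncSender<I>, input: I) -> Result<(), String> {
    tx.send(input)
        .map_err(|_| "send failed - channel closed".to_string())
}
```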

> +    }
> +}
> +
> +/// A thread pool which run the supplied closure
> +///
> +/// The send command sends data to the worker threads. If one handler
> +/// returns an error, we mark the channel as failed and it is no
> +/// longer possible to send data.
> +///
> +/// When done, the 'complete()' method needs to be called to check for
> +/// outstanding errors.
> +pub struct ParallelHandler<I> {
> +    handles: Vec<JoinHandle<()>>,
> +    name: String,
> +    input: Option<SendHandle<I>>,
> +}
> +
> +impl<I> Clone for SendHandle<I> {
> +    fn clone(&self) -> Self {
> +        Self {
> +            input: self.input.clone(),
> +            abort: Arc::clone(&self.abort),
> +        }
> +    }
> +}
> +
> +impl<I: Send + 'static> ParallelHandler<I> {
> +    /// Create a new thread pool, each thread processing incoming data
> +    /// with 'handler_fn'.
> +    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
> +    where
> +        F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
> +    {
> +        let mut handles = Vec::new();
> +        let (input_tx, input_rx) = bounded::<I>(threads);
> +
> +        let abort = Arc::new(Mutex::new(None));
> +
> +        for i in 0..threads {
> +            let input_rx = input_rx.clone();
> +            let abort = Arc::clone(&abort);
> +            let handler_fn = handler_fn.clone();
> +
> +            handles.push(
> +                std::thread::Builder::new()
> +                    .name(format!("{} ({})", name, i))
> +                    .spawn(move || loop {
> +                        let data = match input_rx.recv() {
> +                            Ok(data) => data,
> +                            Err(_) => return,
> +                        };
> +                        if let Err(err) = (handler_fn)(data) {
> +                            let mut guard = abort.lock().unwrap();

.unwrap on Mutex::lock is fine IMO, but it should have a comment explaining
that it only panics on a poisoned mutex.
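
For example (just a sketch, `record_first_error` is a hypothetical
helper pulled out of the worker closure for illustration):

```rust
use std::sync::Mutex;

/// Hypothetical helper: remember only the first error message.
fn record_first_error(abort: &Mutex<Option<String>>, err: String) {
    // lock() only fails if another thread panicked while holding the
    // lock (poisoned mutex). In that case we cannot do anything sensible
    // anyway, so panicking via unwrap() is acceptable here.
    let mut guard = abort.lock().unwrap();
    if guard.is_none() {
        *guard = Some(err);
    }
}
```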

> +                            if guard.is_none() {
> +                                *guard = Some(err.to_string());
> +                            }
> +                        }
> +                    })
> +                    .unwrap(),

This shouldn't .unwrap() IMO, rather return an error from this function.
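
thread::Builder::spawn returns an io::Result, so the error can be
passed on with `?` if the constructor returns a Result. A reduced
sketch of the spawn loop, with a made-up `spawn_workers` function and a
simplified closure signature instead of the real handler_fn:

```rust
use std::io;
use std::thread::{Builder, JoinHandle};

/// Hypothetical, reduced version of the spawn loop in `new`: a failed
/// spawn is returned to the caller instead of panicking.
fn spawn_workers<F>(name: &str, threads: usize, work: F) -> io::Result<Vec<JoinHandle<()>>>
where
    F: Fn(usize) + Send + Clone + 'static,
{
    let mut handles = Vec::with_capacity(threads);
    for i in 0..threads {
        let work = work.clone();
        let handle = Builder::new()
            .name(format!("{name} ({i})"))
            .spawn(move || work(i))?;
        handles.push(handle);
    }
    Ok(handles)
}
```

Note that in this sketch the threads that were already spawned are
detached on an early return, so the real code would probably want to
join them before bubbling the error up.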

> +            );
> +        }
> +        Self {
> +            handles,
> +            name: name.to_string(),
> +            input: Some(SendHandle {
> +                input: input_tx,
> +                abort,
> +            }),
> +        }
> +    }
> +
> +    /// Returns a cloneable channel to send data to the worker threads
> +    pub fn channel(&self) -> SendHandle<I> {
> +        self.input.as_ref().unwrap().clone()

Please add a comment why .unwrap is okay here or bubble some error up

> +    }
> +
> +    /// Send data to the worker threads
> +    pub fn send(&self, input: I) -> Result<(), Error> {
> +        self.input.as_ref().unwrap().send(input)?;

Please add a comment why .unwrap is okay here or bubble some error up
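
If you prefer bubbling up, a sketch could look like this. `Handler` is
a stripped-down stand-in for ParallelHandler, using std's SyncSender
and String errors so that it compiles on its own:

```rust
use std::sync::mpsc::SyncSender;

/// Stripped-down stand-in for ParallelHandler, only the `input` field.
struct Handler<I> {
    input: Option<SyncSender<I>>,
}

impl<I> Handler<I> {
    /// Return an error instead of panicking when `input` is already gone.
    fn send(&self, value: I) -> Result<(), String> {
        let input = self
            .input
            .as_ref()
            .ok_or_else(|| "handler already completed".to_string())?;
        input
            .send(value)
            .map_err(|_| "send failed - channel closed".to_string())
    }
}
```

As far as I can tell, `input` is only taken in complete() (which
consumes self) and in drop(), so a short comment saying exactly that
would also be enough.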

> +        Ok(())
> +    }
> +
> +    /// Wait for worker threads to complete and check for errors
> +    pub fn complete(mut self) -> Result<(), Error> {
> +        let input = self.input.take().unwrap();
> +        let abort = Arc::clone(&input.abort);
> +        check_abort(&abort)?;
> +        drop(input);
> +
> +        let msg_list = self.join_threads();
> +
> +        // an error might be encountered while waiting for the join
> +        check_abort(&abort)?;
> +
> +        if msg_list.is_empty() {
> +            return Ok(());
> +        }
> +        Err(format_err!("{}", msg_list.join("\n")))

I'd rather

if !msg_list.is_empty() {
    bail!("{}", msg_list.join("\n"));
}

Ok(())

> +    }
> +
> +    fn join_threads(&mut self) -> Vec<String> {
> +        let mut msg_list = Vec::new();
> +
> +        let mut i = 0;
> +        while let Some(handle) = self.handles.pop() {
> +            if let Err(panic) = handle.join() {
> +                if let Some(panic_msg) = panic.downcast_ref::<&str>() {
> +                    msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
> +                } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
> +                    msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
> +                } else {
> +                    msg_list.push(format!("thread {} ({i}) panicked", self.name));
> +                }
> +            }
> +            i += 1;
> +        }
> +        msg_list
> +    }
> +}
> +
> +// Note: We make sure that all threads will be joined
> +impl<I> Drop for ParallelHandler<I> {
> +    fn drop(&mut self) {
> +        drop(self.input.take());
> +        while let Some(handle) = self.handles.pop() {
> +            let _ = handle.join();
> +        }
> +    }
> +}
> diff --git a/wrapper.h b/wrapper.h
> new file mode 100644
> index 0000000..64d0aa6
> --- /dev/null
> +++ b/wrapper.h
> @@ -0,0 +1 @@
> +#include <rrd.h>




