[pbs-devel] [PATCH vma-to-pbs 2/4] add support for bulk import of a dump directory

Filip Schauer f.schauer at proxmox.com
Wed Jul 24 18:18:54 CEST 2024


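When no vmid is provided, treat the vma_file argument as a path to a
directory containing VMA backups and import every backup found in it.
This also handles compressed VMA files (zstd, lzo, gzip) as well as the
notes (.notes) and log (.log) files stored next to each backup.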

This makes it ideal for use on a dump directory:

PBS_FINGERPRINT='PBS_FINGERPRINT' vma-to-pbs \
        --repository 'user@realm!token@server:port:datastore' \
        /var/lib/vz/dump

Signed-off-by: Filip Schauer <f.schauer at proxmox.com>
---
 Cargo.toml     |   3 +
 src/main.rs    | 114 +++++++++++++++++++++---
 src/vma2pbs.rs | 234 +++++++++++++++++++++++++++++++------------------
 3 files changed, 254 insertions(+), 97 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 0522902..54b5ebb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,13 +7,16 @@ edition = "2021"
 [dependencies]
 anyhow = "1.0"
 bincode = "1.3"
+chrono = "0.4"
 hyper = "0.14.5"
 pico-args = "0.4"
 md5 = "0.7.0"
+regex = "1.7"
 scopeguard = "1.1.0"
 serde = "1.0"
 serde_json = "1.0"
 serde-big-array = "0.4.1"
+walkdir = "2"
 
 proxmox-async = "0.4"
 proxmox-io = "1.0.1"
diff --git a/src/main.rs b/src/main.rs
index de789c1..233992b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,26 +1,30 @@
 use std::ffi::OsString;
+use std::path::PathBuf;
 
 use anyhow::{bail, Context, Error};
+use chrono::NaiveDateTime;
 use proxmox_sys::linux::tty;
 use proxmox_time::epoch_i64;
+use regex::Regex;
+use walkdir::WalkDir;
 
 mod vma;
 mod vma2pbs;
-use vma2pbs::{backup_vma_to_pbs, BackupVmaToPbsArgs};
+use vma2pbs::{vma2pbs, BackupVmaToPbsArgs, Compression, PbsArgs, VmaBackupArgs};
 
 const CMD_HELP: &str = "\
 Usage: vma-to-pbs [OPTIONS] --repository <auth_id at host:port:datastore> --vmid <VMID> [vma_file]
 
 Arguments:
-  [vma_file]
+  [vma_file | dump_directory]
 
 Options:
       --repository <auth_id at host:port:datastore>
           Repository URL
       [--ns <NAMESPACE>]
           Namespace
-      --vmid <VMID>
-          Backup ID
+      [--vmid <VMID>]
+          Backup ID (If not specified, bulk import all VMA backups in the provided directory)
       [--backup-time <EPOCH>]
           Backup timestamp
       --fingerprint <FINGERPRINT>
@@ -87,7 +91,7 @@ fn parse_args() -> Result<BackupVmaToPbsArgs, Error> {
 
     let pbs_repository = args.value_from_str("--repository")?;
     let namespace = args.opt_value_from_str("--ns")?;
-    let vmid = args.value_from_str("--vmid")?;
+    let vmid = args.opt_value_from_str("--vmid")?;
     let backup_time: Option<i64> = args.opt_value_from_str("--backup-time")?;
     let backup_time = backup_time.unwrap_or_else(epoch_i64);
     let fingerprint = args.opt_value_from_str("--fingerprint")?;
@@ -184,12 +188,9 @@ fn parse_args() -> Result<BackupVmaToPbsArgs, Error> {
         None
     };
 
-    let options = BackupVmaToPbsArgs {
-        vma_file_path: vma_file_path.cloned(),
+    let pbs_args = PbsArgs {
         pbs_repository,
         namespace,
-        backup_id: vmid,
-        backup_time,
         pbs_password,
         keyfile,
         key_password,
@@ -197,16 +198,105 @@ fn parse_args() -> Result<BackupVmaToPbsArgs, Error> {
         fingerprint,
         compress,
         encrypt,
-        notes,
-        log_file_path,
     };
 
+    let mut vmas = Vec::new();
+
+    if let Some(vmid) = vmid {
+        let backup_args = VmaBackupArgs {
+            vma_file_path: vma_file_path.cloned(),
+            compression: None,
+            backup_id: vmid,
+            backup_time,
+            notes,
+            log_file_path,
+        };
+        vmas.push(backup_args);
+    } else {
+        let dump_dir_path =
+            PathBuf::from(vma_file_path.expect("no directory specified for bulk import"));
+
+        if !dump_dir_path.is_dir() {
+            bail!("specified path for bulk import is not a directory");
+        }
+
+        for entry in WalkDir::new(dump_dir_path)
+            .into_iter()
+            .filter_map(Result::ok)
+        {
+            let path = entry.path();
+
+            if !path.is_file() {
+                continue;
+            }
+
+            if let Some(file_name) = path.file_name().and_then(|n| n.to_str()) {
+                let re = Regex::new(
+                    r"vzdump-qemu-(\d+)-(\d{4}_\d{2}_\d{2}-\d{2}_\d{2}_\d{2}).vma(|.zst|.lzo|.gz)$",
+                )?;
+
+                let caps = match re.captures(file_name) {
+                    Some(caps) => caps,
+                    None => continue,
+                };
+
+                let Some(vmid) = caps.get(1) else { continue };
+                let Some(timestr) = caps.get(2) else { continue };
+                let Some(ext) = caps.get(3) else { continue };
+
+                let compression = match ext.as_str() {
+                    "" => None,
+                    ".zst" => Some(Compression::Zstd),
+                    ".lzo" => Some(Compression::Lzo),
+                    ".gz" => Some(Compression::GZip),
+                    _ => continue,
+                };
+
+                let backup_time =
+                    NaiveDateTime::parse_from_str(timestr.as_str(), "%Y_%m_%d-%H_%M_%S")?
+                        .timestamp();
+
+                let mut notes_path_os_string: OsString = path.into();
+                notes_path_os_string.push(".notes");
+                let notes_path: PathBuf = notes_path_os_string.into();
+                let notes = if notes_path.exists() {
+                    Some(std::fs::read_to_string(notes_path)?)
+                } else {
+                    None
+                };
+
+                let mut log_path_os_string: OsString = path.into();
+                log_path_os_string.push(".log");
+                let log_path: PathBuf = log_path_os_string.into();
+                let log_file_path = if log_path.exists() {
+                    Some(log_path.to_path_buf().into_os_string())
+                } else {
+                    None
+                };
+
+                let backup_args = VmaBackupArgs {
+                    vma_file_path: Some(path.into()),
+                    compression,
+                    backup_id: vmid.as_str().to_string(),
+                    backup_time,
+                    notes,
+                    log_file_path,
+                };
+                vmas.push(backup_args);
+            }
+        }
+
+        vmas.sort_by_key(|d| d.backup_time);
+    }
+
+    let options = BackupVmaToPbsArgs { pbs_args, vmas };
+
     Ok(options)
 }
 
 fn main() -> Result<(), Error> {
     let args = parse_args()?;
-    backup_vma_to_pbs(args)?;
+    vma2pbs(args)?;
 
     Ok(())
 }
diff --git a/src/vma2pbs.rs b/src/vma2pbs.rs
index d2ce437..3e9689d 100644
--- a/src/vma2pbs.rs
+++ b/src/vma2pbs.rs
@@ -4,6 +4,7 @@ use std::collections::HashMap;
 use std::ffi::{c_char, CStr, CString, OsString};
 use std::fs::File;
 use std::io::{stdin, BufRead, BufReader, Read};
+use std::process::{Command, Stdio};
 use std::ptr;
 use std::time::SystemTime;
 
@@ -29,11 +30,13 @@ use crate::vma::VmaReader;
 const VMA_CLUSTER_SIZE: usize = 65536;
 
 pub struct BackupVmaToPbsArgs {
-    pub vma_file_path: Option<OsString>,
+    pub pbs_args: PbsArgs,
+    pub vmas: Vec<VmaBackupArgs>,
+}
+
+pub struct PbsArgs {
     pub pbs_repository: String,
     pub namespace: Option<String>,
-    pub backup_id: String,
-    pub backup_time: i64,
     pub pbs_password: String,
     pub keyfile: Option<String>,
     pub key_password: Option<String>,
@@ -41,6 +44,19 @@ pub struct BackupVmaToPbsArgs {
     pub fingerprint: String,
     pub compress: bool,
     pub encrypt: bool,
+}
+
+pub enum Compression {
+    Zstd,
+    Lzo,
+    GZip,
+}
+
+pub struct VmaBackupArgs {
+    pub vma_file_path: Option<OsString>,
+    pub compression: Option<Compression>,
+    pub backup_id: String,
+    pub backup_time: i64,
     pub notes: Option<String>,
     pub log_file_path: Option<OsString>,
 }
@@ -61,25 +77,25 @@ fn handle_pbs_error(pbs_err: *mut c_char, function_name: &str) -> Result<(), Err
     bail!("{function_name} failed: {pbs_err_str}");
 }
 
-fn create_pbs_backup_task(args: &BackupVmaToPbsArgs) -> Result<*mut ProxmoxBackupHandle, Error> {
-    println!("PBS repository: {}", args.pbs_repository);
-    if let Some(ns) = &args.namespace {
+fn create_pbs_backup_task(pbs_args: &PbsArgs, backup_args: &VmaBackupArgs) -> Result<*mut ProxmoxBackupHandle, Error> {
+    println!("PBS repository: {}", pbs_args.pbs_repository);
+    if let Some(ns) = &pbs_args.namespace {
         println!("PBS namespace: {}", ns);
     }
-    println!("PBS fingerprint: {}", args.fingerprint);
-    println!("compress: {}", args.compress);
-    println!("encrypt: {}", args.encrypt);
+    println!("PBS fingerprint: {}", pbs_args.fingerprint);
+    println!("compress: {}", pbs_args.compress);
+    println!("encrypt: {}", pbs_args.encrypt);
 
-    println!("backup time: {}", epoch_to_rfc3339(args.backup_time)?);
+    println!("backup time: {}", epoch_to_rfc3339(backup_args.backup_time)?);
 
     let mut pbs_err: *mut c_char = ptr::null_mut();
 
-    let pbs_repository_cstr = CString::new(args.pbs_repository.as_str())?;
-    let ns_cstr = CString::new(args.namespace.as_deref().unwrap_or(""))?;
-    let backup_id_cstr = CString::new(args.backup_id.as_str())?;
-    let pbs_password_cstr = CString::new(args.pbs_password.as_str())?;
-    let fingerprint_cstr = CString::new(args.fingerprint.as_str())?;
-    let keyfile_cstr = args
+    let pbs_repository_cstr = CString::new(pbs_args.pbs_repository.as_str())?;
+    let ns_cstr = CString::new(pbs_args.namespace.as_deref().unwrap_or(""))?;
+    let backup_id_cstr = CString::new(backup_args.backup_id.as_str())?;
+    let pbs_password_cstr = CString::new(pbs_args.pbs_password.as_str())?;
+    let fingerprint_cstr = CString::new(pbs_args.fingerprint.as_str())?;
+    let keyfile_cstr = pbs_args
         .keyfile
         .as_ref()
         .map(|v| CString::new(v.as_str()).unwrap());
@@ -87,7 +103,7 @@ fn create_pbs_backup_task(args: &BackupVmaToPbsArgs) -> Result<*mut ProxmoxBacku
         .as_ref()
         .map(|v| v.as_ptr())
         .unwrap_or(ptr::null());
-    let key_password_cstr = args
+    let key_password_cstr = pbs_args
         .key_password
         .as_ref()
         .map(|v| CString::new(v.as_str()).unwrap());
@@ -95,7 +111,7 @@ fn create_pbs_backup_task(args: &BackupVmaToPbsArgs) -> Result<*mut ProxmoxBacku
         .as_ref()
         .map(|v| v.as_ptr())
         .unwrap_or(ptr::null());
-    let master_keyfile_cstr = args
+    let master_keyfile_cstr = pbs_args
         .master_keyfile
         .as_ref()
         .map(|v| CString::new(v.as_str()).unwrap());
@@ -108,14 +124,14 @@ fn create_pbs_backup_task(args: &BackupVmaToPbsArgs) -> Result<*mut ProxmoxBacku
         pbs_repository_cstr.as_ptr(),
         ns_cstr.as_ptr(),
         backup_id_cstr.as_ptr(),
-        args.backup_time as u64,
+        backup_args.backup_time as u64,
         PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE,
         pbs_password_cstr.as_ptr(),
         keyfile_ptr,
         key_password_ptr,
         master_keyfile_ptr,
-        args.compress,
-        args.encrypt,
+        pbs_args.compress,
+        pbs_args.encrypt,
         fingerprint_cstr.as_ptr(),
         &mut pbs_err,
     );
@@ -361,17 +377,24 @@ where
     Ok(())
 }
 
-fn pbs_client_setup(args: &BackupVmaToPbsArgs) -> Result<(HttpClient, String, Value), Error> {
-    let repo: BackupRepository = args.pbs_repository.parse()?;
+fn pbs_client_setup(
+    pbs_args: &PbsArgs,
+    backup_args: &VmaBackupArgs,
+) -> Result<(HttpClient, String, Value), Error> {
+    let repo: BackupRepository = pbs_args.pbs_repository.parse()?;
     let options = HttpClientOptions::new_interactive(
-        Some(args.pbs_password.clone()),
-        Some(args.fingerprint.clone()),
+        Some(pbs_args.pbs_password.clone()),
+        Some(pbs_args.fingerprint.clone()),
     );
     let client = HttpClient::new(repo.host(), repo.port(), repo.auth_id(), options)?;
 
-    let backup_dir = BackupDir::from((BackupType::Vm, args.backup_id.clone(), args.backup_time));
+    let backup_dir = BackupDir::from((
+        BackupType::Vm,
+        backup_args.backup_id.clone(),
+        backup_args.backup_time,
+    ));
 
-    let namespace = match &args.namespace {
+    let namespace = match &pbs_args.namespace {
         Some(namespace) => BackupNamespace::new(namespace)?,
         None => BackupNamespace::root(),
     };
@@ -386,45 +409,44 @@ fn pbs_client_setup(args: &BackupVmaToPbsArgs) -> Result<(HttpClient, String, Va
 
 fn upload_log(
     client: &HttpClient,
-    args: &BackupVmaToPbsArgs,
+    log_file_path: &OsString,
+    pbs_args: &PbsArgs,
     store: &str,
     request_args: Value,
 ) -> Result<(), Error> {
-    if let Some(log_file_path) = &args.log_file_path {
-        let path = format!("api2/json/admin/datastore/{}/upload-backup-log", store);
-        let data = std::fs::read(log_file_path)?;
-
-        let blob = if args.encrypt {
-            let crypt_config = match &args.keyfile {
-                None => None,
-                Some(keyfile) => {
-                    let key = std::fs::read(keyfile)?;
-                    let (key, _created, _) = decrypt_key(&key, &|| -> Result<Vec<u8>, Error> {
-                        match &args.key_password {
-                            Some(key_password) => Ok(key_password.clone().into_bytes()),
-                            None => bail!("no key password provided"),
-                        }
-                    })?;
-                    let crypt_config = CryptConfig::new(key)?;
-                    Some(crypt_config)
-                }
-            };
-
-            DataBlob::encode(&data, crypt_config.as_ref(), args.compress)?
-        } else {
-            // fixme: howto sign log?
-            DataBlob::encode(&data, None, args.compress)?
+    let path = format!("api2/json/admin/datastore/{}/upload-backup-log", store);
+    let data = std::fs::read(log_file_path)?;
+
+    let blob = if pbs_args.encrypt {
+        let crypt_config = match &pbs_args.keyfile {
+            None => None,
+            Some(keyfile) => {
+                let key = std::fs::read(keyfile)?;
+                let (key, _created, _) = decrypt_key(&key, &|| -> Result<Vec<u8>, Error> {
+                    match &pbs_args.key_password {
+                        Some(key_password) => Ok(key_password.clone().into_bytes()),
+                        None => bail!("no key password provided"),
+                    }
+                })?;
+                let crypt_config = CryptConfig::new(key)?;
+                Some(crypt_config)
+            }
         };
 
-        let body = hyper::Body::from(blob.into_inner());
+        DataBlob::encode(&data, crypt_config.as_ref(), pbs_args.compress)?
+    } else {
+        // fixme: howto sign log?
+        DataBlob::encode(&data, None, pbs_args.compress)?
+    };
 
-        block_on(async {
-            client
-                .upload("application/octet-stream", body, &path, Some(request_args))
-                .await
-                .unwrap();
-        });
-    }
+    let body = hyper::Body::from(blob.into_inner());
+
+    block_on(async {
+        client
+            .upload("application/octet-stream", body, &path, Some(request_args))
+            .await
+            .unwrap();
+    });
 
     Ok(())
 }
@@ -444,17 +466,64 @@ fn set_notes(
     Ok(())
 }
 
-pub fn backup_vma_to_pbs(args: BackupVmaToPbsArgs) -> Result<(), Error> {
-    let vma_file: Box<dyn BufRead> = match &args.vma_file_path {
-        Some(vma_file_path) => match File::open(vma_file_path) {
-            Err(why) => return Err(anyhow!("Couldn't open file: {}", why)),
-            Ok(file) => Box::new(BufReader::new(file)),
+pub fn vma2pbs(args: BackupVmaToPbsArgs) -> Result<(), Error> {
+    let start_transfer_time = SystemTime::now();
+
+    for backup_args in args.vmas {
+        upload_vma_file(&args.pbs_args, backup_args)?;
+    }
+
+    let transfer_duration = SystemTime::now().duration_since(start_transfer_time)?;
+    let total_seconds = transfer_duration.as_secs();
+    let minutes = total_seconds / 60;
+    let seconds = total_seconds % 60;
+    let milliseconds = transfer_duration.as_millis() % 1000;
+    println!("Backup finished within {minutes} minutes, {seconds} seconds and {milliseconds} ms");
+
+    Ok(())
+}
+
+fn upload_vma_file(pbs_args: &PbsArgs, backup_args: VmaBackupArgs) -> Result<(), Error> {
+    match &backup_args.vma_file_path {
+        Some(vma_file_path) => println!("Uploading VMA backup from {:?}", vma_file_path),
+        None => println!("Uploading VMA backup from (stdin)"),
+    };
+
+    let vma_file: Box<dyn BufRead> = match &backup_args.compression {
+        Some(compression) => {
+            let vma_file_path = backup_args
+                .vma_file_path
+                .as_ref()
+                .expect("No VMA file path provided");
+            let mut cmd = match compression {
+                Compression::Zstd => {
+                    let mut cmd = Command::new("zstd");
+                    cmd.args(["-q", "-d", "-c"]);
+                    cmd
+                }
+                Compression::Lzo => {
+                    let mut cmd = Command::new("lzop");
+                    cmd.args(["-d", "-c"]);
+                    cmd
+                }
+                Compression::GZip => Command::new("zcat"),
+            };
+            let process = cmd.arg(vma_file_path).stdout(Stdio::piped()).spawn()?;
+            let stdout = process.stdout.expect("Failed to capture stdout");
+            Box::new(BufReader::new(stdout))
+        }
+        None => match &backup_args.vma_file_path {
+            Some(vma_file_path) => match File::open(vma_file_path) {
+                Err(why) => return Err(anyhow!("Couldn't open file: {}", why)),
+                Ok(file) => Box::new(BufReader::new(file)),
+            },
+            None => Box::new(BufReader::new(stdin())),
         },
-        None => Box::new(BufReader::new(stdin())),
     };
+
     let vma_reader = VmaReader::new(vma_file)?;
 
-    let pbs = create_pbs_backup_task(&args)?;
+    let pbs = create_pbs_backup_task(pbs_args, &backup_args)?;
 
     defer! {
         proxmox_backup_disconnect(pbs);
@@ -467,10 +536,6 @@ pub fn backup_vma_to_pbs(args: BackupVmaToPbsArgs) -> Result<(), Error> {
         handle_pbs_error(pbs_err, "proxmox_backup_connect")?;
     }
 
-    println!("Connected to Proxmox Backup Server");
-
-    let start_transfer_time = SystemTime::now();
-
     upload_configs(&vma_reader, pbs)?;
     upload_block_devices(vma_reader, pbs)?;
 
@@ -478,24 +543,23 @@ pub fn backup_vma_to_pbs(args: BackupVmaToPbsArgs) -> Result<(), Error> {
         handle_pbs_error(pbs_err, "proxmox_backup_finish")?;
     }
 
-    if args.notes.is_some() || args.log_file_path.is_some() {
-        let (client, store, request_args) = pbs_client_setup(&args)?;
-
-        if args.log_file_path.is_some() {
-            upload_log(&client, &args, &store, request_args.clone())?;
+    if backup_args.notes.is_some() || backup_args.log_file_path.is_some() {
+        let (client, store, request_args) = pbs_client_setup(pbs_args, &backup_args)?;
+
+        if let Some(log_file_path) = backup_args.log_file_path {
+            upload_log(
+                &client,
+                &log_file_path,
+                pbs_args,
+                &store,
+                request_args.clone(),
+            )?;
         }
 
-        if let Some(notes) = args.notes {
-            set_notes(&client, &notes, &store, request_args)?;
+        if let Some(notes) = &backup_args.notes {
+            set_notes(&client, notes, &store, request_args)?;
         }
     }
 
-    let transfer_duration = SystemTime::now().duration_since(start_transfer_time)?;
-    let total_seconds = transfer_duration.as_secs();
-    let minutes = total_seconds / 60;
-    let seconds = total_seconds % 60;
-    let milliseconds = transfer_duration.as_millis() % 1000;
-    println!("Backup finished within {minutes} minutes, {seconds} seconds and {milliseconds} ms");
-
     Ok(())
 }
-- 
2.39.2





More information about the pbs-devel mailing list