[pbs-devel] [PATCH proxmox-backup 07/15] client: factor out UploadOptions

Fabian Grünbichler f.gruenbichler at proxmox.com
Mon Jan 25 14:42:52 CET 2021


to reduce function signature complexity.

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
breaks/requires corresponding patch in proxmox-backup-qemu

 src/bin/proxmox-backup-client.rs | 79 ++++++++++++++++++++++----------
 src/client/backup_writer.rs      | 44 ++++++++++--------
 2 files changed, 80 insertions(+), 43 deletions(-)

diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs
index dce8f0b8..af3b8464 100644
--- a/src/bin/proxmox-backup-client.rs
+++ b/src/bin/proxmox-backup-client.rs
@@ -280,7 +280,6 @@ pub async fn api_datastore_latest_snapshot(
 
 async fn backup_directory<P: AsRef<Path>>(
     client: &BackupWriter,
-    previous_manifest: Option<Arc<BackupManifest>>,
     dir_path: P,
     archive_name: &str,
     chunk_size: Option<usize>,
@@ -290,8 +289,7 @@ async fn backup_directory<P: AsRef<Path>>(
     catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
     exclude_pattern: Vec<MatchEntry>,
     entries_max: usize,
-    compress: bool,
-    encrypt: bool,
+    upload_options: UploadOptions,
 ) -> Result<BackupStats, Error> {
 
     let pxar_stream = PxarBackupStream::open(
@@ -317,8 +315,12 @@ async fn backup_directory<P: AsRef<Path>>(
         }
     });
 
+    if upload_options.fixed_size.is_some() {
+        bail!("cannot backup directory with fixed chunk size!");
+    }
+
     let stats = client
-        .upload_stream(previous_manifest, archive_name, stream, "dynamic", None, compress, encrypt)
+        .upload_stream(archive_name, stream, upload_options)
         .await?;
 
     Ok(stats)
@@ -326,14 +328,10 @@ async fn backup_directory<P: AsRef<Path>>(
 
 async fn backup_image<P: AsRef<Path>>(
     client: &BackupWriter,
-    previous_manifest: Option<Arc<BackupManifest>>,
     image_path: P,
     archive_name: &str,
-    image_size: u64,
     chunk_size: Option<usize>,
-    compress: bool,
-    encrypt: bool,
-    _verbose: bool,
+    upload_options: UploadOptions,
 ) -> Result<BackupStats, Error> {
 
     let path = image_path.as_ref().to_owned();
@@ -345,8 +343,12 @@ async fn backup_image<P: AsRef<Path>>(
 
     let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));
 
+    if upload_options.fixed_size.is_none() {
+        bail!("cannot backup image with dynamic chunk size!");
+    }
+
     let stats = client
-        .upload_stream(previous_manifest, archive_name, stream, "fixed", Some(image_size), compress, encrypt)
+        .upload_stream(archive_name, stream, upload_options)
         .await?;
 
     Ok(stats)
@@ -604,9 +606,15 @@ fn spawn_catalog_upload(
 
     let (catalog_result_tx, catalog_result_rx) = tokio::sync::oneshot::channel();
 
+    let upload_options = UploadOptions {
+        encrypt,
+        compress: true,
+        ..UploadOptions::default()
+    };
+
     tokio::spawn(async move {
         let catalog_upload_result = client
-            .upload_stream(None, CATALOG_NAME, catalog_chunk_stream, "dynamic", None, true, encrypt)
+            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
             .await;
 
         if let Err(ref err) = catalog_upload_result {
@@ -995,16 +1003,28 @@ async fn create_backup(
     for (backup_type, filename, target, size) in upload_list {
         match backup_type {
             BackupSpecificationType::CONFIG => {
+                let upload_options = UploadOptions {
+                    compress: true,
+                    encrypt: crypt_mode == CryptMode::Encrypt,
+                    ..UploadOptions::default()
+                };
+
                 println!("Upload config file '{}' to '{}' as {}", filename, repo, target);
                 let stats = client
-                    .upload_blob_from_file(&filename, &target, true, crypt_mode == CryptMode::Encrypt)
+                    .upload_blob_from_file(&filename, &target, upload_options)
                     .await?;
                 manifest.add_file(target, stats.size, stats.csum, crypt_mode)?;
             }
             BackupSpecificationType::LOGFILE => { // fixme: remove - not needed anymore ?
+                let upload_options = UploadOptions {
+                    compress: true,
+                    encrypt: crypt_mode == CryptMode::Encrypt,
+                    ..UploadOptions::default()
+                };
+
                 println!("Upload log file '{}' to '{}' as {}", filename, repo, target);
                 let stats = client
-                    .upload_blob_from_file(&filename, &target, true, crypt_mode == CryptMode::Encrypt)
+                    .upload_blob_from_file(&filename, &target, upload_options)
                     .await?;
                 manifest.add_file(target, stats.size, stats.csum, crypt_mode)?;
             }
@@ -1019,9 +1039,15 @@ async fn create_backup(
 
                 println!("Upload directory '{}' to '{}' as {}", filename, repo, target);
                 catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
+                let upload_options = UploadOptions {
+                    previous_manifest: previous_manifest.clone(),
+                    compress: true,
+                    encrypt: crypt_mode == CryptMode::Encrypt,
+                    ..UploadOptions::default()
+                };
+
                 let stats = backup_directory(
                     &client,
-                    previous_manifest.clone(),
                     &filename,
                     &target,
                     chunk_size_opt,
@@ -1031,24 +1057,27 @@ async fn create_backup(
                     catalog.clone(),
                     pattern_list.clone(),
                     entries_max as usize,
-                    true,
-                    crypt_mode == CryptMode::Encrypt,
+                    upload_options,
                 ).await?;
                 manifest.add_file(target, stats.size, stats.csum, crypt_mode)?;
                 catalog.lock().unwrap().end_directory()?;
             }
             BackupSpecificationType::IMAGE => {
                 println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
+
+                let upload_options = UploadOptions {
+                    previous_manifest: previous_manifest.clone(),
+                    fixed_size: Some(size),
+                    compress: true,
+                    encrypt: crypt_mode == CryptMode::Encrypt,
+                };
+
                 let stats = backup_image(
                     &client,
-                    previous_manifest.clone(),
-                     &filename,
+                    &filename,
                     &target,
-                    size,
                     chunk_size_opt,
-                    true,
-                    crypt_mode == CryptMode::Encrypt,
-                    verbose,
+                    upload_options,
                 ).await?;
                 manifest.add_file(target, stats.size, stats.csum, crypt_mode)?;
             }
@@ -1074,8 +1103,9 @@ async fn create_backup(
     if let Some(rsa_encrypted_key) = rsa_encrypted_key {
         let target = ENCRYPTED_KEY_BLOB_NAME;
         println!("Upload RSA encoded key to '{:?}' as {}", repo, target);
+        let options = UploadOptions { compress: false, encrypt: false, ..UploadOptions::default() };
         let stats = client
-            .upload_blob_from_data(rsa_encrypted_key, target, false, false)
+            .upload_blob_from_data(rsa_encrypted_key, target, options)
             .await?;
         manifest.add_file(target.to_string(), stats.size, stats.csum, crypt_mode)?;
 
@@ -1087,8 +1117,9 @@ async fn create_backup(
 
 
     if verbose { println!("Upload index.json to '{}'", repo) };
+    let options = UploadOptions { compress: true, encrypt: false, ..UploadOptions::default() };
     client
-        .upload_blob_from_data(manifest.into_bytes(), MANIFEST_BLOB_NAME, true, false)
+        .upload_blob_from_data(manifest.into_bytes(), MANIFEST_BLOB_NAME, options)
         .await?;
 
     client.finish().await?;
diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs
index e7a6d6bd..38953a45 100644
--- a/src/client/backup_writer.rs
+++ b/src/client/backup_writer.rs
@@ -39,6 +39,15 @@ pub struct BackupStats {
     pub csum: [u8; 32],
 }
 
+/// Options for uploading blobs/streams to the server
+#[derive(Default, Clone)]
+pub struct UploadOptions {
+    pub previous_manifest: Option<Arc<BackupManifest>>,
+    pub compress: bool,
+    pub encrypt: bool,
+    pub fixed_size: Option<u64>,
+}
+
 type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
 type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
 
@@ -168,13 +177,12 @@ impl BackupWriter {
         &self,
         data: Vec<u8>,
         file_name: &str,
-        compress: bool,
-        encrypt: bool,
+        options: UploadOptions,
     ) -> Result<BackupStats, Error> {
-        let blob = match (encrypt, &self.crypt_config) {
-             (false, _) => DataBlob::encode(&data, None, compress)?,
+        let blob = match (options.encrypt, &self.crypt_config) {
+             (false, _) => DataBlob::encode(&data, None, options.compress)?,
              (true, None) => bail!("requested encryption without a crypt config"),
-             (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), compress)?,
+             (true, Some(crypt_config)) => DataBlob::encode(&data, Some(crypt_config), options.compress)?,
         };
 
         let raw_data = blob.into_inner();
@@ -190,8 +198,7 @@ impl BackupWriter {
         &self,
         src_path: P,
         file_name: &str,
-        compress: bool,
-        encrypt: bool,
+        options: UploadOptions,
     ) -> Result<BackupStats, Error> {
 
         let src_path = src_path.as_ref();
@@ -206,34 +213,33 @@ impl BackupWriter {
             .await
             .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
 
-        self.upload_blob_from_data(contents, file_name, compress, encrypt).await
+        self.upload_blob_from_data(contents, file_name, options).await
     }
 
     pub async fn upload_stream(
         &self,
-        previous_manifest: Option<Arc<BackupManifest>>,
         archive_name: &str,
         stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
-        prefix: &str,
-        fixed_size: Option<u64>,
-        compress: bool,
-        encrypt: bool,
+        options: UploadOptions,
     ) -> Result<BackupStats, Error> {
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
 
         let mut param = json!({ "archive-name": archive_name });
-        if let Some(size) = fixed_size {
+        let prefix = if let Some(size) = options.fixed_size {
             param["size"] = size.into();
-        }
+            "fixed"
+        } else {
+            "dynamic"
+        };
 
-        if encrypt && self.crypt_config.is_none() {
+        if options.encrypt && self.crypt_config.is_none() {
             bail!("requested encryption without a crypt config");
         }
 
         let index_path = format!("{}_index", prefix);
         let close_path = format!("{}_close", prefix);
 
-        if let Some(manifest) = previous_manifest {
+        if let Some(manifest) = options.previous_manifest {
             // try, but ignore errors
             match archive_type(archive_name) {
                 Ok(ArchiveType::FixedIndex) => {
@@ -255,8 +261,8 @@ impl BackupWriter {
                 stream,
                 &prefix,
                 known_chunks.clone(),
-                if encrypt { self.crypt_config.clone() } else { None },
-                compress,
+                if options.encrypt { self.crypt_config.clone() } else { None },
+                options.compress,
                 self.verbose,
             )
             .await?;
-- 
2.20.1






More information about the pbs-devel mailing list