[pbs-devel] [PATCH proxmox-backup 1/3] pull: rustfmt

Fabian Grünbichler f.gruenbichler at proxmox.com
Fri Jan 15 11:48:53 CET 2021


Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
This module should be fairly quiet at the moment, so now is a good opportunity
to get the rustfmt cleanup out of the way ;)

 src/client/pull.rs | 250 ++++++++++++++++++++++++++++++---------------
 1 file changed, 168 insertions(+), 82 deletions(-)

diff --git a/src/client/pull.rs b/src/client/pull.rs
index ed8256fa..f6ef7cde 100644
--- a/src/client/pull.rs
+++ b/src/client/pull.rs
@@ -2,22 +2,21 @@
 
 use anyhow::{bail, format_err, Error};
 use serde_json::json;
+use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
-use std::sync::{Arc, Mutex};
-use std::collections::{HashSet, HashMap};
 use std::io::{Seek, SeekFrom};
-use std::time::SystemTime;
 use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::SystemTime;
 
-use proxmox::api::error::{StatusCode, HttpError};
 use crate::{
-    tools::{ParallelHandler, compute_file_csum},
-    server::WorkerTask,
-    backup::*,
     api2::types::*,
+    backup::*,
     client::*,
+    server::WorkerTask,
+    tools::{compute_file_csum, ParallelHandler},
 };
-
+use proxmox::api::error::{HttpError, StatusCode};
 
 // fixme: implement filters
 // fixme: delete vanished groups
@@ -28,9 +27,8 @@ async fn pull_index_chunks<I: IndexFile>(
     chunk_reader: RemoteChunkReader,
     target: Arc<DataStore>,
     index: I,
-    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-
     use futures::stream::{self, StreamExt, TryStreamExt};
 
     let start_time = SystemTime::now();
@@ -47,18 +45,19 @@ async fn pull_index_chunks<I: IndexFile>(
                     guard.insert(info.digest);
                 }
                 !done
-            })
+            }),
     );
 
     let target2 = target.clone();
     let verify_pool = ParallelHandler::new(
-        "sync chunk writer", 4,
-        move |(chunk, digest, size): (DataBlob, [u8;32], u64)|  {
+        "sync chunk writer",
+        4,
+        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
             // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
             chunk.verify_unencrypted(size as usize, &digest)?;
             target2.insert_chunk(&chunk, &digest)?;
             Ok(())
-       }
+        },
     );
 
     let verify_and_write_channel = verify_pool.channel();
@@ -67,14 +66,15 @@ async fn pull_index_chunks<I: IndexFile>(
 
     stream
         .map(|info| {
-
             let target = Arc::clone(&target);
             let chunk_reader = chunk_reader.clone();
             let bytes = Arc::clone(&bytes);
             let verify_and_write_channel = verify_and_write_channel.clone();
 
             Ok::<_, Error>(async move {
-                let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
+                let chunk_exists = crate::tools::runtime::block_in_place(|| {
+                    target.cond_touch_chunk(&info.digest, false)
+                })?;
                 if chunk_exists {
                     //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
                     return Ok::<_, Error>(());
@@ -84,12 +84,14 @@ async fn pull_index_chunks<I: IndexFile>(
                 let raw_size = chunk.raw_size() as usize;
 
                 // decode, verify and write in a separate threads to maximize throughput
-                crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?;
+                crate::tools::runtime::block_in_place(|| {
+                    verify_and_write_channel.send((chunk, info.digest, info.size()))
+                })?;
 
                 bytes.fetch_add(raw_size, Ordering::SeqCst);
 
                 Ok(())
-           })
+            })
         })
         .try_buffer_unordered(20)
         .try_for_each(|_res| futures::future::ok(()))
@@ -103,7 +105,11 @@ async fn pull_index_chunks<I: IndexFile>(
 
     let bytes = bytes.load(Ordering::SeqCst);
 
-    worker.log(format!("downloaded {} bytes ({:.2} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed)));
+    worker.log(format!(
+        "downloaded {} bytes ({:.2} MiB/s)",
+        bytes,
+        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
+    ));
 
     Ok(())
 }
@@ -112,7 +118,6 @@ async fn download_manifest(
     reader: &BackupReader,
     filename: &std::path::Path,
 ) -> Result<std::fs::File, Error> {
-
     let mut tmp_manifest_file = std::fs::OpenOptions::new()
         .write(true)
         .create(true)
@@ -120,20 +125,23 @@ async fn download_manifest(
         .read(true)
         .open(&filename)?;
 
-    reader.download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file).await?;
+    reader
+        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
+        .await?;
 
     tmp_manifest_file.seek(SeekFrom::Start(0))?;
 
     Ok(tmp_manifest_file)
 }
 
-fn verify_archive(
-    info: &FileInfo,
-    csum: &[u8; 32],
-    size: u64,
-) -> Result<(), Error> {
+fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
     if size != info.size {
-        bail!("wrong size for file '{}' ({} != {})", info.filename, info.size, size);
+        bail!(
+            "wrong size for file '{}' ({} != {})",
+            info.filename,
+            info.size,
+            size
+        );
     }
 
     if csum != &info.csum {
@@ -150,9 +158,8 @@ async fn pull_single_archive(
     tgt_store: Arc<DataStore>,
     snapshot: &BackupDir,
     archive_info: &FileInfo,
-    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-
     let archive_name = &archive_info.filename;
     let mut path = tgt_store.base_path();
     path.push(snapshot.relative_path());
@@ -172,20 +179,36 @@ async fn pull_single_archive(
 
     match archive_type(archive_name)? {
         ArchiveType::DynamicIndex => {
-            let index = DynamicIndexReader::new(tmpfile)
-                .map_err(|err| format_err!("unable to read dynamic index {:?} - {}", tmp_path, err))?;
+            let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
+                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
+            })?;
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
+            pull_index_chunks(
+                worker,
+                chunk_reader.clone(),
+                tgt_store.clone(),
+                index,
+                downloaded_chunks,
+            )
+            .await?;
         }
         ArchiveType::FixedIndex => {
-            let index = FixedIndexReader::new(tmpfile)
-                .map_err(|err| format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err))?;
+            let index = FixedIndexReader::new(tmpfile).map_err(|err| {
+                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
+            })?;
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(worker, chunk_reader.clone(), tgt_store.clone(), index, downloaded_chunks).await?;
+            pull_index_chunks(
+                worker,
+                chunk_reader.clone(),
+                tgt_store.clone(),
+                index,
+                downloaded_chunks,
+            )
+            .await?;
         }
         ArchiveType::Blob => {
             let (csum, size) = compute_file_csum(&mut tmpfile)?;
@@ -205,7 +228,6 @@ async fn try_client_log_download(
     reader: Arc<BackupReader>,
     path: &std::path::Path,
 ) -> Result<(), Error> {
-
     let mut tmp_path = path.to_owned();
     tmp_path.set_extension("tmp");
 
@@ -231,9 +253,8 @@ async fn pull_snapshot(
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
     snapshot: &BackupDir,
-    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-
     let mut manifest_name = tgt_store.base_path();
     manifest_name.push(snapshot.relative_path());
     manifest_name.push(MANIFEST_BLOB_NAME);
@@ -250,34 +271,45 @@ async fn pull_snapshot(
         Ok(manifest_file) => manifest_file,
         Err(err) => {
             match err.downcast_ref::<HttpError>() {
-                Some(HttpError { code, message }) => {
-                    match *code {
-                        StatusCode::NOT_FOUND => {
-                            worker.log(format!("skipping snapshot {} - vanished since start of sync", snapshot));
-                            return Ok(());
-                        },
-                        _ => {
-                            bail!("HTTP error {} - {}", code, message);
-                        },
+                Some(HttpError { code, message }) => match *code {
+                    StatusCode::NOT_FOUND => {
+                        worker.log(format!(
+                            "skipping snapshot {} - vanished since start of sync",
+                            snapshot
+                        ));
+                        return Ok(());
+                    }
+                    _ => {
+                        bail!("HTTP error {} - {}", code, message);
                     }
                 },
                 None => {
                     return Err(err);
-                },
+                }
             };
-        },
+        }
     };
     let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
 
     if manifest_name.exists() {
         let manifest_blob = proxmox::try_block!({
-            let mut manifest_file = std::fs::File::open(&manifest_name)
-                .map_err(|err| format_err!("unable to open local manifest {:?} - {}", manifest_name, err))?;
+            let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
+                format_err!(
+                    "unable to open local manifest {:?} - {}",
+                    manifest_name,
+                    err
+                )
+            })?;
 
             let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
             Ok(manifest_blob)
-        }).map_err(|err: Error| {
-            format_err!("unable to read local manifest {:?} - {}", manifest_name, err)
+        })
+        .map_err(|err: Error| {
+            format_err!(
+                "unable to read local manifest {:?} - {}",
+                manifest_name,
+                err
+            )
         })?;
 
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
@@ -332,7 +364,12 @@ async fn pull_snapshot(
             }
         }
 
-        let mut chunk_reader = RemoteChunkReader::new(reader.clone(), None, item.chunk_crypt_mode(), HashMap::new());
+        let mut chunk_reader = RemoteChunkReader::new(
+            reader.clone(),
+            None,
+            item.chunk_crypt_mode(),
+            HashMap::new(),
+        );
 
         pull_single_archive(
             worker,
@@ -342,7 +379,8 @@ async fn pull_snapshot(
             snapshot,
             &item,
             downloaded_chunks.clone(),
-        ).await?;
+        )
+        .await?;
     }
 
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@@ -364,15 +402,22 @@ pub async fn pull_snapshot_from(
     reader: Arc<BackupReader>,
     tgt_store: Arc<DataStore>,
     snapshot: &BackupDir,
-    downloaded_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-
     let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&snapshot)?;
 
     if is_new {
         worker.log(format!("sync snapshot {:?}", snapshot.relative_path()));
 
-        if let Err(err) = pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await {
+        if let Err(err) = pull_snapshot(
+            worker,
+            reader,
+            tgt_store.clone(),
+            &snapshot,
+            downloaded_chunks,
+        )
+        .await
+        {
             if let Err(cleanup_err) = tgt_store.remove_backup_dir(&snapshot, true) {
                 worker.log(format!("cleanup error - {}", cleanup_err));
             }
@@ -381,8 +426,18 @@ pub async fn pull_snapshot_from(
         worker.log(format!("sync snapshot {:?} done", snapshot.relative_path()));
     } else {
         worker.log(format!("re-sync snapshot {:?}", snapshot.relative_path()));
-        pull_snapshot(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks).await?;
-        worker.log(format!("re-sync snapshot {:?} done", snapshot.relative_path()));
+        pull_snapshot(
+            worker,
+            reader,
+            tgt_store.clone(),
+            &snapshot,
+            downloaded_chunks,
+        )
+        .await?;
+        worker.log(format!(
+            "re-sync snapshot {:?} done",
+            snapshot.relative_path()
+        ));
     }
 
     Ok(())
@@ -397,7 +452,6 @@ pub async fn pull_group(
     delete: bool,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-
     let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
 
     let args = json!({
@@ -419,7 +473,7 @@ pub async fn pull_group(
     let mut remote_snapshots = std::collections::HashSet::new();
 
     // start with 16384 chunks (up to 65GB)
-    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*64)));
+    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
     progress.group_snapshots = list.len() as u64;
 
@@ -428,7 +482,10 @@ pub async fn pull_group(
 
         // in-progress backups can't be synced
         if item.size.is_none() {
-            worker.log(format!("skipping snapshot {} - in-progress backup", snapshot));
+            worker.log(format!(
+                "skipping snapshot {} - in-progress backup",
+                snapshot
+            ));
             continue;
         }
 
@@ -437,7 +494,9 @@ pub async fn pull_group(
         remote_snapshots.insert(backup_time);
 
         if let Some(last_sync_time) = last_sync {
-            if last_sync_time > backup_time { continue; }
+            if last_sync_time > backup_time {
+                continue;
+            }
         }
 
         // get updated auth_info (new tickets)
@@ -447,7 +506,12 @@ pub async fn pull_group(
             .password(Some(auth_info.ticket.clone()))
             .fingerprint(fingerprint.clone());
 
-        let new_client = HttpClient::new(src_repo.host(), src_repo.port(), src_repo.auth_id(), options)?;
+        let new_client = HttpClient::new(
+            src_repo.host(),
+            src_repo.port(),
+            src_repo.auth_id(),
+            options,
+        )?;
 
         let reader = BackupReader::start(
             new_client,
@@ -457,9 +521,17 @@ pub async fn pull_group(
             snapshot.group().backup_id(),
             backup_time,
             true,
-        ).await?;
+        )
+        .await?;
 
-        let result = pull_snapshot_from(worker, reader, tgt_store.clone(), &snapshot, downloaded_chunks.clone()).await;
+        let result = pull_snapshot_from(
+            worker,
+            reader,
+            tgt_store.clone(),
+            &snapshot,
+            downloaded_chunks.clone(),
+        )
+        .await;
 
         progress.done_snapshots = pos as u64 + 1;
         worker.log(format!("percentage done: {}", progress));
@@ -471,8 +543,13 @@ pub async fn pull_group(
         let local_list = group.list_backups(&tgt_store.base_path())?;
         for info in local_list {
             let backup_time = info.backup_dir.backup_time();
-            if remote_snapshots.contains(&backup_time) { continue; }
-            worker.log(format!("delete vanished snapshot {:?}", info.backup_dir.relative_path()));
+            if remote_snapshots.contains(&backup_time) {
+                continue;
+            }
+            worker.log(format!(
+                "delete vanished snapshot {:?}",
+                info.backup_dir.relative_path()
+            ));
             tgt_store.remove_backup_dir(&info.backup_dir, false)?;
         }
     }
@@ -488,7 +565,6 @@ pub async fn pull_store(
     delete: bool,
     auth_id: Authid,
 ) -> Result<(), Error> {
-
     // explicit create shared lock to prevent GC on newly created chunks
     let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
 
@@ -528,19 +604,23 @@ pub async fn pull_store(
         let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
             Ok(result) => result,
             Err(err) => {
-                worker.log(format!("sync group {}/{} failed - group lock failed: {}",
-                                   item.backup_type, item.backup_id, err));
+                worker.log(format!(
+                    "sync group {}/{} failed - group lock failed: {}",
+                    item.backup_type, item.backup_id, err
+                ));
                 errors = true; // do not stop here, instead continue
                 continue;
             }
         };
 
         // permission check
-        if auth_id != owner { // only the owner is allowed to create additional snapshots
-            worker.log(format!("sync group {}/{} failed - owner check failed ({} != {})",
-                               item.backup_type, item.backup_id, auth_id, owner));
+        if auth_id != owner {
+            // only the owner is allowed to create additional snapshots
+            worker.log(format!(
+                "sync group {}/{} failed - owner check failed ({} != {})",
+                item.backup_type, item.backup_id, auth_id, owner
+            ));
             errors = true; // do not stop here, instead continue
-
         } else if let Err(err) = pull_group(
             worker,
             client,
@@ -549,12 +629,12 @@ pub async fn pull_store(
             &group,
             delete,
             &mut progress,
-        ).await {
+        )
+        .await
+        {
             worker.log(format!(
                 "sync group {}/{} failed - {}",
-                item.backup_type,
-                item.backup_id,
-                err,
+                item.backup_type, item.backup_id, err,
             ));
             errors = true; // do not stop here, instead continue
         }
@@ -564,8 +644,14 @@ pub async fn pull_store(
         let result: Result<(), Error> = proxmox::try_block!({
             let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
             for local_group in local_groups {
-                if new_groups.contains(&local_group) { continue; }
-                worker.log(format!("delete vanished group '{}/{}'", local_group.backup_type(), local_group.backup_id()));
+                if new_groups.contains(&local_group) {
+                    continue;
+                }
+                worker.log(format!(
+                    "delete vanished group '{}/{}'",
+                    local_group.backup_type(),
+                    local_group.backup_id()
+                ));
                 if let Err(err) = tgt_store.remove_backup_group(&local_group) {
                     worker.log(err.to_string());
                     errors = true;
-- 
2.20.1






More information about the pbs-devel mailing list