[pbs-devel] [PATCH v3 proxmox-backup 5/5] client: backup reader: call finish before dropping backup readers

Christian Ebner c.ebner at proxmox.com
Thu Jan 9 15:06:23 CET 2025


Signal the backup server that the readers have terminated with
success, so the server gracefully handles disconnections and does not
log them as error.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 2:
- not present in previous version

 examples/download-speed.rs           |  2 ++
 proxmox-backup-client/src/catalog.rs |  2 ++
 proxmox-backup-client/src/main.rs    |  7 +++++++
 proxmox-backup-client/src/mount.rs   |  3 +++
 proxmox-file-restore/src/main.rs     | 19 +++++++++++++-----
 src/bin/proxmox_backup_debug/diff.rs | 13 ++++++++----
 src/server/pull.rs                   |  7 ++++++-
 src/server/push.rs                   |  2 +-
 src/server/sync.rs                   | 30 +++++++++++++++++-----------
 9 files changed, 62 insertions(+), 23 deletions(-)

diff --git a/examples/download-speed.rs b/examples/download-speed.rs
index fe700982b..3583135fb 100644
--- a/examples/download-speed.rs
+++ b/examples/download-speed.rs
@@ -62,6 +62,8 @@ async fn run() -> Result<(), Error> {
         (bytes as f64) / (elapsed * 1024.0 * 1024.0)
     );
 
+    client.finish().await?;
+
     Ok(())
 }
 
diff --git a/proxmox-backup-client/src/catalog.rs b/proxmox-backup-client/src/catalog.rs
index b1b22ff24..60c59137c 100644
--- a/proxmox-backup-client/src/catalog.rs
+++ b/proxmox-backup-client/src/catalog.rs
@@ -152,6 +152,7 @@ async fn dump_catalog(param: Value) -> Result<Value, Error> {
     catalog_reader.dump()?;
 
     record_repository(&repo);
+    client.finish().await?;
 
     Ok(Value::Null)
 }
@@ -287,6 +288,7 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
     state.shell().await?;
 
     record_repository(&repo);
+    client.finish().await?;
 
     Ok(())
 }
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 632a29170..a19591cd9 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -999,6 +999,7 @@ async fn create_backup(
 
     let mut catalog = None;
     let mut catalog_result_rx = None;
+    let mut prev_backup_reader = None;
 
     let log_file = |desc: &str, file: &str, target: &str| {
         let what = if dry_run { "Would upload" } else { "Upload" };
@@ -1103,6 +1104,8 @@ async fn create_backup(
                                 true,
                             )
                             .await?;
+                            // Allows to finish the backup reader instance
+                            prev_backup_reader = Some(backup_reader.clone());
                             previous_ref = prepare_reference(
                                 &target,
                                 manifest.clone(),
@@ -1250,6 +1253,9 @@ async fn create_backup(
         .await?;
 
     client.finish().await?;
+    if let Some(backup_reader) = prev_backup_reader {
+        backup_reader.finish().await?;
+    }
 
     let end_time = std::time::Instant::now();
     let elapsed = end_time.duration_since(start_time);
@@ -1744,6 +1750,7 @@ async fn restore(
         )
         .await?;
     }
+    client.finish().await?;
 
     Ok(Value::Null)
 }
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index a5fee8329..275339a4f 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -299,6 +299,8 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
                 // exit on interrupted
             }
         }
+
+        client.finish().await?;
     } else if server_archive_name.archive_type() == ArchiveType::FixedIndex {
         let file_info = manifest.lookup_file_info(&server_archive_name)?;
         let index = client
@@ -361,6 +363,7 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
         }
 
         log::info!("Image unmapped");
+        client.finish().await?;
     } else {
         bail!("unknown archive file extension (expected .pxar or .img)");
     }
diff --git a/proxmox-file-restore/src/main.rs b/proxmox-file-restore/src/main.rs
index 572e2d188..ee83519a2 100644
--- a/proxmox-file-restore/src/main.rs
+++ b/proxmox-file-restore/src/main.rs
@@ -122,7 +122,7 @@ async fn list_files(
     let (manifest, _) = client.download_manifest().await?;
     manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref))?;
 
-    match path {
+    let result = match path {
         ExtractPath::ListArchives => {
             let mut entries = vec![];
             for file in manifest.files() {
@@ -207,7 +207,11 @@ async fn list_files(
             };
             data_list(driver, details, file, path).await
         }
-    }
+    };
+
+    client.finish().await?;
+
+    result
 }
 
 #[api(
@@ -487,9 +491,13 @@ async fn extract(
             .await?;
 
             let reader = if let Some(payload_archive_name) = payload_archive_name {
-                let (payload_reader, payload_size) =
-                    get_remote_pxar_reader(&payload_archive_name, client, &manifest, crypt_config)
-                        .await?;
+                let (payload_reader, payload_size) = get_remote_pxar_reader(
+                    &payload_archive_name,
+                    client.clone(),
+                    &manifest,
+                    crypt_config,
+                )
+                .await?;
                 pxar::PxarVariant::Split(reader, (payload_reader, payload_size))
             } else {
                 pxar::PxarVariant::Unified(reader)
@@ -542,6 +550,7 @@ async fn extract(
             bail!("cannot extract '{orig_path}'");
         }
     }
+    client.finish().await?;
 
     Ok(())
 }
diff --git a/src/bin/proxmox_backup_debug/diff.rs b/src/bin/proxmox_backup_debug/diff.rs
index fc65f3120..4462a1187 100644
--- a/src/bin/proxmox_backup_debug/diff.rs
+++ b/src/bin/proxmox_backup_debug/diff.rs
@@ -163,8 +163,10 @@ async fn diff_archive(
     compare_contents: bool,
     output_params: &OutputParams,
 ) -> Result<(), Error> {
-    let (index_a, accessor_a) = open_dynamic_index(snapshot_a, file_name, repo_params).await?;
-    let (index_b, accessor_b) = open_dynamic_index(snapshot_b, file_name, repo_params).await?;
+    let (index_a, accessor_a, backup_reader_a) =
+        open_dynamic_index(snapshot_a, file_name, repo_params).await?;
+    let (index_b, accessor_b, backup_reader_b) =
+        open_dynamic_index(snapshot_b, file_name, repo_params).await?;
 
     // vecs of chunk digests, in their correct order
     let chunks_a = chunk_digests_for_index(&index_a);
@@ -217,6 +219,9 @@ async fn diff_archive(
 
     show_file_list(&added_files, &deleted_files, &modified_files, output_params)?;
 
+    backup_reader_a.finish().await?;
+    backup_reader_b.finish().await?;
+
     Ok(())
 }
 
@@ -248,7 +253,7 @@ async fn open_dynamic_index(
     snapshot: &str,
     archive_name: &BackupArchiveName,
     params: &RepoParams,
-) -> Result<(DynamicIndexReader, Accessor), Error> {
+) -> Result<(DynamicIndexReader, Accessor, Arc<BackupReader>), Error> {
     let backup_dir = match snapshot.parse::<BackupPart>()? {
         BackupPart::Dir(dir) => dir,
         BackupPart::Group(_group) => {
@@ -291,7 +296,7 @@ async fn open_dynamic_index(
     let reader: Arc<dyn ReadAt + Send + Sync> = Arc::new(LocalDynamicReadAt::new(reader));
     let accessor = Accessor::new(pxar::PxarVariant::Unified(reader), archive_size).await?;
 
-    Ok((lookup_index, accessor))
+    Ok((lookup_index, accessor, backup_reader))
 }
 
 /// Get a list of chunk digests for an index file.
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 516abfe5d..1b914501f 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -608,13 +608,18 @@ async fn pull_group(
             .store
             .backup_dir(target_ns.clone(), from_snapshot.clone())?;
 
-        let reader = params
+        let (reader, backup_reader) = params
             .source
             .reader(source_namespace, &from_snapshot)
             .await?;
         let result =
             pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), corrupt).await;
 
+        if let Some(backup_reader) = backup_reader {
+            // ignore errors
+            let _result = backup_reader.finish().await;
+        }
+
         progress.done_snapshots = pos as u64 + 1;
         info!("percentage done: {progress}");
 
diff --git a/src/server/push.rs b/src/server/push.rs
index 6498f316b..c326bad1f 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -787,7 +787,7 @@ pub(crate) async fn push_snapshot(
         .backup_dir(namespace.clone(), snapshot.clone())?;
 
     // Reader locks the snapshot
-    let reader = params.source.reader(namespace, snapshot).await?;
+    let (reader, _) = params.source.reader(namespace, snapshot).await?;
 
     // Does not lock the manifest, but the reader already assures a locked snapshot
     let source_manifest = match backup_dir.load_manifest() {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 0bd7a7a85..42ddd967f 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -255,7 +255,7 @@ pub(crate) trait SyncSource: Send + Sync {
         &self,
         ns: &BackupNamespace,
         dir: &BackupDir,
-    ) -> Result<Arc<dyn SyncSourceReader>, Error>;
+    ) -> Result<(Arc<dyn SyncSourceReader>, Option<Arc<BackupReader>>), Error>;
 }
 
 pub(crate) struct RemoteSource {
@@ -402,13 +402,16 @@ impl SyncSource for RemoteSource {
         &self,
         ns: &BackupNamespace,
         dir: &BackupDir,
-    ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+    ) -> Result<(Arc<dyn SyncSourceReader>, Option<Arc<BackupReader>>), Error> {
         let backup_reader =
             BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
-        Ok(Arc::new(RemoteSourceReader {
-            backup_reader,
-            dir: dir.clone(),
-        }))
+        Ok((
+            Arc::new(RemoteSourceReader {
+                backup_reader: backup_reader.clone(),
+                dir: dir.clone(),
+            }),
+            Some(backup_reader),
+        ))
     }
 }
 
@@ -475,18 +478,21 @@ impl SyncSource for LocalSource {
         &self,
         ns: &BackupNamespace,
         dir: &BackupDir,
-    ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+    ) -> Result<(Arc<dyn SyncSourceReader>, Option<Arc<BackupReader>>), Error> {
         let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
         let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
             &dir.full_path(),
             "snapshot",
             "locked by another operation",
         )?;
-        Ok(Arc::new(LocalSourceReader {
-            _dir_lock: Arc::new(Mutex::new(dir_lock)),
-            path: dir.full_path(),
-            datastore: dir.datastore().clone(),
-        }))
+        Ok((
+            Arc::new(LocalSourceReader {
+                _dir_lock: Arc::new(Mutex::new(dir_lock)),
+                path: dir.full_path(),
+                datastore: dir.datastore().clone(),
+            }),
+            None,
+        ))
     }
 }
 
-- 
2.39.5





More information about the pbs-devel mailing list