[pbs-devel] [PATCH v3 proxmox-backup 5/5] client: backup reader: call finish before dropping backup readers
Christian Ebner
c.ebner at proxmox.com
Thu Jan 9 15:06:23 CET 2025
Signal the backup server that the readers have terminated with
success, so the server gracefully handles disconnections and does not
log them as error.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 2:
- not present in previous version
examples/download-speed.rs | 2 ++
proxmox-backup-client/src/catalog.rs | 2 ++
proxmox-backup-client/src/main.rs | 7 +++++++
proxmox-backup-client/src/mount.rs | 3 +++
proxmox-file-restore/src/main.rs | 19 +++++++++++++-----
src/bin/proxmox_backup_debug/diff.rs | 13 ++++++++----
src/server/pull.rs | 7 ++++++-
src/server/push.rs | 2 +-
src/server/sync.rs | 30 +++++++++++++++++-----------
9 files changed, 62 insertions(+), 23 deletions(-)
diff --git a/examples/download-speed.rs b/examples/download-speed.rs
index fe700982b..3583135fb 100644
--- a/examples/download-speed.rs
+++ b/examples/download-speed.rs
@@ -62,6 +62,8 @@ async fn run() -> Result<(), Error> {
(bytes as f64) / (elapsed * 1024.0 * 1024.0)
);
+ client.finish().await?;
+
Ok(())
}
diff --git a/proxmox-backup-client/src/catalog.rs b/proxmox-backup-client/src/catalog.rs
index b1b22ff24..60c59137c 100644
--- a/proxmox-backup-client/src/catalog.rs
+++ b/proxmox-backup-client/src/catalog.rs
@@ -152,6 +152,7 @@ async fn dump_catalog(param: Value) -> Result<Value, Error> {
catalog_reader.dump()?;
record_repository(&repo);
+ client.finish().await?;
Ok(Value::Null)
}
@@ -287,6 +288,7 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
state.shell().await?;
record_repository(&repo);
+ client.finish().await?;
Ok(())
}
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 632a29170..a19591cd9 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -999,6 +999,7 @@ async fn create_backup(
let mut catalog = None;
let mut catalog_result_rx = None;
+ let mut prev_backup_reader = None;
let log_file = |desc: &str, file: &str, target: &str| {
let what = if dry_run { "Would upload" } else { "Upload" };
@@ -1103,6 +1104,8 @@ async fn create_backup(
true,
)
.await?;
+ // Allows to finish the backup reader instance
+ prev_backup_reader = Some(backup_reader.clone());
previous_ref = prepare_reference(
&target,
manifest.clone(),
@@ -1250,6 +1253,9 @@ async fn create_backup(
.await?;
client.finish().await?;
+ if let Some(backup_reader) = prev_backup_reader {
+ backup_reader.finish().await?;
+ }
let end_time = std::time::Instant::now();
let elapsed = end_time.duration_since(start_time);
@@ -1744,6 +1750,7 @@ async fn restore(
)
.await?;
}
+ client.finish().await?;
Ok(Value::Null)
}
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index a5fee8329..275339a4f 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -299,6 +299,8 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
// exit on interrupted
}
}
+
+ client.finish().await?;
} else if server_archive_name.archive_type() == ArchiveType::FixedIndex {
let file_info = manifest.lookup_file_info(&server_archive_name)?;
let index = client
@@ -361,6 +363,7 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
}
log::info!("Image unmapped");
+ client.finish().await?;
} else {
bail!("unknown archive file extension (expected .pxar or .img)");
}
diff --git a/proxmox-file-restore/src/main.rs b/proxmox-file-restore/src/main.rs
index 572e2d188..ee83519a2 100644
--- a/proxmox-file-restore/src/main.rs
+++ b/proxmox-file-restore/src/main.rs
@@ -122,7 +122,7 @@ async fn list_files(
let (manifest, _) = client.download_manifest().await?;
manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref))?;
- match path {
+ let result = match path {
ExtractPath::ListArchives => {
let mut entries = vec![];
for file in manifest.files() {
@@ -207,7 +207,11 @@ async fn list_files(
};
data_list(driver, details, file, path).await
}
- }
+ };
+
+ client.finish().await?;
+
+ result
}
#[api(
@@ -487,9 +491,13 @@ async fn extract(
.await?;
let reader = if let Some(payload_archive_name) = payload_archive_name {
- let (payload_reader, payload_size) =
- get_remote_pxar_reader(&payload_archive_name, client, &manifest, crypt_config)
- .await?;
+ let (payload_reader, payload_size) = get_remote_pxar_reader(
+ &payload_archive_name,
+ client.clone(),
+ &manifest,
+ crypt_config,
+ )
+ .await?;
pxar::PxarVariant::Split(reader, (payload_reader, payload_size))
} else {
pxar::PxarVariant::Unified(reader)
@@ -542,6 +550,7 @@ async fn extract(
bail!("cannot extract '{orig_path}'");
}
}
+ client.finish().await?;
Ok(())
}
diff --git a/src/bin/proxmox_backup_debug/diff.rs b/src/bin/proxmox_backup_debug/diff.rs
index fc65f3120..4462a1187 100644
--- a/src/bin/proxmox_backup_debug/diff.rs
+++ b/src/bin/proxmox_backup_debug/diff.rs
@@ -163,8 +163,10 @@ async fn diff_archive(
compare_contents: bool,
output_params: &OutputParams,
) -> Result<(), Error> {
- let (index_a, accessor_a) = open_dynamic_index(snapshot_a, file_name, repo_params).await?;
- let (index_b, accessor_b) = open_dynamic_index(snapshot_b, file_name, repo_params).await?;
+ let (index_a, accessor_a, backup_reader_a) =
+ open_dynamic_index(snapshot_a, file_name, repo_params).await?;
+ let (index_b, accessor_b, backup_reader_b) =
+ open_dynamic_index(snapshot_b, file_name, repo_params).await?;
// vecs of chunk digests, in their correct order
let chunks_a = chunk_digests_for_index(&index_a);
@@ -217,6 +219,9 @@ async fn diff_archive(
show_file_list(&added_files, &deleted_files, &modified_files, output_params)?;
+ backup_reader_a.finish().await?;
+ backup_reader_b.finish().await?;
+
Ok(())
}
@@ -248,7 +253,7 @@ async fn open_dynamic_index(
snapshot: &str,
archive_name: &BackupArchiveName,
params: &RepoParams,
-) -> Result<(DynamicIndexReader, Accessor), Error> {
+) -> Result<(DynamicIndexReader, Accessor, Arc<BackupReader>), Error> {
let backup_dir = match snapshot.parse::<BackupPart>()? {
BackupPart::Dir(dir) => dir,
BackupPart::Group(_group) => {
@@ -291,7 +296,7 @@ async fn open_dynamic_index(
let reader: Arc<dyn ReadAt + Send + Sync> = Arc::new(LocalDynamicReadAt::new(reader));
let accessor = Accessor::new(pxar::PxarVariant::Unified(reader), archive_size).await?;
- Ok((lookup_index, accessor))
+ Ok((lookup_index, accessor, backup_reader))
}
/// Get a list of chunk digests for an index file.
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 516abfe5d..1b914501f 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -608,13 +608,18 @@ async fn pull_group(
.store
.backup_dir(target_ns.clone(), from_snapshot.clone())?;
- let reader = params
+ let (reader, backup_reader) = params
.source
.reader(source_namespace, &from_snapshot)
.await?;
let result =
pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), corrupt).await;
+ if let Some(backup_reader) = backup_reader {
+ // ignore errors
+ let _result = backup_reader.finish().await;
+ }
+
progress.done_snapshots = pos as u64 + 1;
info!("percentage done: {progress}");
diff --git a/src/server/push.rs b/src/server/push.rs
index 6498f316b..c326bad1f 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -787,7 +787,7 @@ pub(crate) async fn push_snapshot(
.backup_dir(namespace.clone(), snapshot.clone())?;
// Reader locks the snapshot
- let reader = params.source.reader(namespace, snapshot).await?;
+ let (reader, _) = params.source.reader(namespace, snapshot).await?;
// Does not lock the manifest, but the reader already assures a locked snapshot
let source_manifest = match backup_dir.load_manifest() {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 0bd7a7a85..42ddd967f 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -255,7 +255,7 @@ pub(crate) trait SyncSource: Send + Sync {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error>;
+ ) -> Result<(Arc<dyn SyncSourceReader>, Option<Arc<BackupReader>>), Error>;
}
pub(crate) struct RemoteSource {
@@ -402,13 +402,16 @@ impl SyncSource for RemoteSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+ ) -> Result<(Arc<dyn SyncSourceReader>, Option<Arc<BackupReader>>), Error> {
let backup_reader =
BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
- Ok(Arc::new(RemoteSourceReader {
- backup_reader,
- dir: dir.clone(),
- }))
+ Ok((
+ Arc::new(RemoteSourceReader {
+ backup_reader: backup_reader.clone(),
+ dir: dir.clone(),
+ }),
+ Some(backup_reader),
+ ))
}
}
@@ -475,18 +478,21 @@ impl SyncSource for LocalSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+ ) -> Result<(Arc<dyn SyncSourceReader>, Option<Arc<BackupReader>>), Error> {
let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
&dir.full_path(),
"snapshot",
"locked by another operation",
)?;
- Ok(Arc::new(LocalSourceReader {
- _dir_lock: Arc::new(Mutex::new(dir_lock)),
- path: dir.full_path(),
- datastore: dir.datastore().clone(),
- }))
+ Ok((
+ Arc::new(LocalSourceReader {
+ _dir_lock: Arc::new(Mutex::new(dir_lock)),
+ path: dir.full_path(),
+ datastore: dir.datastore().clone(),
+ }),
+ None,
+ ))
}
}
--
2.39.5
More information about the pbs-devel
mailing list