[pbs-devel] [PATCH v2 proxmox-backup] client: pxar: fix race in pxar backup stream

Christian Ebner c.ebner at proxmox.com
Mon Nov 18 16:47:42 CET 2024


Fixes a race condition where the backup upload stream can miss an
error returned by pxar::create_archive, because the error state is
only set after the backup stream was already polled.

On instantiation, `PxarBackupStream` spawns a future handling the
pxar archive creation, which sends the encoded pxar archive stream
(or streams in case of split archives) through a channel, received
by the pxar backup stream on polling.

Once this channel is closed, which the receiver observes as an error
on receive, the poll logic propagates any error that occurred during
pxar creation by taking it from the `PxarBackupStream`.

As this error might not have been set yet at that point, the backup
snapshot can be incorrectly finished with success, even though an
error occurred.

To fix this, signal the end of the archive creation to the pxar
backup stream via a notification.

In case of premature termination of the pxar backup stream, no
additional measures have to be taken, as the abort handle already
terminates the archive creation.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- improved variable naming, distinguish notification sender and notified receiver
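
The race and the fix described in the commit message can be sketched
roughly as follows. This is a std-only illustration, not the code from
this patch: a second `std::sync::mpsc` channel stands in for
`tokio::sync::Notify`, a thread stands in for the spawned archiver
future, and `consume`, `wait_for_finished` and the error string are
hypothetical names used only here.

```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the archiver/stream pair. Returns the error
/// the consumer observes after the data channel has been closed.
fn consume(wait_for_finished: bool) -> Option<String> {
    let (data_tx, data_rx) = channel::<Vec<u8>>();
    // Assumption: this channel plays the role of the notification.
    let (finished_tx, finished_rx) = channel::<()>();
    let error: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
    let error2 = Arc::clone(&error);

    let producer = thread::spawn(move || {
        data_tx.send(vec![0u8; 4]).unwrap();
        // Archive creation fails: the data channel is closed first ...
        drop(data_tx);
        // ... and the error state is only set afterwards. The sleep
        // merely widens the race window for demonstration purposes.
        thread::sleep(Duration::from_millis(100));
        *error2.lock().unwrap() = Some("archive creation failed".to_string());
        // Signal completion, with or without error.
        let _ = finished_tx.send(());
    });

    // Drain the data channel until the sender side is gone.
    while data_rx.recv().is_ok() {}

    if wait_for_finished {
        // Both a received message and a dropped sender mean that the
        // producer is done, so the result can be ignored.
        let _ = finished_rx.recv();
    }

    // Without the wait above, this may run before the error was stored.
    let observed = error.lock().unwrap().take();
    producer.join().unwrap();
    observed
}

fn main() {
    println!("without waiting: {:?}", consume(false));
    println!("with waiting:    {:?}", consume(true));
}
```

With `wait_for_finished` set, the consumer always sees the stored
error. Without it, the result depends on timing and the error is
likely missed, which corresponds to the snapshot being finished with
success.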

 pbs-client/src/pxar_backup_stream.rs | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index 4370da6cc..6005a8d1e 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -11,6 +11,7 @@ use futures::stream::Stream;
 use nix::dir::Dir;
 use nix::fcntl::OFlag;
 use nix::sys::stat::Mode;
+use tokio::sync::Notify;
 
 use proxmox_async::blocking::TokioWriterAdapter;
 use proxmox_io::StdChannelWriter;
@@ -30,6 +31,7 @@ pub struct PxarBackupStream {
     pub suggested_boundaries: Option<std::sync::mpsc::Receiver<u64>>,
     handle: Option<AbortHandle>,
     error: Arc<Mutex<Option<Error>>>,
+    archive_finished_notification: Arc<Notify>,
 }
 
 impl Drop for PxarBackupStream {
@@ -79,6 +81,8 @@ impl PxarBackupStream {
 
         let error = Arc::new(Mutex::new(None));
         let error2 = Arc::clone(&error);
+        let pxar_backup_stream_notifier = Arc::new(Notify::new());
+        let archive_finished_notification = pxar_backup_stream_notifier.clone();
         let handler = async move {
             if let Err(err) = crate::pxar::create_archive(
                 dir,
@@ -100,6 +104,8 @@ impl PxarBackupStream {
                 let mut error = error2.lock().unwrap();
                 *error = Some(err);
             }
+            // Notify upload stream that archiver is finished (with or without error)
+            pxar_backup_stream_notifier.notify_one();
         };
 
         let (handle, registration) = AbortHandle::new_pair();
@@ -111,6 +117,7 @@ impl PxarBackupStream {
             suggested_boundaries: None,
             handle: Some(handle.clone()),
             error: Arc::clone(&error),
+            archive_finished_notification: archive_finished_notification.clone(),
         };
 
         let backup_payload_stream = payload_rx.map(|rx| Self {
@@ -118,6 +125,7 @@ impl PxarBackupStream {
             suggested_boundaries: suggested_boundaries_rx,
             handle: Some(handle),
             error,
+            archive_finished_notification,
         });
 
         Ok((backup_stream, backup_payload_stream))
@@ -151,6 +159,10 @@ impl Stream for PxarBackupStream {
         match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
             Ok(data) => Poll::Ready(Some(data)),
             Err(_) => {
+                // Wait until archiver signals finished to catch eventual errors
+                proxmox_async::runtime::block_on(
+                    self.archive_finished_notification.notified(),
+                );
                 let mut error = self.error.lock().unwrap();
                 if let Some(err) = error.take() {
                     return Poll::Ready(Some(Err(err)));
-- 
2.39.5




