[pbs-devel] applied: [PATCH v5 proxmox-backup] client: pxar: fix race in pxar backup stream

Fabian Grünbichler f.gruenbichler at proxmox.com
Tue Feb 11 12:30:43 CET 2025


On January 24, 2025 1:46 pm, Christian Ebner wrote:
> Fixes a race condition where the backup upload stream can miss an
> error returned by pxar::create_archive, because the error state is
> only set after the backup stream was already polled.
> 
> On instantiation, `PxarBackupStream` spawns a future handling the
> pxar archive creation, which sends the encoded pxar archive stream
> (or streams in case of split archives) through a channel, received
> by the pxar backup stream on polling.
> 
> If this channel is closed, as signaled by `recv` returning an error, the
> poll logic propagates any error that occurred during pxar creation by
> taking it from the `PxarBackupStream`.
> 
> As this error might not have been set yet at that point, a backup
> snapshot can incorrectly be finished successfully, even though an
> error occurred.
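To make the race concrete, here is a minimal std-only sketch (not the actual pbs-client code). It assumes, as the description above implies, that the archiver's channel sender is dropped when `create_archive` returns and that the error is stored only afterwards. `drain_then_check`, `run_racy` and the error string are hypothetical, and the extra `go` channel exists only to force the unlucky interleaving deterministically.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Pre-fix ordering (hypothetical helper): read until the data channel
/// disconnects, then take whatever error has been recorded *so far*.
fn drain_then_check(
    rx: &mpsc::Receiver<Vec<u8>>,
    error: &Mutex<Option<String>>,
) -> Result<usize, String> {
    let mut chunks = 0;
    while rx.recv().is_ok() {
        chunks += 1;
    }
    match error.lock().unwrap().take() {
        Some(err) => Err(err),
        None => Ok(chunks),
    }
}

/// Returns the consumer's verdict and whether an error was stored eventually.
fn run_racy() -> (Result<usize, String>, bool) {
    let error = Arc::new(Mutex::new(None));
    let error2 = Arc::clone(&error);
    let (tx, rx) = mpsc::channel();
    let (go_tx, go_rx) = mpsc::channel::<()>();

    let archiver = thread::spawn(move || {
        tx.send(b"chunk".to_vec()).unwrap();
        // Writer dropped: the consumer already sees the channel close ...
        drop(tx);
        // (test hook: wait until the consumer has checked the error slot)
        go_rx.recv().unwrap();
        // ... and only now does the error land.
        *error2.lock().unwrap() = Some("archive creation failed".to_string());
    });

    let verdict = drain_then_check(&rx, &error);
    go_tx.send(()).unwrap();
    archiver.join().unwrap();
    let error_was_stored = error.lock().unwrap().is_some();
    (verdict, error_was_stored)
}

fn main() {
    let (verdict, stored) = run_racy();
    println!("verdict: {:?}, error stored later: {}", verdict, stored);
}
```

Running it prints `verdict: Ok(1), error stored later: true`: the consumer reports success although an error is recorded moments later.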
> 
> To fix this, introduce a dedicated notifier for each stream instance
> and wait for the archiver to signal it has finished via this
> notification channel. In addition, extend `PxarBackupStream` with a
> `finished` flag to allow an early return on subsequent polls, which
> would otherwise block waiting for a new notification.
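A minimal std-only sketch of the fix described above, under stated assumptions: `Permit` is a hypothetical stand-in for `tokio::sync::Notify` that keeps the one property the patch relies on (a `notify_one()` issued before anyone waits leaves a permit, so the signal is not lost if the archiver finishes first), and `BackupStream::next` is a blocking analogue of `poll_next`, not the real `Stream` implementation.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical single-permit notifier standing in for tokio::sync::Notify:
/// notify_one() before anyone waits leaves a permit that wait() consumes.
struct Permit {
    ready: Mutex<bool>,
    cv: Condvar,
}

impl Permit {
    fn new() -> Self {
        Permit { ready: Mutex::new(false), cv: Condvar::new() }
    }
    fn notify_one(&self) {
        *self.ready.lock().unwrap() = true;
        self.cv.notify_one();
    }
    fn wait(&self) {
        let guard = self.ready.lock().unwrap();
        let mut ready = self.cv.wait_while(guard, |r| !*r).unwrap();
        *ready = false; // consume the permit
    }
}

/// Blocking analogue of the patched poll_next (hypothetical type).
struct BackupStream {
    rx: mpsc::Receiver<Vec<u8>>,
    error: Arc<Mutex<Option<String>>>,
    finished: bool,
    archiver_finished: Arc<Permit>,
}

impl BackupStream {
    fn next(&mut self) -> Option<Result<Vec<u8>, String>> {
        if self.finished {
            return None; // never wait for the notification a second time
        }
        if let Some(err) = self.error.lock().unwrap().take() {
            return Some(Err(err));
        }
        match self.rx.recv() {
            Ok(data) => Some(Ok(data)),
            Err(_) => {
                // Channel closed: wait until the archiver has stored its error (if any).
                self.archiver_finished.wait();
                self.finished = true;
                self.error.lock().unwrap().take().map(Err)
            }
        }
    }
}

/// Drives one stream against an archiver that fails *after* closing the
/// data channel; returns everything the stream yielded.
fn run_fixed() -> Vec<Result<Vec<u8>, String>> {
    let error = Arc::new(Mutex::new(None));
    let error2 = Arc::clone(&error);
    let notifier = Arc::new(Permit::new());
    let notifier2 = Arc::clone(&notifier);
    let (tx, rx) = mpsc::channel();

    let archiver = thread::spawn(move || {
        tx.send(b"chunk".to_vec()).unwrap();
        drop(tx); // the data channel closes first ...
        thread::sleep(Duration::from_millis(20));
        *error2.lock().unwrap() = Some("archive creation failed".to_string());
        notifier2.notify_one(); // ... completion is signalled last
    });

    let mut stream = BackupStream { rx, error, finished: false, archiver_finished: notifier };
    let mut items = Vec::new();
    while let Some(item) = stream.next() {
        items.push(item);
    }
    assert!(stream.next().is_none()); // subsequent polls return early
    archiver.join().unwrap();
    items
}

fn main() {
    println!("{:?}", run_fixed());
}
```

Whatever the interleaving, the stream yields both the chunk and the error, and a further call returns `None` without waiting again.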
> 
> In case of premature termination of the pxar backup stream, no
> additional measures have to be taken, as the abort handle already
> terminates the archive creation.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> Changes since version 4 (thanks a lot Fabian for comments!):
> - Drop unneeded shared state, only store error
> - Use dedicated notification channel instances per stream, so that
>   each stream is guaranteed to be notified.
> - Use a `finished` flag per `PxarBackupStream` instance, allowing an
>   early return if the archiver's completion has already been seen and
>   avoiding blocking again while awaiting a notification.
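One way to see why each stream needs its own notifier: `tokio::sync::Notify` stores at most one permit when `notify_one()` is called without a waiter, so two signals sent to a single shared instance before either stream waits would release only one of them. The std-only `Permit` type below is a hypothetical stand-in with that single-permit behaviour; the timeout exists only so the demo terminates instead of blocking forever.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Hypothetical single-permit notifier, a std-only stand-in for
/// tokio::sync::Notify: notify_one() with no waiter stores at most ONE permit.
struct Permit {
    ready: Mutex<bool>,
    cv: Condvar,
}

impl Permit {
    fn new() -> Self {
        Permit { ready: Mutex::new(false), cv: Condvar::new() }
    }
    fn notify_one(&self) {
        *self.ready.lock().unwrap() = true; // a second call adds no extra permit
        self.cv.notify_one();
    }
    /// Returns true if a permit was consumed within `timeout`.
    fn wait_timeout(&self, timeout: Duration) -> bool {
        let guard = self.ready.lock().unwrap();
        let (mut ready, _) = self.cv.wait_timeout_while(guard, timeout, |r| !*r).unwrap();
        let got_permit = *ready;
        *ready = false;
        got_permit
    }
}

const WAIT: Duration = Duration::from_millis(10);

/// One notifier shared by the archive stream and the payload stream.
fn shared_notifier() -> (bool, bool) {
    let notifier = Permit::new();
    notifier.notify_one(); // "archiver finished", meant for the archive stream
    notifier.notify_one(); // ... and for the payload stream
    (notifier.wait_timeout(WAIT), notifier.wait_timeout(WAIT))
}

/// One notifier per stream, as in v5 of the patch.
fn per_stream_notifiers() -> (bool, bool) {
    let stream_notifier = Permit::new();
    let payload_stream_notifier = Permit::new();
    stream_notifier.notify_one();
    payload_stream_notifier.notify_one();
    (stream_notifier.wait_timeout(WAIT), payload_stream_notifier.wait_timeout(WAIT))
}

fn main() {
    println!("shared:     {:?}", shared_notifier()); // (true, false): one stream would block
    println!("per-stream: {:?}", per_stream_notifiers()); // (true, true)
}
```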
> 
>  pbs-client/src/pxar_backup_stream.rs | 34 +++++++++++++++++++++++++---
>  1 file changed, 31 insertions(+), 3 deletions(-)
> 
> diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
> index 2bfb5cf29..1303e8503 100644
> --- a/pbs-client/src/pxar_backup_stream.rs
> +++ b/pbs-client/src/pxar_backup_stream.rs
> @@ -11,6 +11,7 @@ use futures::stream::Stream;
>  use nix::dir::Dir;
>  use nix::fcntl::OFlag;
>  use nix::sys::stat::Mode;
> +use tokio::sync::Notify;
>  
>  use proxmox_async::blocking::TokioWriterAdapter;
>  use proxmox_io::StdChannelWriter;
> @@ -31,6 +32,8 @@ pub struct PxarBackupStream {
>      pub suggested_boundaries: Option<std::sync::mpsc::Receiver<u64>>,
>      handle: Option<AbortHandle>,
>      error: Arc<Mutex<Option<Error>>>,
> +    finished: bool,
> +    archiver_finished_notification: Arc<Notify>,
>  }
>  
>  impl Drop for PxarBackupStream {
> @@ -80,6 +83,10 @@ impl PxarBackupStream {
>  
>          let error = Arc::new(Mutex::new(None));
>          let error2 = Arc::clone(&error);
> +        let stream_notifier = Arc::new(Notify::new());
> +        let stream_notification_receiver = stream_notifier.clone();
> +        let payload_stream_notifier = Arc::new(Notify::new());
> +        let payload_stream_notification_receiver = payload_stream_notifier.clone();
>          let handler = async move {
>              if let Err(err) = crate::pxar::create_archive(
>                  dir,
> @@ -101,6 +108,10 @@ impl PxarBackupStream {
>                  let mut error = error2.lock().unwrap();
>                  *error = Some(err);
>              }
> +
> +            // Notify upload streams that archiver is finished (with or without error)
> +            stream_notifier.notify_one();
> +            payload_stream_notifier.notify_one();
>          };
>  
>          let (handle, registration) = AbortHandle::new_pair();
> @@ -112,6 +123,8 @@ impl PxarBackupStream {
>              suggested_boundaries: None,
>              handle: Some(handle.clone()),
>              error: Arc::clone(&error),
> +            finished: false,
> +            archiver_finished_notification: stream_notification_receiver,
>          };
>  
>          let backup_payload_stream = payload_rx.map(|rx| Self {
> @@ -119,6 +132,8 @@ impl PxarBackupStream {
>              suggested_boundaries: suggested_boundaries_rx,
>              handle: Some(handle),
>              error,
> +            finished: false,
> +            archiver_finished_notification: payload_stream_notification_receiver,
>          });
>  
>          Ok((backup_stream, backup_payload_stream))
> @@ -141,18 +156,31 @@ impl Stream for PxarBackupStream {
>      type Item = Result<Vec<u8>, Error>;
>  
>      fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
> +        let this = self.get_mut();
> +        if this.finished {
> +            // Channel has already been finished and eventual errors propagated,
> +            // early return to avoid blocking on further archiver finished notifications
> +            // by subsequent polls.
> +            return Poll::Ready(None);
> +        }
>          {
>              // limit lock scope
> -            let mut error = self.error.lock().unwrap();
> +            let mut error = this.error.lock().unwrap();
>              if let Some(err) = error.take() {
>                  return Poll::Ready(Some(Err(err)));
>              }
>          }
>  
> -        match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
> +        match proxmox_async::runtime::block_in_place(|| this.rx.as_ref().unwrap().recv()) {
>              Ok(data) => Poll::Ready(Some(data)),
>              Err(_) => {
> -                let mut error = self.error.lock().unwrap();
> +                // Wait for archiver to finish
> +                proxmox_async::runtime::block_on(this.archiver_finished_notification.notified());
> +                // Never block for archiver finished notification on subsequent calls.
> +                // Eventual error will already have been propagated.
> +                this.finished = true;
> +
> +                let mut error = this.error.lock().unwrap();
>                  if let Some(err) = error.take() {
>                      return Poll::Ready(Some(Err(err)));
>                  }
> -- 
> 2.39.5
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel