[pbs-devel] applied: [PATCH v5 proxmox-backup] client: pxar: fix race in pxar backup stream
Fabian Grünbichler
f.gruenbichler at proxmox.com
Tue Feb 11 12:30:43 CET 2025
On January 24, 2025 1:46 pm, Christian Ebner wrote:
> Fixes a race condition where the backup upload stream can miss an
> error returned by pxar::create_archive, because the error state is
> only set after the backup stream was already polled.
>
> On instantiation, `PxarBackupStream` spawns a future handling the
> pxar archive creation, which sends the encoded pxar archive stream
> (or streams in case of split archives) through a channel, received
> by the pxar backup stream on polling.
>
> If this channel is closed, which the receiver signals by returning an
> error, the poll logic propagates any error that occurred during pxar
> creation by taking it from the `PxarBackupStream`.
>
> As this error might not have been set yet, this can lead to
> incorrectly terminating a backup snapshot with success, even though an
> error occurred.
>
> To fix this, introduce a dedicated notifier for each stream instance
> and wait for the archiver to signal it has finished via this
> notification channel. In addition, extend the `PxarBackupStream` by a
> `finished` flag to allow early return on subsequent polls, which
> would otherwise block, waiting for a new notification.
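The race and the fix described above can be reproduced outside of proxmox-backup with a minimal sketch. It is not the patched code: it uses only the Rust standard library, swaps the async task for a thread and `tokio::sync::Notify` for a second `mpsc` channel, and the names `upload`, `done_tx` and `done_rx` are hypothetical. The sleep only widens the window between the data channel closing and the error being stored.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the upload side: drains the data channel and
/// then reports either the collected data or the archiver's error.
fn upload(wait_for_archiver: bool) -> Result<Vec<u8>, String> {
    let (data_tx, data_rx) = mpsc::channel::<u8>();
    // Assumption: a second channel plays the role of the finish notification.
    let (done_tx, done_rx) = mpsc::channel::<()>();
    let error = Arc::new(Mutex::new(None::<String>));
    let error2 = Arc::clone(&error);

    // Stand-in for the archiver: the data channel closes *before* the
    // error is stored, which is the window the race lives in.
    let archiver = thread::spawn(move || {
        data_tx.send(42).unwrap();
        drop(data_tx);
        thread::sleep(Duration::from_millis(50));
        *error2.lock().unwrap() = Some("archiver failed".to_string());
        // Signal completion, with or without error.
        let _ = done_tx.send(());
    });

    let mut data = Vec::new();
    while let Ok(byte) = data_rx.recv() {
        data.push(byte);
    }

    // The fix: do not look at the error slot before the archiver has
    // signalled that it is done.
    if wait_for_archiver {
        done_rx.recv().unwrap();
    }

    let err = error.lock().unwrap().take();
    archiver.join().unwrap();
    match err {
        Some(e) => Err(e),
        None => Ok(data),
    }
}
```

Calling `upload(true)` always returns the archiver error. Calling `upload(false)` checks the error slot as soon as the data channel closes and will typically return `Ok`, which corresponds to the snapshot being finished as successful despite the failure.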
>
> In case of premature termination of the pxar backup stream, no
> additional measures have to be taken, as the abort handle already
> terminates the archive creation.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> Changes since version 4 (thanks a lot Fabian for comments!):
> - Drop unneeded shared state, only store error
> - Use dedicated notification channel instances per stream, so each
> channel gets notified for sure.
> - Use `finished` flag per `PxarBackupStream` instance, allowing an early
> return if the archiver finish has already been seen, which avoids
> blocking again while awaiting a notification.
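Why the `finished` flag is needed can be illustrated with a small std-only sketch. `notify_one()` on a `tokio::sync::Notify` hands out a single permit, so a second wait after the first one has consumed it would never return. The `Permit` type below is a hypothetical imitation of that single-permit behaviour built from `Mutex` and `Condvar`, and `DemoStream` and `demo_stream` are made-up names with an `Iterator` standing in for the `Stream` implementation. None of this is the actual patch.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical single-permit notifier (stand-in for `tokio::sync::Notify`).
#[derive(Default)]
struct Permit {
    available: Mutex<bool>,
    cond: Condvar,
}

impl Permit {
    /// Stores one permit and wakes a waiter, if any.
    fn notify_one(&self) {
        *self.available.lock().unwrap() = true;
        self.cond.notify_one();
    }

    /// Blocks until a permit is available, then consumes it.
    fn wait(&self) {
        let mut available = self.available.lock().unwrap();
        while !*available {
            available = self.cond.wait(available).unwrap();
        }
        *available = false;
    }
}

struct DemoStream {
    rx: Receiver<u8>,
    error: Arc<Mutex<Option<String>>>,
    archiver_finished: Arc<Permit>,
    finished: bool,
}

impl Iterator for DemoStream {
    type Item = Result<u8, String>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            // The only permit is already consumed; waiting again would hang.
            return None;
        }
        match self.rx.recv() {
            Ok(byte) => Some(Ok(byte)),
            Err(_) => {
                // Channel closed: wait for the archiver before reading the error.
                self.archiver_finished.wait();
                self.finished = true;
                let err = self.error.lock().unwrap().take();
                err.map(Err)
            }
        }
    }
}

/// Builds a stream whose producer sends one byte and then fails.
fn demo_stream() -> DemoStream {
    let (tx, rx) = mpsc::channel();
    let error = Arc::new(Mutex::new(None));
    let archiver_finished = Arc::new(Permit::default());
    let (error2, notifier) = (Arc::clone(&error), Arc::clone(&archiver_finished));
    thread::spawn(move || {
        tx.send(1u8).unwrap();
        drop(tx);
        *error2.lock().unwrap() = Some("archiver failed".to_string());
        notifier.notify_one();
    });
    DemoStream { rx, error, archiver_finished, finished: false }
}
```

Polling such a stream past its end keeps returning `None` immediately. Without the `finished` check, the third call to `next()` in this sketch would block forever in `wait()`, since the producer only ever hands out one permit.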
>
> pbs-client/src/pxar_backup_stream.rs | 34 +++++++++++++++++++++++++---
> 1 file changed, 31 insertions(+), 3 deletions(-)
>
> diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
> index 2bfb5cf29..1303e8503 100644
> --- a/pbs-client/src/pxar_backup_stream.rs
> +++ b/pbs-client/src/pxar_backup_stream.rs
> @@ -11,6 +11,7 @@ use futures::stream::Stream;
> use nix::dir::Dir;
> use nix::fcntl::OFlag;
> use nix::sys::stat::Mode;
> +use tokio::sync::Notify;
>
> use proxmox_async::blocking::TokioWriterAdapter;
> use proxmox_io::StdChannelWriter;
> @@ -31,6 +32,8 @@ pub struct PxarBackupStream {
> pub suggested_boundaries: Option<std::sync::mpsc::Receiver<u64>>,
> handle: Option<AbortHandle>,
> error: Arc<Mutex<Option<Error>>>,
> + finished: bool,
> + archiver_finished_notification: Arc<Notify>,
> }
>
> impl Drop for PxarBackupStream {
> @@ -80,6 +83,10 @@ impl PxarBackupStream {
>
> let error = Arc::new(Mutex::new(None));
> let error2 = Arc::clone(&error);
> + let stream_notifier = Arc::new(Notify::new());
> + let stream_notification_receiver = stream_notifier.clone();
> + let payload_stream_notifier = Arc::new(Notify::new());
> + let payload_stream_notification_receiver = payload_stream_notifier.clone();
> let handler = async move {
> if let Err(err) = crate::pxar::create_archive(
> dir,
> @@ -101,6 +108,10 @@ impl PxarBackupStream {
> let mut error = error2.lock().unwrap();
> *error = Some(err);
> }
> +
> + // Notify upload streams that archiver is finished (with or without error)
> + stream_notifier.notify_one();
> + payload_stream_notifier.notify_one();
> };
>
> let (handle, registration) = AbortHandle::new_pair();
> @@ -112,6 +123,8 @@ impl PxarBackupStream {
> suggested_boundaries: None,
> handle: Some(handle.clone()),
> error: Arc::clone(&error),
> + finished: false,
> + archiver_finished_notification: stream_notification_receiver,
> };
>
> let backup_payload_stream = payload_rx.map(|rx| Self {
> @@ -119,6 +132,8 @@ impl PxarBackupStream {
> suggested_boundaries: suggested_boundaries_rx,
> handle: Some(handle),
> error,
> + finished: false,
> + archiver_finished_notification: payload_stream_notification_receiver,
> });
>
> Ok((backup_stream, backup_payload_stream))
> @@ -141,18 +156,31 @@ impl Stream for PxarBackupStream {
> type Item = Result<Vec<u8>, Error>;
>
> fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
> + let this = self.get_mut();
> + if this.finished {
> + // Channel has already been finished and eventual errors propagated,
> + // early return to avoid blocking on further archiver finished notifications
> + // by subsequent polls.
> + return Poll::Ready(None);
> + }
> {
> // limit lock scope
> - let mut error = self.error.lock().unwrap();
> + let mut error = this.error.lock().unwrap();
> if let Some(err) = error.take() {
> return Poll::Ready(Some(Err(err)));
> }
> }
>
> - match proxmox_async::runtime::block_in_place(|| self.rx.as_ref().unwrap().recv()) {
> + match proxmox_async::runtime::block_in_place(|| this.rx.as_ref().unwrap().recv()) {
> Ok(data) => Poll::Ready(Some(data)),
> Err(_) => {
> - let mut error = self.error.lock().unwrap();
> + // Wait for archiver to finish
> + proxmox_async::runtime::block_on(this.archiver_finished_notification.notified());
> + // Never block for archiver finished notification on subsequent calls.
> + // Eventual error will already have been propagated.
> + this.finished = true;
> +
> + let mut error = this.error.lock().unwrap();
> if let Some(err) = error.take() {
> return Poll::Ready(Some(Err(err)));
> }
> --
> 2.39.5