[pbs-devel] [PATCH v8 proxmox-backup 41/69] upload stream: implement reused chunk injector

Fabian Grünbichler f.gruenbichler at proxmox.com
Tue Jun 4 10:50:59 CEST 2024


On May 28, 2024 11:42 am, Christian Ebner wrote:
> In order to be included in the backups index file, reused payload
> chunks have to be injected into the payload upload stream at a
> forced boundary. The chunker forces a chunk boundary and sends the
> list of reusable dynamic entries to be uploaded.
> 
> This implements the logic to receive these dynamic entries via the
> corresponding communication channel from the chunker and inject the
> entries into the backup upload stream by looking for the matching
> chunk boundary, already forced by the chunker.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
> 
> changes since version 6:
> - no changes
> 
>  pbs-client/src/inject_reused_chunks.rs | 129 +++++++++++++++++++++++++
>  pbs-client/src/lib.rs                  |   1 +
>  2 files changed, 130 insertions(+)
>  create mode 100644 pbs-client/src/inject_reused_chunks.rs
> 
> diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
> new file mode 100644
> index 000000000..ed147f5fb
> --- /dev/null
> +++ b/pbs-client/src/inject_reused_chunks.rs
> @@ -0,0 +1,129 @@
> +use std::cmp;
> +use std::pin::Pin;
> +use std::sync::atomic::{AtomicUsize, Ordering};
> +use std::sync::{mpsc, Arc};
> +use std::task::{Context, Poll};
> +
> +use anyhow::{anyhow, Error};
> +use futures::{ready, Stream};
> +use pin_project_lite::pin_project;
> +
> +use crate::pxar::create::ReusableDynamicEntry;
> +
> +pin_project! {
> +    pub struct InjectReusedChunksQueue<S> {
> +        #[pin]
> +        input: S,
> +        next_injection: Option<InjectChunks>,
> +        buffer: Option<bytes::BytesMut>,

we successfully eliminated this buffer I think ;)

> +        injections: Option<mpsc::Receiver<InjectChunks>>,
> +        stream_len: Arc<AtomicUsize>,
> +    }
> +}
> +
> +type StreamOffset = u64;
> +#[derive(Debug)]
> +/// Holds a list of chunks to inject at the given boundary by forcing a chunk boundary.
> +pub struct InjectChunks {
> +    /// Offset at which to force the boundary
> +    pub boundary: StreamOffset,
> +    /// List of chunks to inject
> +    pub chunks: Vec<ReusableDynamicEntry>,
> +    /// Cumulative size of the chunks in the list
> +    pub size: usize,
> +}
> +
> +/// Variants for stream consumer to distinguish between raw data chunks and injected ones.
> +pub enum InjectedChunksInfo {
> +    Known(Vec<ReusableDynamicEntry>),
> +    Raw(bytes::BytesMut),
> +}
> +
> +pub trait InjectReusedChunks: Sized {
> +    fn inject_reused_chunks(
> +        self,
> +        injections: Option<mpsc::Receiver<InjectChunks>>,
> +        stream_len: Arc<AtomicUsize>,
> +    ) -> InjectReusedChunksQueue<Self>;
> +}
> +
> +impl<S> InjectReusedChunks for S
> +where
> +    S: Stream<Item = Result<bytes::BytesMut, Error>>,
> +{
> +    fn inject_reused_chunks(
> +        self,
> +        injections: Option<mpsc::Receiver<InjectChunks>>,
> +        stream_len: Arc<AtomicUsize>,
> +    ) -> InjectReusedChunksQueue<Self> {
> +        InjectReusedChunksQueue {
> +            input: self,
> +            next_injection: None,
> +            injections,
> +            buffer: None,
> +            stream_len,
> +        }
> +    }
> +}
> +
> +impl<S> Stream for InjectReusedChunksQueue<S>
> +where
> +    S: Stream<Item = Result<bytes::BytesMut, Error>>,
> +{
> +    type Item = Result<InjectedChunksInfo, Error>;
> +
> +    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
> +        let mut this = self.project();
> +
> +        // loop to skip over possible empty chunks
> +        loop {
> +            if this.next_injection.is_none() {
> +                if let Some(injections) = this.injections.as_mut() {
> +                    if let Ok(injection) = injections.try_recv() {
> +                        *this.next_injection = Some(injection);
> +                    }
> +                }
> +            }
> +
> +            if let Some(inject) = this.next_injection.take() {
> +                // got reusable dynamic entries to inject
> +                let offset = this.stream_len.load(Ordering::SeqCst) as u64;
> +
> +                match inject.boundary.cmp(&offset) {
> +                    // inject now
> +                    cmp::Ordering::Equal => {
> +                        let chunk_info = InjectedChunksInfo::Known(inject.chunks);
> +                        return Poll::Ready(Some(Ok(chunk_info)));
> +                    }
> +                    // inject later
> +                    cmp::Ordering::Greater => *this.next_injection = Some(inject),
> +                    // incoming new chunks and injections didn't line up?
> +                    cmp::Ordering::Less => {
> +                        return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))))
> +                    }
> +                }
> +            }
> +
> +            // nothing to inject now, await further input
> +            match ready!(this.input.as_mut().poll_next(cx)) {
> +                None => {
> +                    if let Some(injections) = this.injections.as_mut() {
> +                        if this.next_injection.is_some() || injections.try_recv().is_ok() {
> +                            // stream finished, but remaining dynamic entries to inject
> +                            return Poll::Ready(Some(Err(anyhow!(
> +                                "injection queue not fully consumed"
> +                            ))));
> +                        }
> +                    }
> +                    // stream finished and all dynamic entries already injected
> +                    return Poll::Ready(None);
> +                }
> +                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
> +                // ignore empty chunks, injected chunks from queue at forced boundary, but boundary
> +                // did not require splitting of the raw stream buffer to force the boundary
> +                Some(Ok(raw)) if raw.is_empty() => continue,
> +                Some(Ok(raw)) => return Poll::Ready(Some(Ok(InjectedChunksInfo::Raw(raw)))),
> +            }
> +        }
> +    }
> +}
> diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
> index 21cf8556b..3e7bd2a8b 100644
> --- a/pbs-client/src/lib.rs
> +++ b/pbs-client/src/lib.rs
> @@ -7,6 +7,7 @@ pub mod catalog_shell;
>  pub mod pxar;
>  pub mod tools;
>  
> +mod inject_reused_chunks;
>  mod merge_known_chunks;
>  pub mod pipe_to_stream;
>  
> -- 
> 2.39.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




More information about the pbs-devel mailing list