[pbs-devel] [PATCH v8 proxmox-backup 41/69] upload stream: implement reused chunk injector
Fabian Grünbichler
f.gruenbichler at proxmox.com
Tue Jun 4 10:50:59 CEST 2024
On May 28, 2024 11:42 am, Christian Ebner wrote:
> In order to be included in the backup's index file, reused payload
> chunks have to be injected into the payload upload stream at a
> forced boundary. The chunker forces a chunk boundary and sends the
> list of reusable dynamic entries to be uploaded.
>
> This implements the logic to receive these dynamic entries via the
> corresponding communication channel from the chunker and inject the
> entries into the backup upload stream by looking for the matching
> chunk boundary, already forced by the chunker.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
>
> changes since version 6:
> - no changes
>
> pbs-client/src/inject_reused_chunks.rs | 129 +++++++++++++++++++++++++
> pbs-client/src/lib.rs | 1 +
> 2 files changed, 130 insertions(+)
> create mode 100644 pbs-client/src/inject_reused_chunks.rs
>
> diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
> new file mode 100644
> index 000000000..ed147f5fb
> --- /dev/null
> +++ b/pbs-client/src/inject_reused_chunks.rs
> @@ -0,0 +1,129 @@
> +use std::cmp;
> +use std::pin::Pin;
> +use std::sync::atomic::{AtomicUsize, Ordering};
> +use std::sync::{mpsc, Arc};
> +use std::task::{Context, Poll};
> +
> +use anyhow::{anyhow, Error};
> +use futures::{ready, Stream};
> +use pin_project_lite::pin_project;
> +
> +use crate::pxar::create::ReusableDynamicEntry;
> +
> +pin_project! {
> + pub struct InjectReusedChunksQueue<S> {
> + #[pin]
> + input: S,
> + next_injection: Option<InjectChunks>,
> + buffer: Option<bytes::BytesMut>,
we successfully eliminated this buffer I think ;)
> + injections: Option<mpsc::Receiver<InjectChunks>>,
> + stream_len: Arc<AtomicUsize>,
> + }
> +}
> +
> +type StreamOffset = u64;
> +#[derive(Debug)]
> +/// Holds a list of chunks to inject at the given boundary by forcing a chunk boundary.
> +pub struct InjectChunks {
> + /// Offset at which to force the boundary
> + pub boundary: StreamOffset,
> + /// List of chunks to inject
> + pub chunks: Vec<ReusableDynamicEntry>,
> + /// Cumulative size of the chunks in the list
> + pub size: usize,
> +}
> +
> +/// Variants for stream consumer to distinguish between raw data chunks and injected ones.
> +pub enum InjectedChunksInfo {
> + Known(Vec<ReusableDynamicEntry>),
> + Raw(bytes::BytesMut),
> +}
> +
> +pub trait InjectReusedChunks: Sized {
> + fn inject_reused_chunks(
> + self,
> + injections: Option<mpsc::Receiver<InjectChunks>>,
> + stream_len: Arc<AtomicUsize>,
> + ) -> InjectReusedChunksQueue<Self>;
> +}
> +
> +impl<S> InjectReusedChunks for S
> +where
> + S: Stream<Item = Result<bytes::BytesMut, Error>>,
> +{
> + fn inject_reused_chunks(
> + self,
> + injections: Option<mpsc::Receiver<InjectChunks>>,
> + stream_len: Arc<AtomicUsize>,
> + ) -> InjectReusedChunksQueue<Self> {
> + InjectReusedChunksQueue {
> + input: self,
> + next_injection: None,
> + injections,
> + buffer: None,
> + stream_len,
> + }
> + }
> +}
> +
> +impl<S> Stream for InjectReusedChunksQueue<S>
> +where
> + S: Stream<Item = Result<bytes::BytesMut, Error>>,
> +{
> + type Item = Result<InjectedChunksInfo, Error>;
> +
> + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
> + let mut this = self.project();
> +
> + // loop to skip over possible empty chunks
> + loop {
> + if this.next_injection.is_none() {
> + if let Some(injections) = this.injections.as_mut() {
> + if let Ok(injection) = injections.try_recv() {
> + *this.next_injection = Some(injection);
> + }
> + }
> + }
> +
> + if let Some(inject) = this.next_injection.take() {
> + // got reusable dynamic entries to inject
> + let offset = this.stream_len.load(Ordering::SeqCst) as u64;
> +
> + match inject.boundary.cmp(&offset) {
> + // inject now
> + cmp::Ordering::Equal => {
> + let chunk_info = InjectedChunksInfo::Known(inject.chunks);
> + return Poll::Ready(Some(Ok(chunk_info)));
> + }
> + // inject later
> + cmp::Ordering::Greater => *this.next_injection = Some(inject),
> + // incoming new chunks and injections didn't line up?
> + cmp::Ordering::Less => {
> + return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))))
> + }
> + }
> + }
> +
> + // nothing to inject now, await further input
> + match ready!(this.input.as_mut().poll_next(cx)) {
> + None => {
> + if let Some(injections) = this.injections.as_mut() {
> + if this.next_injection.is_some() || injections.try_recv().is_ok() {
> + // stream finished, but remaining dynamic entries to inject
> + return Poll::Ready(Some(Err(anyhow!(
> + "injection queue not fully consumed"
> + ))));
> + }
> + }
> + // stream finished and all dynamic entries already injected
> + return Poll::Ready(None);
> + }
> + Some(Err(err)) => return Poll::Ready(Some(Err(err))),
> + // ignore empty chunks, injected chunks from queue at forced boundary, but boundary
> + // did not require splitting of the raw stream buffer to force the boundary
> + Some(Ok(raw)) if raw.is_empty() => continue,
> + Some(Ok(raw)) => return Poll::Ready(Some(Ok(InjectedChunksInfo::Raw(raw)))),
> + }
> + }
> + }
> +}
> diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
> index 21cf8556b..3e7bd2a8b 100644
> --- a/pbs-client/src/lib.rs
> +++ b/pbs-client/src/lib.rs
> @@ -7,6 +7,7 @@ pub mod catalog_shell;
> pub mod pxar;
> pub mod tools;
>
> +mod inject_reused_chunks;
> mod merge_known_chunks;
> pub mod pipe_to_stream;
>
> --
> 2.39.2
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
>
More information about the pbs-devel
mailing list