[pbs-devel] [PATCH v6 proxmox-backup 17/29] fix #3174: upload stream: impl reused chunk injector

Christian Ebner c.ebner at proxmox.com
Thu Jan 25 14:25:56 CET 2024


In order to be included in the backups index file, the reused chunks
which store the payload of skipped files during pxar encoding have to be
inserted after the encoder has written the pxar appendix entry type.

The chunker forces a chunk boundary after this marker and queues the
list of chunks to be uploaded thereafter.
This implements the logic to inject the chunks into the chunk upload
stream after such a boundary is requested, by looping over the queued
chunks and inserting them into the stream.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
Changes since v5:
- refator to use AppendableDynamicEntry
- fix formatting via `cargo fmt`

 pbs-client/src/inject_reused_chunks.rs | 152 +++++++++++++++++++++++++
 pbs-client/src/lib.rs                  |   1 +
 2 files changed, 153 insertions(+)
 create mode 100644 pbs-client/src/inject_reused_chunks.rs

diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
new file mode 100644
index 00000000..7c0f7780
--- /dev/null
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -0,0 +1,152 @@
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Error};
+use futures::{ready, Stream};
+use pin_project_lite::pin_project;
+
+use pbs_datastore::dynamic_index::AppendableDynamicEntry;
+
+pin_project! {
+    pub struct InjectReusedChunksQueue<S> {
+        #[pin]
+        input: S,
+        current: Option<InjectChunks>,
+        buffer: Option<bytes::BytesMut>,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    }
+}
+
+#[derive(Debug)]
+pub struct InjectChunks {
+    pub boundary: u64,
+    pub chunks: Vec<AppendableDynamicEntry>,
+    pub size: usize,
+}
+
+pub enum InjectedChunksInfo {
+    Known(Vec<(u64, [u8; 32])>),
+    Raw((u64, bytes::BytesMut)),
+}
+
+pub trait InjectReusedChunks: Sized {
+    fn inject_reused_chunks(
+        self,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    ) -> InjectReusedChunksQueue<Self>;
+}
+
+impl<S> InjectReusedChunks for S
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    fn inject_reused_chunks(
+        self,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    ) -> InjectReusedChunksQueue<Self> {
+        InjectReusedChunksQueue {
+            input: self,
+            current: None,
+            injection_queue,
+            buffer: None,
+            stream_len,
+            reused_len,
+            index_csum,
+        }
+    }
+}
+
+impl<S> Stream for InjectReusedChunksQueue<S>
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    type Item = Result<InjectedChunksInfo, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        loop {
+            let current = this.current.take();
+            if let Some(current) = current {
+                let mut chunks = Vec::new();
+                let mut guard = this.index_csum.lock().unwrap();
+                let csum = guard.as_mut().unwrap();
+
+                for chunk in current.chunks {
+                    let offset = this
+                        .stream_len
+                        .fetch_add(chunk.size() as usize, Ordering::SeqCst)
+                        as u64;
+                    this.reused_len
+                        .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+                    let digest = chunk.digest();
+                    chunks.push((offset, digest));
+                    let end_offset = offset + chunk.size();
+                    csum.update(&end_offset.to_le_bytes());
+                    csum.update(&digest);
+                }
+                let chunk_info = InjectedChunksInfo::Known(chunks);
+                return Poll::Ready(Some(Ok(chunk_info)));
+            }
+
+            let buffer = this.buffer.take();
+            if let Some(buffer) = buffer {
+                let offset = this.stream_len.fetch_add(buffer.len(), Ordering::SeqCst) as u64;
+                let data = InjectedChunksInfo::Raw((offset, buffer));
+                return Poll::Ready(Some(Ok(data)));
+            }
+
+            match ready!(this.input.as_mut().poll_next(cx)) {
+                None => return Poll::Ready(None),
+                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
+                Some(Ok(raw)) => {
+                    let chunk_size = raw.len();
+                    let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+                    let mut injections = this.injection_queue.lock().unwrap();
+
+                    if let Some(inject) = injections.pop_front() {
+                        if inject.boundary == offset {
+                            if this.current.replace(inject).is_some() {
+                                return Poll::Ready(Some(Err(anyhow!(
+                                    "replaced injection queue not empty"
+                                ))));
+                            }
+                            if chunk_size > 0 && this.buffer.replace(raw).is_some() {
+                                return Poll::Ready(Some(Err(anyhow!(
+                                    "replaced buffer not empty"
+                                ))));
+                            }
+                            continue;
+                        } else if inject.boundary == offset + chunk_size as u64 {
+                            let _ = this.current.insert(inject);
+                        } else if inject.boundary < offset + chunk_size as u64 {
+                            return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))));
+                        } else {
+                            injections.push_front(inject);
+                        }
+                    }
+
+                    if chunk_size == 0 {
+                        return Poll::Ready(Some(Err(anyhow!("unexpected empty raw data"))));
+                    }
+
+                    let offset = this.stream_len.fetch_add(chunk_size, Ordering::SeqCst) as u64;
+                    let data = InjectedChunksInfo::Raw((offset, raw));
+
+                    return Poll::Ready(Some(Ok(data)));
+                }
+            }
+        }
+    }
+}
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index 21cf8556..3e7bd2a8 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -7,6 +7,7 @@ pub mod catalog_shell;
 pub mod pxar;
 pub mod tools;
 
+mod inject_reused_chunks;
 mod merge_known_chunks;
 pub mod pipe_to_stream;
 
-- 
2.39.2





More information about the pbs-devel mailing list