[pbs-devel] [RFC proxmox-backup 25/36] upload stream: impl reused chunk injector
Christian Ebner
c.ebner at proxmox.com
Wed Feb 28 15:02:15 CET 2024
In order to be included in the backup's index file, reused payload
chunks have to be injected into the payload upload stream.
The chunker forces a chunk boundary and queues the list of chunks to
be uploaded thereafter.
This implements the logic to inject the chunks into the chunk upload
stream after such a boundary is requested, by looping over the queued
chunks and inserting them into the stream.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
pbs-client/src/inject_reused_chunks.rs | 152 +++++++++++++++++++++++++
pbs-client/src/lib.rs | 1 +
2 files changed, 153 insertions(+)
create mode 100644 pbs-client/src/inject_reused_chunks.rs
diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
new file mode 100644
index 00000000..7c0f7780
--- /dev/null
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -0,0 +1,152 @@
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Error};
+use futures::{ready, Stream};
+use pin_project_lite::pin_project;
+
+use pbs_datastore::dynamic_index::AppendableDynamicEntry;
+
+pin_project! {
+ pub struct InjectReusedChunksQueue<S> {
+ #[pin]
+ input: S,
+ current: Option<InjectChunks>,
+ buffer: Option<bytes::BytesMut>,
+ injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+ stream_len: Arc<AtomicUsize>,
+ reused_len: Arc<AtomicUsize>,
+ index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+ }
+}
+
+#[derive(Debug)]
+pub struct InjectChunks {
+ pub boundary: u64,
+ pub chunks: Vec<AppendableDynamicEntry>,
+ pub size: usize,
+}
+
+pub enum InjectedChunksInfo {
+ Known(Vec<(u64, [u8; 32])>),
+ Raw((u64, bytes::BytesMut)),
+}
+
+pub trait InjectReusedChunks: Sized {
+ fn inject_reused_chunks(
+ self,
+ injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+ stream_len: Arc<AtomicUsize>,
+ reused_len: Arc<AtomicUsize>,
+ index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+ ) -> InjectReusedChunksQueue<Self>;
+}
+
+impl<S> InjectReusedChunks for S
+where
+ S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+ fn inject_reused_chunks(
+ self,
+ injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+ stream_len: Arc<AtomicUsize>,
+ reused_len: Arc<AtomicUsize>,
+ index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+ ) -> InjectReusedChunksQueue<Self> {
+ InjectReusedChunksQueue {
+ input: self,
+ current: None,
+ injection_queue,
+ buffer: None,
+ stream_len,
+ reused_len,
+ index_csum,
+ }
+ }
+}
+
+impl<S> Stream for InjectReusedChunksQueue<S>
+where
+ S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+ type Item = Result<InjectedChunksInfo, Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+ loop {
+ let current = this.current.take();
+ if let Some(current) = current {
+ let mut chunks = Vec::new();
+ let mut guard = this.index_csum.lock().unwrap();
+ let csum = guard.as_mut().unwrap();
+
+ for chunk in current.chunks {
+ let offset = this
+ .stream_len
+ .fetch_add(chunk.size() as usize, Ordering::SeqCst)
+ as u64;
+ this.reused_len
+ .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+ let digest = chunk.digest();
+ chunks.push((offset, digest));
+ let end_offset = offset + chunk.size();
+ csum.update(&end_offset.to_le_bytes());
+ csum.update(&digest);
+ }
+ let chunk_info = InjectedChunksInfo::Known(chunks);
+ return Poll::Ready(Some(Ok(chunk_info)));
+ }
+
+ let buffer = this.buffer.take();
+ if let Some(buffer) = buffer {
+ let offset = this.stream_len.fetch_add(buffer.len(), Ordering::SeqCst) as u64;
+ let data = InjectedChunksInfo::Raw((offset, buffer));
+ return Poll::Ready(Some(Ok(data)));
+ }
+
+ match ready!(this.input.as_mut().poll_next(cx)) {
+ None => return Poll::Ready(None),
+ Some(Err(err)) => return Poll::Ready(Some(Err(err))),
+ Some(Ok(raw)) => {
+ let chunk_size = raw.len();
+ let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+ let mut injections = this.injection_queue.lock().unwrap();
+
+ if let Some(inject) = injections.pop_front() {
+ if inject.boundary == offset {
+ if this.current.replace(inject).is_some() {
+ return Poll::Ready(Some(Err(anyhow!(
+ "replaced injection queue not empty"
+ ))));
+ }
+ if chunk_size > 0 && this.buffer.replace(raw).is_some() {
+ return Poll::Ready(Some(Err(anyhow!(
+ "replaced buffer not empty"
+ ))));
+ }
+ continue;
+ } else if inject.boundary == offset + chunk_size as u64 {
+ let _ = this.current.insert(inject);
+ } else if inject.boundary < offset + chunk_size as u64 {
+ return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))));
+ } else {
+ injections.push_front(inject);
+ }
+ }
+
+ if chunk_size == 0 {
+ return Poll::Ready(Some(Err(anyhow!("unexpected empty raw data"))));
+ }
+
+ let offset = this.stream_len.fetch_add(chunk_size, Ordering::SeqCst) as u64;
+ let data = InjectedChunksInfo::Raw((offset, raw));
+
+ return Poll::Ready(Some(Ok(data)));
+ }
+ }
+ }
+ }
+}
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index 21cf8556..3e7bd2a8 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -7,6 +7,7 @@ pub mod catalog_shell;
pub mod pxar;
pub mod tools;
+mod inject_reused_chunks;
mod merge_known_chunks;
pub mod pipe_to_stream;
--
2.39.2
More information about the pbs-devel mailing list