[pbs-devel] [PATCH v3 proxmox-backup 38/58] upload stream: impl reused chunk injector
Fabian Grünbichler
f.gruenbichler at proxmox.com
Thu Apr 4 16:24:08 CEST 2024
On March 28, 2024 1:36 pm, Christian Ebner wrote:
> In order to be included in the backup's index file, reused payload
> chunks have to be injected into the payload upload stream.
>
> The chunker forces a chunk boundary and queues the list of chunks to
> be uploaded thereafter.
>
> This implements the logic to inject the chunks into the chunk upload
> stream after such a boundary is requested, by looping over the queued
> chunks and inserting them into the stream.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 2:
> - no changes
>
> pbs-client/src/inject_reused_chunks.rs | 152 +++++++++++++++++++++++++
> pbs-client/src/lib.rs | 1 +
> 2 files changed, 153 insertions(+)
> create mode 100644 pbs-client/src/inject_reused_chunks.rs
>
> diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
> new file mode 100644
> index 000000000..5cc19ce5d
> --- /dev/null
> +++ b/pbs-client/src/inject_reused_chunks.rs
> @@ -0,0 +1,152 @@
> +use std::collections::VecDeque;
> +use std::pin::Pin;
> +use std::sync::atomic::{AtomicUsize, Ordering};
> +use std::sync::{Arc, Mutex};
> +use std::task::{Context, Poll};
> +
> +use anyhow::{anyhow, Error};
> +use futures::{ready, Stream};
> +use pin_project_lite::pin_project;
> +
> +use crate::pxar::create::ReusableDynamicEntry;
> +
> +pin_project! {
> + pub struct InjectReusedChunksQueue<S> {
> + #[pin]
> + input: S,
> + current: Option<InjectChunks>,
> + buffer: Option<bytes::BytesMut>,
> + injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
> + stream_len: Arc<AtomicUsize>,
> + reused_len: Arc<AtomicUsize>,
> + index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
> + }
> +}
> +
> +#[derive(Debug)]
> +pub struct InjectChunks {
> + pub boundary: u64,
> + pub chunks: Vec<ReusableDynamicEntry>,
> + pub size: usize,
> +}
> +
> +pub enum InjectedChunksInfo {
> + Known(Vec<(u64, [u8; 32])>),
> + Raw((u64, bytes::BytesMut)),
these might benefit from a comment or type alias to explain what the
u64 value is/u64 values are..
> +}
> +
> +pub trait InjectReusedChunks: Sized {
> + fn inject_reused_chunks(
> + self,
> + injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
> + stream_len: Arc<AtomicUsize>,
> + reused_len: Arc<AtomicUsize>,
> + index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
this doesn't actually need to be an Option I think. it's always there
after all, and we just need the Arc<Mutex<_>> to ensure updates are
serialized. for the final `finish` call we can just use Arc::into_inner
(or Arc::try_unwrap) and Mutex::into_inner to get the owned Sha256 out
of it (there can't be any `update`s afterwards in any case).
> + ) -> InjectReusedChunksQueue<Self>;
> +}
> +
> +impl<S> InjectReusedChunks for S
> +where
> + S: Stream<Item = Result<bytes::BytesMut, Error>>,
> +{
> + fn inject_reused_chunks(
> + self,
> + injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
> + stream_len: Arc<AtomicUsize>,
> + reused_len: Arc<AtomicUsize>,
> + index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
> + ) -> InjectReusedChunksQueue<Self> {
> + InjectReusedChunksQueue {
> + input: self,
> + current: None,
> + injection_queue,
> + buffer: None,
> + stream_len,
> + reused_len,
> + index_csum,
> + }
> + }
> +}
> +
> +impl<S> Stream for InjectReusedChunksQueue<S>
> +where
> + S: Stream<Item = Result<bytes::BytesMut, Error>>,
> +{
> + type Item = Result<InjectedChunksInfo, Error>;
> +
> + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
and this fn here could use some comments as well IMHO ;) I hope I didn't
misunderstand anything and my suggestions below are correct..
> + let mut this = self.project();
> + loop {
> + let current = this.current.take();
> + if let Some(current) = current {
the take can be inlined here..
> + let mut chunks = Vec::new();
> + let mut guard = this.index_csum.lock().unwrap();
> + let csum = guard.as_mut().unwrap();
> +
> + for chunk in current.chunks {
> + let offset = this
> + .stream_len
> + .fetch_add(chunk.size() as usize, Ordering::SeqCst)
> + as u64;
> + this.reused_len
> + .fetch_add(chunk.size() as usize, Ordering::SeqCst);
> + let digest = chunk.digest();
> + chunks.push((offset, digest));
> + let end_offset = offset + chunk.size();
> + csum.update(&end_offset.to_le_bytes());
> + csum.update(&digest);
> + }
> + let chunk_info = InjectedChunksInfo::Known(chunks);
> + return Poll::Ready(Some(Ok(chunk_info)));
okay, so this part here takes care of accounting known chunks, updating
the index digest and passing them along
> + }
> +
> + let buffer = this.buffer.take();
> + if let Some(buffer) = buffer {
take can be inlined
> + let offset = this.stream_len.fetch_add(buffer.len(), Ordering::SeqCst) as u64;
> + let data = InjectedChunksInfo::Raw((offset, buffer));
> + return Poll::Ready(Some(Ok(data)));
this part here takes care of accounting for and passing along a new chunk
and its data, if it had to be buffered because injected chunks came
first..
> + }
> +
> + match ready!(this.input.as_mut().poll_next(cx)) {
> + None => return Poll::Ready(None),
that one's purpose is pretty clear - should we also check that there is
no more injection stuff in the queue here as that would mean something
fundamental went wrong?
> + Some(Err(err)) => return Poll::Ready(Some(Err(err))),
> + Some(Ok(raw)) => {
> + let chunk_size = raw.len();
> + let offset = this.stream_len.load(Ordering::SeqCst) as u64;
> + let mut injections = this.injection_queue.lock().unwrap();
> +
> + if let Some(inject) = injections.pop_front() {
> + if inject.boundary == offset {
if we do what I suggest below (X), then this branch here is the only one
touching current and buffer. that in turn means we can inline the
handling of current (dropping it altogether from
InjectReusedChunksQueue) and drop the `loop` and the `continue`.
> + if this.current.replace(inject).is_some() {
> + return Poll::Ready(Some(Err(anyhow!(
> + "replaced injection queue not empty"
> + ))));
> + }
> + if chunk_size > 0 && this.buffer.replace(raw).is_some() {
I guess this means chunks with size 0 are used in case everything is
re-used? or is the difference between here and below (where chunk_size 0
is a fatal error) accidental?
> + return Poll::Ready(Some(Err(anyhow!(
> + "replaced buffer not empty"
> + ))));
with all the other changes, this should be impossible to trigger.. then
again, it probably doesn't hurt as a safeguard either..
> + }
> + continue;
> + } else if inject.boundary == offset + chunk_size as u64 {
> + let _ = this.current.insert(inject);
X: since we add the chunk size to the offset below, this means that the next poll
ends up in the previous branch of the if (boundary == offset), even if
we remove this whole condition and branch (the injection then simply
takes the final else and is pushed back to the front of the queue).
> + } else if inject.boundary < offset + chunk_size as u64 {
> + return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))));
> + } else {
> + injections.push_front(inject);
I normally dislike this kind of code (pop - check - push), but I guess
here it doesn't hurt throughput too much since the rest of the things we
do is more expensive anyway..
> + }
> + }
> +
> + if chunk_size == 0 {
> + return Poll::Ready(Some(Err(anyhow!("unexpected empty raw data"))));
> + }
> +
> + let offset = this.stream_len.fetch_add(chunk_size, Ordering::SeqCst) as u64;
> + let data = InjectedChunksInfo::Raw((offset, raw));
> +
> + return Poll::Ready(Some(Ok(data)));
> + }
> + }
> + }
> + }
> +}
anyhow, here's a slightly simplified version of poll_next:
/// Yields the next item of the upload stream, giving queued injections of
/// reused chunks precedence over new raw data.
///
/// Each call proceeds in this order:
/// 1. If the front of the injection queue is due at the current stream offset,
///    account for its chunks (stream length, reused length, index checksum)
///    and yield them as `Known`.
/// 2. Otherwise, yield raw data that was held back in `buffer` while an
///    injection at the same offset was emitted.
/// 3. Otherwise, poll the input for new raw data.
///
/// Checking the queue *before* the buffer means two back-to-back injections
/// (the second starting where the first ended) and an injection due right
/// after the last raw chunk are both emitted instead of being skipped.
///
/// # Errors
/// Yields an error if an injection boundary was passed or falls inside a raw
/// chunk, if the input yields empty data while no injection is due, or if the
/// input ends while injections are still queued.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
    let mut this = self.project();
    loop {
        // Step 1: inject reused chunks whose boundary has been reached. The
        // guard is scoped so it is never held while the input is polled.
        {
            let mut injections = this.injection_queue.lock().unwrap();
            if let Some(inject) = injections.pop_front() {
                let offset = this.stream_len.load(Ordering::SeqCst) as u64;
                if inject.boundary == offset {
                    let mut chunks = Vec::with_capacity(inject.chunks.len());
                    let mut csum = this.index_csum.lock().unwrap();
                    for chunk in inject.chunks {
                        let offset = this
                            .stream_len
                            .fetch_add(chunk.size() as usize, Ordering::SeqCst)
                            as u64;
                        this.reused_len
                            .fetch_add(chunk.size() as usize, Ordering::SeqCst);
                        let digest = chunk.digest();
                        chunks.push((offset, digest));
                        // the index checksum covers (end offset, digest) per chunk
                        let end_offset = offset + chunk.size();
                        csum.update(&end_offset.to_le_bytes());
                        csum.update(&digest);
                    }
                    return Poll::Ready(Some(Ok(InjectedChunksInfo::Known(chunks))));
                } else if inject.boundary < offset {
                    return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))));
                }
                // boundary not reached yet, inject later
                injections.push_front(inject);
            }
        }

        // Step 2: pass along raw data held back while an injection was due.
        if let Some(buffer) = this.buffer.take() {
            let offset = this.stream_len.fetch_add(buffer.len(), Ordering::SeqCst) as u64;
            return Poll::Ready(Some(Ok(InjectedChunksInfo::Raw((offset, buffer)))));
        }

        // Step 3: nothing pending, poll the input for new data.
        match ready!(this.input.as_mut().poll_next(cx)) {
            None => {
                // leftover injections would silently be missing from the index
                let remaining = this.injection_queue.lock().unwrap().len();
                if remaining > 0 {
                    return Poll::Ready(Some(Err(anyhow!(
                        "injection queue not fully consumed: {} injection(s) remaining",
                        remaining
                    ))));
                }
                return Poll::Ready(None);
            }
            Some(Err(err)) => return Poll::Ready(Some(Err(err))),
            Some(Ok(raw)) => {
                let chunk_size = raw.len();
                let offset = this.stream_len.load(Ordering::SeqCst) as u64;
                {
                    let injections = this.injection_queue.lock().unwrap();
                    if let Some(inject) = injections.front() {
                        if inject.boundary == offset {
                            // An injection became due while polling the input:
                            // hold back the new data, step 1 injects first.
                            // The buffer is always empty here (step 2 drained
                            // it), this is only a safeguard.
                            if chunk_size > 0 && this.buffer.replace(raw).is_some() {
                                return Poll::Ready(Some(Err(anyhow!(
                                    "replaced buffer not empty"
                                ))));
                            }
                            continue;
                        } else if inject.boundary < offset + chunk_size as u64 {
                            // boundary would fall inside this raw chunk
                            return Poll::Ready(Some(Err(anyhow!(
                                "invalid injection boundary"
                            ))));
                        }
                    }
                }
                if chunk_size == 0 {
                    return Poll::Ready(Some(Err(anyhow!("unexpected empty raw data"))));
                }
                let offset = this.stream_len.fetch_add(chunk_size, Ordering::SeqCst) as u64;
                return Poll::Ready(Some(Ok(InjectedChunksInfo::Raw((offset, raw)))));
            }
        }
    }
}
this already folds in the change making index_csum no longer an Option,
so it requires a few adaptations in other parts as well.
but I'd like the following even better, since it allows us to get rid of
the buffer altogether:
/// Yields reused chunks queued for injection once the stream reaches their
/// boundary, otherwise passes along the next raw chunk from the input.
///
/// Every call first inspects the front of the injection queue:
/// - boundary equals the current stream offset: account for the reused chunks
///   (stream length, reused length, index checksum) and yield them as `Known`,
/// - boundary lies behind the current offset: injection and input did not line
///   up, yield an error,
/// - boundary lies ahead: keep it queued and poll the input instead.
///
/// # Errors
/// Besides the boundary mismatch above, yields an error for empty raw input
/// data and when the input ends while injections are still queued (those
/// chunks would otherwise silently be missing from the index).
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
    let mut this = self.project();

    // Check whether something is due for injection. The queue guard is scoped
    // to this block so it is released before the input is polled below: the
    // chunker producing `input` queues injections on this same queue, and
    // holding a (non-reentrant) std `Mutex` guard across that poll risks a
    // deadlock.
    {
        let mut injections = this.injection_queue.lock().unwrap();
        if let Some(inject) = injections.pop_front() {
            let offset = this.stream_len.load(Ordering::SeqCst) as u64;
            if inject.boundary == offset {
                // inject now
                let mut chunks = Vec::with_capacity(inject.chunks.len());
                let mut csum = this.index_csum.lock().unwrap();
                // account for injected chunks
                for chunk in inject.chunks {
                    let offset = this
                        .stream_len
                        .fetch_add(chunk.size() as usize, Ordering::SeqCst)
                        as u64;
                    this.reused_len
                        .fetch_add(chunk.size() as usize, Ordering::SeqCst);
                    let digest = chunk.digest();
                    chunks.push((offset, digest));
                    // the index checksum covers (end offset, digest) per chunk
                    let end_offset = offset + chunk.size();
                    csum.update(&end_offset.to_le_bytes());
                    csum.update(&digest);
                }
                return Poll::Ready(Some(Ok(InjectedChunksInfo::Known(chunks))));
            } else if inject.boundary < offset {
                // incoming new chunks and injections didn't line up
                return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))));
            }
            // inject later
            injections.push_front(inject);
        }
    }

    // nothing to inject now, let's see if there's further input
    match ready!(this.input.as_mut().poll_next(cx)) {
        None => {
            let remaining = this.injection_queue.lock().unwrap().len();
            if remaining > 0 {
                return Poll::Ready(Some(Err(anyhow!(
                    "injection queue not fully consumed: {} injection(s) remaining",
                    remaining
                ))));
            }
            Poll::Ready(None)
        }
        Some(Err(err)) => Poll::Ready(Some(Err(err))),
        Some(Ok(raw)) if raw.is_empty() => {
            Poll::Ready(Some(Err(anyhow!("unexpected empty raw data"))))
        }
        Some(Ok(raw)) => {
            let offset = this.stream_len.fetch_add(raw.len(), Ordering::SeqCst) as u64;
            Poll::Ready(Some(Ok(InjectedChunksInfo::Raw((offset, raw)))))
        }
    }
}
but technically all this accounting could move back to the backup_writer
as well, if the injected chunk info also contained the size..
> diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
> index 21cf8556b..3e7bd2a8b 100644
> --- a/pbs-client/src/lib.rs
> +++ b/pbs-client/src/lib.rs
> @@ -7,6 +7,7 @@ pub mod catalog_shell;
> pub mod pxar;
> pub mod tools;
>
> +mod inject_reused_chunks;
> mod merge_known_chunks;
> pub mod pipe_to_stream;
>
> --
> 2.39.2
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
>
More information about the pbs-devel
mailing list