[pbs-devel] [RFC v2 proxmox-backup 26/36] client: chunk stream: add chunk injection queues
Fabian Grünbichler
f.gruenbichler at proxmox.com
Tue Mar 12 10:46:13 CET 2024
On March 5, 2024 10:26 am, Christian Ebner wrote:
> Adds a queue to the chunk stream to request forced boundaries at a
> given offset within the stream and inject reused chunks after this
> boundary.
>
> The chunks are then passed along to the uploader stream using the
> injection queue, which inserts them during upload.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
I think this patch would benefit from a few more Option<..> wrappers
(to make it clear where injection can actually happen), and possibly
also from combining some of the parameters into structs (to reduce the
number of parameters and to group those that are only set/needed for
injection/caching/..).

I haven't tested the changes proposed below, but AFAICT they should
work.
> ---
> changes since version 1:
> - refactor bail on non-existing payload target archive name
>
> examples/test_chunk_speed2.rs | 10 ++-
> pbs-client/src/backup_writer.rs | 89 +++++++++++--------
> pbs-client/src/chunk_stream.rs | 42 ++++++++-
> pbs-client/src/pxar/create.rs | 6 +-
> pbs-client/src/pxar_backup_stream.rs | 8 +-
> proxmox-backup-client/src/main.rs | 28 ++++--
> .../src/proxmox_restore_daemon/api.rs | 3 +
> pxar-bin/src/main.rs | 5 +-
> tests/catar.rs | 3 +
> 9 files changed, 147 insertions(+), 47 deletions(-)
>
> diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
> index 3f69b436..b20a5b59 100644
> --- a/examples/test_chunk_speed2.rs
> +++ b/examples/test_chunk_speed2.rs
> @@ -1,3 +1,6 @@
> +use std::collections::VecDeque;
> +use std::sync::{Arc, Mutex};
> +
> use anyhow::Error;
> use futures::*;
>
> @@ -26,7 +29,12 @@ async fn run() -> Result<(), Error> {
> .map_err(Error::from);
>
> //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
> - let mut chunk_stream = ChunkStream::new(stream, None);
> + let mut chunk_stream = ChunkStream::new(
> + stream,
> + None,
> + Arc::new(Mutex::new(VecDeque::new())),
> + Arc::new(Mutex::new(VecDeque::new())),
> + );
>
> let start_time = std::time::Instant::now();
>
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index 8a03d8ea..e66b93df 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -1,4 +1,4 @@
> -use std::collections::HashSet;
> +use std::collections::{HashSet, VecDeque};
> use std::future::Future;
> use std::os::unix::fs::OpenOptionsExt;
> use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
> @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
>
> use proxmox_human_byte::HumanByte;
>
> +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
> use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
>
> use super::{H2Client, HttpClient};
> @@ -265,6 +266,7 @@ impl BackupWriter {
> archive_name: &str,
> stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
> options: UploadOptions,
> + injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
this one is already properly optional :)
> ) -> Result<BackupStats, Error> {
> let known_chunks = Arc::new(Mutex::new(HashSet::new()));
>
> @@ -341,6 +343,7 @@ impl BackupWriter {
> None
> },
> options.compress,
> + injection_queue,
> )
> .await?;
>
> @@ -637,6 +640,7 @@ impl BackupWriter {
> known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> crypt_config: Option<Arc<CryptConfig>>,
> compress: bool,
> + injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
> ) -> impl Future<Output = Result<UploadStats, Error>> {
> let total_chunks = Arc::new(AtomicUsize::new(0));
> let total_chunks2 = total_chunks.clone();
> @@ -663,48 +667,63 @@ impl BackupWriter {
> let index_csum_2 = index_csum.clone();
>
> stream
> - .and_then(move |data| {
> - let chunk_len = data.len();
> + .inject_reused_chunks(
> + injection_queue.unwrap_or_default(),
> + stream_len,
> + reused_len.clone(),
> + index_csum.clone(),
> + )
> + .and_then(move |chunk_info| match chunk_info {
> + InjectedChunksInfo::Known(chunks) => {
> + total_chunks.fetch_add(chunks.len(), Ordering::SeqCst);
> + future::ok(MergedChunkInfo::Known(chunks))
> + }
> + InjectedChunksInfo::Raw((offset, data)) => {
> + let chunk_len = data.len();
>
> - total_chunks.fetch_add(1, Ordering::SeqCst);
> - let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
this housekeeping is now split between here and inject_reused_chunks,
which makes it a bit hard to follow.
> + total_chunks.fetch_add(1, Ordering::SeqCst);
>
> - let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
> + let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
>
> - if let Some(ref crypt_config) = crypt_config {
> - chunk_builder = chunk_builder.crypt_config(crypt_config);
> - }
> + if let Some(ref crypt_config) = crypt_config {
> + chunk_builder = chunk_builder.crypt_config(crypt_config);
> + }
>
> - let mut known_chunks = known_chunks.lock().unwrap();
> - let digest = chunk_builder.digest();
> + let mut known_chunks = known_chunks.lock().unwrap();
>
> - let mut guard = index_csum.lock().unwrap();
> - let csum = guard.as_mut().unwrap();
> + let digest = chunk_builder.digest();
>
> - let chunk_end = offset + chunk_len as u64;
> + let mut guard = index_csum.lock().unwrap();
> + let csum = guard.as_mut().unwrap();
>
> - if !is_fixed_chunk_size {
> - csum.update(&chunk_end.to_le_bytes());
> - }
> - csum.update(digest);
> + let chunk_end = offset + chunk_len as u64;
>
> - let chunk_is_known = known_chunks.contains(digest);
> - if chunk_is_known {
> - known_chunk_count.fetch_add(1, Ordering::SeqCst);
> - reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> - future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
> - } else {
> - let compressed_stream_len2 = compressed_stream_len.clone();
> - known_chunks.insert(*digest);
> - future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> - compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> - MergedChunkInfo::New(ChunkInfo {
> - chunk,
> - digest,
> - chunk_len: chunk_len as u64,
> - offset,
> - })
> - }))
> + if !is_fixed_chunk_size {
> + csum.update(&chunk_end.to_le_bytes());
> + }
> + csum.update(digest);
> +
> + let chunk_is_known = known_chunks.contains(digest);
> + if chunk_is_known {
> + known_chunk_count.fetch_add(1, Ordering::SeqCst);
> + reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> +
> + future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
> + } else {
> + let compressed_stream_len2 = compressed_stream_len.clone();
> + known_chunks.insert(*digest);
> +
> + future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> + compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +
> + MergedChunkInfo::New(ChunkInfo {
> + chunk,
> + digest,
> + chunk_len: chunk_len as u64,
> + offset,
> + })
> + }))
> + }
> }
> })
> .merge_known_chunks()
> diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
> index 895f6eae..891d6928 100644
> --- a/pbs-client/src/chunk_stream.rs
> +++ b/pbs-client/src/chunk_stream.rs
> @@ -1,4 +1,6 @@
> +use std::collections::VecDeque;
> use std::pin::Pin;
> +use std::sync::{Arc, Mutex};
> use std::task::{Context, Poll};
>
> use anyhow::Error;
> @@ -8,21 +10,34 @@ use futures::stream::{Stream, TryStream};
>
> use pbs_datastore::Chunker;
>
> +use crate::inject_reused_chunks::InjectChunks;
> +
> /// Split input stream into dynamic sized chunks
> pub struct ChunkStream<S: Unpin> {
> input: S,
> chunker: Chunker,
> buffer: BytesMut,
> scan_pos: usize,
> + consumed: u64,
> + boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
> + injections: Arc<Mutex<VecDeque<InjectChunks>>>,
okay, so boundaries and injections are either both meaningful or both
not - we only set them for the payload stream. they should be an Option
;) technically, consumed could atm also go inside that Option, and we
could make the whole thing a struct?

struct InjectionData {
    boundaries: Arc<Mutex<..>>,
    injections: Arc<Mutex<..>>,
    consumed: u64,
}

and then pass in an Option of that?
> }
>
> impl<S: Unpin> ChunkStream<S> {
> - pub fn new(input: S, chunk_size: Option<usize>) -> Self {
> + pub fn new(
> + input: S,
> + chunk_size: Option<usize>,
> + boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
> + injections: Arc<Mutex<VecDeque<InjectChunks>>>,
> + ) -> Self {
> Self {
> input,
> chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
> buffer: BytesMut::new(),
> scan_pos: 0,
> + consumed: 0,
> + boundaries,
> + injections,
> }
> }
> }
> @@ -40,6 +55,29 @@ where
> fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
> let this = self.get_mut();
> loop {
> + {
this scope, which only exists to limit how long the lock is held, can
then be an `if let Some(..)`, with either the struct or a tuple.
> + // Make sure to release this lock as soon as possible
> + let mut boundaries = this.boundaries.lock().unwrap();
> + if let Some(inject) = boundaries.pop_front() {
> + let max = this.consumed + this.buffer.len() as u64;
> + if inject.boundary <= max {
> + let chunk_size = (inject.boundary - this.consumed) as usize;
> + let result = this.buffer.split_to(chunk_size);
> + this.consumed += chunk_size as u64;
> + this.scan_pos = 0;
> +
> + // Add the size of the injected chunks to consumed, so chunk stream offsets
> + // are in sync with the rest of the archive.
> + this.consumed += inject.size as u64;
> +
> + this.injections.lock().unwrap().push_back(inject);
> +
> + return Poll::Ready(Some(Ok(result)));
> + }
> + boundaries.push_front(inject);
> + }
> + }
> +
> if this.scan_pos < this.buffer.len() {
> let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
>
> @@ -50,7 +88,9 @@ where
> // continue poll
> } else if chunk_size <= this.buffer.len() {
> let result = this.buffer.split_to(chunk_size);
> + this.consumed += chunk_size as u64;
> this.scan_pos = 0;
> +
> return Poll::Ready(Some(Ok(result)));
> } else {
> panic!("got unexpected chunk boundary from chunker");
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 59aa4450..9ae84d37 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -1,4 +1,4 @@
> -use std::collections::{HashMap, HashSet};
> +use std::collections::{HashMap, HashSet, VecDeque};
> use std::ffi::{CStr, CString, OsStr};
> use std::fmt;
> use std::io::{self, Read};
> @@ -26,6 +26,7 @@ use proxmox_sys::fs::{self, acl, xattr};
>
> use pbs_datastore::catalog::BackupCatalogWriter;
>
> +use crate::inject_reused_chunks::InjectChunks;
> use crate::pxar::metadata::errno_is_unsupported;
> use crate::pxar::tools::assert_single_path_component;
> use crate::pxar::Flags;
> @@ -131,6 +132,7 @@ struct Archiver {
> hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
> file_copy_buffer: Vec<u8>,
> skip_e2big_xattr: bool,
> + forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
> }
>
> type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -143,6 +145,7 @@ pub async fn create_archive<T, F>(
> catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
> mut payload_writer: Option<T>,
> options: PxarCreateOptions,
> + forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
this could be combined with the payload_writer and the caching
parameters added later on? and then the whole thing can be optional?
> ) -> Result<(), Error>
> where
> T: SeqWrite + Send,
> @@ -201,6 +204,7 @@ where
> hardlinks: HashMap::new(),
> file_copy_buffer: vec::undefined(4 * 1024 * 1024),
> skip_e2big_xattr: options.skip_e2big_xattr,
> + forced_boundaries,
> };
>
> archiver
> diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
> index 9a600cc1..1a51b0c2 100644
> --- a/pbs-client/src/pxar_backup_stream.rs
> +++ b/pbs-client/src/pxar_backup_stream.rs
> @@ -1,3 +1,4 @@
> +use std::collections::VecDeque;
> use std::io::Write;
> //use std::os::unix::io::FromRawFd;
> use std::path::Path;
> @@ -17,6 +18,8 @@ use proxmox_io::StdChannelWriter;
>
> use pbs_datastore::catalog::CatalogWriter;
>
> +use crate::inject_reused_chunks::InjectChunks;
> +
> /// Stream implementation to encode and upload .pxar archives.
> ///
> /// The hyper client needs an async Stream for file upload, so we
> @@ -40,6 +43,7 @@ impl PxarBackupStream {
> dir: Dir,
> catalog: Arc<Mutex<CatalogWriter<W>>>,
> options: crate::pxar::PxarCreateOptions,
> + boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
> separate_payload_stream: bool,
> ) -> Result<(Self, Option<Self>), Error> {
> let buffer_size = 256 * 1024;
> @@ -79,6 +83,7 @@ impl PxarBackupStream {
> Some(catalog),
> payload_writer,
> options,
> + boundaries,
> )
> .await
> {
> @@ -110,11 +115,12 @@ impl PxarBackupStream {
> dirname: &Path,
> catalog: Arc<Mutex<CatalogWriter<W>>>,
> options: crate::pxar::PxarCreateOptions,
> + boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
> separate_payload_stream: bool,
make boundaries optional (and maybe give it a more "readable" name ;)),
and drop separate_payload_stream in favour of checking whether
boundaries is Some?
> ) -> Result<(Self, Option<Self>), Error> {
> let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
>
> - Self::new(dir, catalog, options, separate_payload_stream)
> + Self::new(dir, catalog, options, boundaries, separate_payload_stream)
> }
> }
>
> diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
> index e609aa16..f077ddf6 100644
> --- a/proxmox-backup-client/src/main.rs
> +++ b/proxmox-backup-client/src/main.rs
> @@ -1,4 +1,4 @@
> -use std::collections::HashSet;
> +use std::collections::{HashSet, VecDeque};
> use std::io::{self, Read, Seek, SeekFrom, Write};
> use std::path::{Path, PathBuf};
> use std::pin::Pin;
> @@ -197,14 +197,19 @@ async fn backup_directory<P: AsRef<Path>>(
> bail!("cannot backup directory with fixed chunk size!");
> }
>
> + let payload_boundaries = Arc::new(Mutex::new(VecDeque::new()));
make this an Option that is only Some if payload_target is set
> let (pxar_stream, payload_stream) = PxarBackupStream::open(
> dir_path.as_ref(),
> catalog,
> pxar_create_options,
> + payload_boundaries.clone(),
> payload_target.is_some(),
> )?;
>
> - let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
> + let dummy_injections = Arc::new(Mutex::new(VecDeque::new()));
> + let dummy_boundaries = Arc::new(Mutex::new(VecDeque::new()));
> + let mut chunk_stream =
> + ChunkStream::new(pxar_stream, chunk_size, dummy_boundaries, dummy_injections);
replace these dummy queues with None
> let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
>
> let stream = ReceiverStream::new(rx).map_err(Error::from);
> @@ -216,15 +221,18 @@ async fn backup_directory<P: AsRef<Path>>(
> }
> });
>
> - let stats = client.upload_stream(archive_name, stream, upload_options.clone());
> + let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
>
> if let Some(payload_stream) = payload_stream {
> let payload_target = payload_target
> .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
>
> + let payload_injections = Arc::new(Mutex::new(VecDeque::new()));
> let mut payload_chunk_stream = ChunkStream::new(
> payload_stream,
> chunk_size,
> + payload_boundaries,
> + payload_injections.clone(),
> );
> let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
> let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
> @@ -240,6 +248,7 @@ async fn backup_directory<P: AsRef<Path>>(
> &payload_target,
> stream,
> upload_options,
> + Some(payload_injections),
> );
>
> match futures::join!(stats, payload_stats) {
> @@ -276,7 +285,7 @@ async fn backup_image<P: AsRef<Path>>(
> }
>
> let stats = client
> - .upload_stream(archive_name, stream, upload_options)
> + .upload_stream(archive_name, stream, upload_options, None)
> .await?;
>
> Ok(stats)
> @@ -567,7 +576,14 @@ fn spawn_catalog_upload(
> let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
> let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
> let catalog_chunk_size = 512 * 1024;
> - let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
> + let boundaries = Arc::new(Mutex::new(VecDeque::new()));
> + let injections = Arc::new(Mutex::new(VecDeque::new()));
> + let catalog_chunk_stream = ChunkStream::new(
> + catalog_stream,
> + Some(catalog_chunk_size),
> + boundaries,
> + injections.clone(),
> + );
replace these with None (they are also dummies AFAICT?)
>
> let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
> StdChannelWriter::new(catalog_tx),
> @@ -583,7 +599,7 @@ fn spawn_catalog_upload(
>
> tokio::spawn(async move {
> let catalog_upload_result = client
> - .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
> + .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None)
> .await;
>
> if let Err(ref err) = catalog_upload_result {
> diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> index bd8ddb20..d912734c 100644
> --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> @@ -1,8 +1,10 @@
> ///! File-restore API running inside the restore VM
> +use std::collections::VecDeque;
> use std::ffi::OsStr;
> use std::fs;
> use std::os::unix::ffi::OsStrExt;
> use std::path::{Path, PathBuf};
> +use std::sync::{Arc, Mutex};
>
> use anyhow::{bail, Error};
> use futures::FutureExt;
> @@ -364,6 +366,7 @@ fn extract(
> None,
> None,
> options,
> + Arc::new(Mutex::new(VecDeque::new())),
> )
> .await
> }
> diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
> index e3b0faac..74ee04f7 100644
> --- a/pxar-bin/src/main.rs
> +++ b/pxar-bin/src/main.rs
> @@ -1,10 +1,10 @@
> -use std::collections::HashSet;
> +use std::collections::{HashSet, VecDeque};
> use std::ffi::OsStr;
> use std::fs::OpenOptions;
> use std::os::unix::fs::OpenOptionsExt;
> use std::path::{Path, PathBuf};
> use std::sync::atomic::{AtomicBool, Ordering};
> -use std::sync::Arc;
> +use std::sync::{Arc, Mutex};
>
> use anyhow::{bail, format_err, Error};
> use futures::future::FutureExt;
> @@ -385,6 +385,7 @@ async fn create_archive(
> None,
> None,
> options,
> + Arc::new(Mutex::new(VecDeque::new())),
this should be None (or part of a single None, if merged with the
payload writer as suggested for create_archive above)
> )
> .await?;
>
> diff --git a/tests/catar.rs b/tests/catar.rs
> index 04af4ffd..6edd747d 100644
> --- a/tests/catar.rs
> +++ b/tests/catar.rs
> @@ -1,4 +1,6 @@
> +use std::collections::VecDeque;
> use std::process::Command;
> +use std::sync::{Arc, Mutex};
>
> use anyhow::Error;
>
> @@ -41,6 +43,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
> None,
> None,
> options,
> + Arc::new(Mutex::new(VecDeque::new())),
same
> ))?;
>
> Command::new("cmp")
> --
> 2.39.2
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>
>
>
More information about the pbs-devel mailing list