[pbs-devel] [PATCH v3 proxmox-backup 40/58] client: chunk stream: add dynamic entries injection queues

Christian Ebner c.ebner at proxmox.com
Mon Apr 8 15:54:31 CEST 2024


On 4/4/24 16:52, Fabian Grünbichler wrote:
> On March 28, 2024 1:36 pm, Christian Ebner wrote:
>> Adds a queue to the chunk stream to request forced boundaries at a
>> given offset within the stream and inject reused dynamic entries
>> after this boundary.
>>
>> The chunks are then passed along to the uploader stream using the
>> injection queue, which inserts them during upload.
>>
>> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>> ---
>> changes since version 2:
>> - combined queues into new optional struct
>> - refactoring
>>
>>   examples/test_chunk_speed2.rs                 |  2 +-
>>   pbs-client/src/backup_writer.rs               | 89 +++++++++++--------
>>   pbs-client/src/chunk_stream.rs                | 36 +++++++-
>>   pbs-client/src/pxar/create.rs                 |  6 +-
>>   pbs-client/src/pxar_backup_stream.rs          |  7 +-
>>   proxmox-backup-client/src/main.rs             | 31 ++++---
>>   .../src/proxmox_restore_daemon/api.rs         |  1 +
>>   pxar-bin/src/main.rs                          |  1 +
>>   tests/catar.rs                                |  1 +
>>   9 files changed, 121 insertions(+), 53 deletions(-)
>>
>> diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
>> index 3f69b436d..22dd14ce2 100644
>> --- a/examples/test_chunk_speed2.rs
>> +++ b/examples/test_chunk_speed2.rs
>> @@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
>>           .map_err(Error::from);
>>   
>>       //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
>> -    let mut chunk_stream = ChunkStream::new(stream, None);
>> +    let mut chunk_stream = ChunkStream::new(stream, None, None);
>>   
>>       let start_time = std::time::Instant::now();
>>   
>> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
>> index 8bd0e4f36..032d93da7 100644
>> --- a/pbs-client/src/backup_writer.rs
>> +++ b/pbs-client/src/backup_writer.rs
>> @@ -1,4 +1,4 @@
>> -use std::collections::HashSet;
>> +use std::collections::{HashSet, VecDeque};
>>   use std::future::Future;
>>   use std::os::unix::fs::OpenOptionsExt;
>>   use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
>> @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
>>   
>>   use proxmox_human_byte::HumanByte;
>>   
>> +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
>>   use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
>>   
>>   use super::{H2Client, HttpClient};
>> @@ -265,6 +266,7 @@ impl BackupWriter {
>>           archive_name: &str,
>>           stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
>>           options: UploadOptions,
>> +        injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
>>       ) -> Result<BackupStats, Error> {
>>           let known_chunks = Arc::new(Mutex::new(HashSet::new()));
>>   
>> @@ -341,6 +343,7 @@ impl BackupWriter {
>>                   None
>>               },
>>               options.compress,
>> +            injection_queue,
>>           )
>>           .await?;
>>   
>> @@ -637,6 +640,7 @@ impl BackupWriter {
>>           known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>           crypt_config: Option<Arc<CryptConfig>>,
>>           compress: bool,
>> +        injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
>>       ) -> impl Future<Output = Result<UploadStats, Error>> {
>>           let total_chunks = Arc::new(AtomicUsize::new(0));
>>           let total_chunks2 = total_chunks.clone();
>> @@ -663,48 +667,63 @@ impl BackupWriter {
>>           let index_csum_2 = index_csum.clone();
>>   
>>           stream
>> -            .and_then(move |data| {
>> -                let chunk_len = data.len();
>> +            .inject_reused_chunks(
>> +                injection_queue.unwrap_or_default(),
>> +                stream_len,
>> +                reused_len.clone(),
>> +                index_csum.clone(),
>> +            )
>> +            .and_then(move |chunk_info| match chunk_info {
> 
> for this part here I am still not sure whether doing all of the
> accounting here wouldn't be nicer..
> 

Moved almost all of the accounting here; only the stream length is still
required for the offset calculation in `inject_reused_chunks`.

> 
>> diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
>> index a45420ca0..6ac0c638b 100644
>> --- a/pbs-client/src/chunk_stream.rs
>> +++ b/pbs-client/src/chunk_stream.rs
>> @@ -38,15 +38,17 @@ pub struct ChunkStream<S: Unpin> {
>>       chunker: Chunker,
>>       buffer: BytesMut,
>>       scan_pos: usize,
>> +    injection_data: Option<InjectionData>,
>>   }
>>   
>>   impl<S: Unpin> ChunkStream<S> {
>> -    pub fn new(input: S, chunk_size: Option<usize>) -> Self {
>> +    pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
>>           Self {
>>               input,
>>               chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
>>               buffer: BytesMut::new(),
>>               scan_pos: 0,
>> +            injection_data,
>>           }
>>       }
>>   }
>> @@ -64,6 +66,34 @@ where
>>       fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
>>           let this = self.get_mut();
>>           loop {
>> +            if let Some(InjectionData {
>> +                boundaries,
>> +                injections,
>> +                consumed,
>> +            }) = this.injection_data.as_mut()
>> +            {
>> +                // Make sure to release this lock as soon as possible
>> +                let mut boundaries = boundaries.lock().unwrap();
>> +                if let Some(inject) = boundaries.pop_front() {
> 
> here I am a bit more wary that this popping and re-pushing might hurt
> performance..

> 
>> +                    let max = *consumed + this.buffer.len() as u64;
>> +                    if inject.boundary <= max {
>> +                        let chunk_size = (inject.boundary - *consumed) as usize;
>> +                        let result = this.buffer.split_to(chunk_size);
> 
> a comment or better variable naming would make this easier to follow
> along..
>
> "result" is a forced chunk that is created here because we've reached a
> point where we want to inject something afterwards..
> 

Improved the variable naming and added comments to clarify the 
functionality for the upcoming version of the patches.

> once more I am wondering here whether for the payload stream, a vastly
> simplified chunker that just picks the boundaries based on re-use and
> payload size(s) (to avoid the one file == one chunk pathological case
> for lots of small files) wouldn't improve performance :)

Do you suggest having two chunker implementations, so that for the payload
stream the chunk boundaries are provided via some interface instead of
being determined by the statistical sliding window approach? As you
mentioned in your response to Dietmar on patch 49 of this version of the
patch series?




More information about the pbs-devel mailing list