[pbs-devel] [RFC v2 proxmox-backup 18/23] fix #3174: backup writer: inject queued chunk in upload stream

Christian Ebner c.ebner at proxmox.com
Mon Oct 9 13:51:34 CEST 2023


Inject the queued chunks into the backup writer's upload stream,
thereby including them in the index file.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
Changes since version 1:
no changes

 pbs-client/src/backup_writer.rs | 84 +++++++++++++++++++--------------
 1 file changed, 49 insertions(+), 35 deletions(-)

diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index cc6dd49a..0f18b1ff 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -23,7 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
 
 use proxmox_human_byte::HumanByte;
 
-use super::inject_reused_chunks::InjectChunks;
+use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
 use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
 
 use super::{H2Client, HttpClient};
@@ -667,48 +667,62 @@ impl BackupWriter {
         let index_csum_2 = index_csum.clone();
 
         stream
-            .and_then(move |data| {
-                let chunk_len = data.len();
+            .inject_reused_chunks(injection_queue, stream_len, index_csum.clone())
+            .and_then(move |chunk_info| {
+                match chunk_info {
+                    InjectedChunksInfo::Known(chunks) => {
+                        total_chunks.fetch_add(chunks.len(), Ordering::SeqCst);
+                        future::ok(MergedChunkInfo::Known(chunks))
+                    }
+                    InjectedChunksInfo::Raw((offset, data)) => {
+                        let chunk_len = data.len();
 
-                total_chunks.fetch_add(1, Ordering::SeqCst);
-                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
+                        total_chunks.fetch_add(1, Ordering::SeqCst);
 
-                let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
+                        let mut chunk_builder =
+                            DataChunkBuilder::new(data.as_ref()).compress(compress);
 
-                if let Some(ref crypt_config) = crypt_config {
-                    chunk_builder = chunk_builder.crypt_config(crypt_config);
-                }
+                        if let Some(ref crypt_config) = crypt_config {
+                            chunk_builder = chunk_builder.crypt_config(crypt_config);
+                        }
 
-                let mut known_chunks = known_chunks.lock().unwrap();
-                let digest = chunk_builder.digest();
+                        let mut known_chunks = known_chunks.lock().unwrap();
 
-                let mut guard = index_csum.lock().unwrap();
-                let csum = guard.as_mut().unwrap();
+                        let digest = chunk_builder.digest();
 
-                let chunk_end = offset + chunk_len as u64;
+                        let mut guard = index_csum.lock().unwrap();
+                        let csum = guard.as_mut().unwrap();
 
-                if !is_fixed_chunk_size {
-                    csum.update(&chunk_end.to_le_bytes());
-                }
-                csum.update(digest);
+                        let chunk_end = offset + chunk_len as u64;
 
-                let chunk_is_known = known_chunks.contains(digest);
-                if chunk_is_known {
-                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
-                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
-                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
-                } else {
-                    let compressed_stream_len2 = compressed_stream_len.clone();
-                    known_chunks.insert(*digest);
-                    future::ready(chunk_builder.build().map(move |(chunk, digest)| {
-                        compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
-                        MergedChunkInfo::New(ChunkInfo {
-                            chunk,
-                            digest,
-                            chunk_len: chunk_len as u64,
-                            offset,
-                        })
-                    }))
+                        if !is_fixed_chunk_size {
+                            csum.update(&chunk_end.to_le_bytes());
+                        }
+                        csum.update(digest);
+
+                        let chunk_is_known = known_chunks.contains(digest);
+                        if chunk_is_known {
+                            known_chunk_count.fetch_add(1, Ordering::SeqCst);
+                            reused_len.fetch_add(chunk_len, Ordering::SeqCst);
+
+                            future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
+                        } else {
+                            let compressed_stream_len2 = compressed_stream_len.clone();
+                            known_chunks.insert(*digest);
+
+                            future::ready(chunk_builder.build().map(move |(chunk, digest)| {
+                                compressed_stream_len2
+                                    .fetch_add(chunk.raw_size(), Ordering::SeqCst);
+
+                                MergedChunkInfo::New(ChunkInfo {
+                                    chunk,
+                                    digest,
+                                    chunk_len: chunk_len as u64,
+                                    offset,
+                                })
+                            }))
+                        }
+                    }
                 }
             })
             .merge_known_chunks()
-- 
2.39.2






More information about the pbs-devel mailing list