[pbs-devel] [PATCH v6 proxmox-backup 65/65] chunk stream: tests: add regression tests for payload chunker

Dominik Csapak d.csapak at proxmox.com
Tue May 21 13:21:06 CEST 2024


This patch introduces some compiler warnings in non-test builds:

warning: unused import: `futures::stream::StreamExt`
warning: associated function `new` is never used

You can fix that by putting

#[cfg(test)]

before `mod test`, so that the module is only compiled when building tests (e.g. with `cargo test`) and not as part of regular builds.

On 5/14/24 12:34, Christian Ebner wrote:
> Regression tests to cover suggested and forced boundaries as well as
> chunk injection.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
>   pbs-client/src/chunk_stream.rs | 87 ++++++++++++++++++++++++++++++++++
>   1 file changed, 87 insertions(+)
> 
> diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
> index 4d5431a2b..764ae9989 100644
> --- a/pbs-client/src/chunk_stream.rs
> +++ b/pbs-client/src/chunk_stream.rs
> @@ -238,3 +238,90 @@ where
>           }
>       }
>   }
> +
> +mod test {
> +    use futures::stream::StreamExt;
> +
> +    use super::*;
> +
> +    struct DummyInput {
> +        data: Vec<u8>,
> +    }
> +
> +    impl DummyInput {
> +        fn new(data: Vec<u8>) -> Self {
> +            Self { data }
> +        }
> +    }
> +
> +    impl Stream for DummyInput {
> +        type Item = Result<Vec<u8>, Error>;
> +
> +        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
> +            let this = self.get_mut();
> +            match this.data.len() {
> +                0 => Poll::Ready(None),
> +                size if size > 10 => Poll::Ready(Some(Ok(this.data.split_off(10)))),
> +                _ => Poll::Ready(Some(Ok(std::mem::take(&mut this.data)))),
> +            }
> +        }
> +    }
> +
> +    #[test]
> +    fn test_chunk_stream_forced_boundaries() {
> +        let mut data = Vec::new();
> +        for i in 0..(256 * 1024) {
> +            for j in 0..4 {
> +                let byte = ((i >> (j << 3)) & 0xff) as u8;
> +                data.push(byte);
> +            }
> +        }
> +
> +        let mut input = DummyInput::new(data);
> +        let input = Pin::new(&mut input);
> +
> +        let (injections_tx, injections_rx) = mpsc::channel();
> +        let (boundaries_tx, boundaries_rx) = mpsc::channel();
> +        let (suggested_tx, suggested_rx) = mpsc::channel();
> +        let injection_data = InjectionData::new(boundaries_rx, injections_tx);
> +
> +        let mut chunk_stream = ChunkStream::new(input, Some(64 * 1024), Some(injection_data), Some(suggested_rx));
> +        let chunks = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
> +        let chunks_clone = chunks.clone();
> +
> +        // Suggested boundary matching forced boundary
> +        suggested_tx.send(32 * 1024).unwrap();
> +        // Suggested boundary not matching forced boundary
> +        suggested_tx.send(64 * 1024).unwrap();
> +        // Force chunk boundary at suggested boundary
> +        boundaries_tx.send(InjectChunks { boundary: 32 * 1024, chunks: Vec::new(), size: 1024 }).unwrap();
> +        // Force chunk boundary within regular chunk
> +        boundaries_tx.send(InjectChunks { boundary: 128 * 1024, chunks: Vec::new(), size: 2048 }).unwrap();
> +        // Force chunk boundary aligned with regular boundary
> +        boundaries_tx.send(InjectChunks { boundary: 657408, chunks: Vec::new(), size: 512 }).unwrap();
> +        // Force chunk boundary within regular chunk, without injecting data
> +        boundaries_tx.send(InjectChunks { boundary: 657408 + 1024, chunks: Vec::new(), size: 0 }).unwrap();
> +
> +        let rt = tokio::runtime::Runtime::new().unwrap();
> +        rt.block_on(async move {
> +            while let Some(chunk) = chunk_stream.next().await {
> +                let chunk = chunk.unwrap();
> +                let mut chunks = chunks.lock().unwrap();
> +                chunks.push(chunk);
> +            }
> +        });
> +
> +        let mut total = 0;
> +        let chunks = chunks_clone.lock().unwrap();
> +        let expected = [32768, 31744, 65536, 262144, 262144, 512, 262144, 131584];
> +        for (chunk, expected) in chunks.as_slice().iter().zip(expected.iter()) {
> +            assert_eq!(chunk.len(), *expected);
> +            total += chunk.len();
> +        }
> +        while let Ok(injection) = injections_rx.recv() {
> +            total += injection.size;
> +        }
> +
> +        assert_eq!(total, 4 * 256 * 1024 + 1024 + 2048 + 512);
> +    }
> +}





More information about the pbs-devel mailing list