[pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async

Dominik Csapak d.csapak at proxmox.com
Tue Jul 21 11:37:41 CEST 2020


On 7/20/20 5:02 PM, Stefan Reiter wrote:
> It's currently only used in proxmox-backup-qemu, so no users in this
> package need to be changed.
> 
> This also allows simplifying the interface to just what is needed in
> proxmox-backup-qemu, i.e. by making it async we need to remove the
> std::io::Read trait, and that allows to get rid of the seeking
> workaround.
> 
> Cache locking is done with a reader-writer lock, though multiple
> requests for one non-cached chunk are still possible (cannot hold a lock
> during async HTTP request) - though in testing, performance did not
> regress.
> 
> The "sequential read" optimization is removed, since it didn't serve any
> purpose AFAICT. chunk_end was not used anywhere else (gave a warning).
> 
> Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
> ---
> 
> I'm unsure about the approach here, happy to hear feedback from anyone more
> familiar with Rust. I couldn't find a way to combine the "read" trait impl with
> async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
> user I don't think this is an issue, but I don't know if this interface was put
> here deliberately, even if unused elsewhere.

high level comment:

maybe it would be nicer to implement AsyncSeek[0] for our existing
AsyncIndexReader (which also buffers a single chunk, but works
for fixed + dynamic index). then we could simply keep the
behaviour we have in proxmox-backup-qemu (just add '.await')

for implementing this, we would have to extend the Index trait
to provide a function fn get_chunk_from_offset(offset) -> (chunk_idx, 
offset_in_chunk)

since a 'seek' would then be 'instant' (at least there is nothing we
could await),
we would only have to implement a 'pseudo-async' interface

it would then 'magically' work also for a dynamic index

alternatively, we could implement 'async_buffered_read' directly on our
AsyncIndexReader instead, and still drop this here entirely

more comments inline

0: https://docs.rs/tokio/0.2.9/tokio/io/trait.AsyncSeek.html

> 
>   src/backup/fixed_index.rs | 145 ++++++++++----------------------------
>   1 file changed, 37 insertions(+), 108 deletions(-)
> 
> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
> index 73d0dad0..3f5fde2a 100644
> --- a/src/backup/fixed_index.rs
> +++ b/src/backup/fixed_index.rs
> @@ -1,5 +1,6 @@
>   use anyhow::{bail, format_err, Error};
>   use std::io::{Seek, SeekFrom};
> +use std::cmp::min;

no need to use this see comment further below

>   
>   use super::chunk_stat::*;
>   use super::chunk_store::*;
> @@ -11,7 +12,7 @@ use std::fs::File;
>   use std::io::Write;
>   use std::os::unix::io::AsRawFd;
>   use std::path::{Path, PathBuf};
> -use std::sync::Arc;
> +use std::sync::{Arc,RwLock};
>   
>   use super::read_chunk::*;
>   use super::ChunkInfo;
> @@ -146,20 +147,6 @@ impl FixedIndexReader {
>           Ok(())
>       }
>   
> -    #[inline]
> -    fn chunk_end(&self, pos: usize) -> u64 {
> -        if pos >= self.index_length {
> -            panic!("chunk index out of range");
> -        }
> -
> -        let end = ((pos + 1) * self.chunk_size) as u64;
> -        if end > self.size {
> -            self.size
> -        } else {
> -            end
> -        }
> -    }
> -
>       pub fn print_info(&self) {
>           println!("Size: {}", self.size);
>           println!("ChunkSize: {}", self.chunk_size);
> @@ -466,27 +453,29 @@ impl FixedIndexWriter {
>       }
>   }
>   
> +struct ChunkBuffer {
> +    data: Vec<u8>,
> +    chunk_idx: usize,
> +}
> +
>   pub struct BufferedFixedReader<S> {
>       store: S,
>       index: FixedIndexReader,
>       archive_size: u64,
> -    read_buffer: Vec<u8>,
> -    buffered_chunk_idx: usize,
> -    buffered_chunk_start: u64,
> -    read_offset: u64,
> +    buffer: RwLock<ChunkBuffer>,
>   }
>   
> -impl<S: ReadChunk> BufferedFixedReader<S> {
> +impl<S: AsyncReadChunk> BufferedFixedReader<S> {
>       pub fn new(index: FixedIndexReader, store: S) -> Self {
>           let archive_size = index.size;
>           Self {
>               store,
>               index,
>               archive_size,
> -            read_buffer: Vec::with_capacity(1024 * 1024),
> -            buffered_chunk_idx: 0,
> -            buffered_chunk_start: 0,
> -            read_offset: 0,
> +            buffer: RwLock::new(ChunkBuffer {
> +                data: Vec::with_capacity(1024 * 1024 * 4),
> +                chunk_idx: 0,
> +            }),
>           }
>       }
>   
> @@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
>           self.archive_size
>       }
>   
> -    fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
> +    async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
>           let index = &self.index;
>           let info = match index.chunk_info(idx) {
>               Some(info) => info,
> @@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
>   
>           // fixme: avoid copy
>   
> -        let data = self.store.read_chunk(&info.digest)?;
> +        let data = self.store.read_chunk(&info.digest).await?;
>           let size = info.range.end - info.range.start;
>           if size != data.len() as u64 {
>               bail!("read chunk with wrong size ({} != {}", size, data.len());
>           }
>   
> -        self.read_buffer.clear();
> -        self.read_buffer.extend_from_slice(&data);
> +        let mut buffer = self.buffer.write().unwrap();
> +        buffer.data.clear();
> +        buffer.data.extend_from_slice(&data);
> +        buffer.chunk_idx = idx;
>   
> -        self.buffered_chunk_idx = idx;
> -
> -        self.buffered_chunk_start = info.range.start as u64;
> -        Ok(())
> +        Ok(data)
>       }
> -}
>   
> -impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
> -    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
> +    pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
>           if offset == self.archive_size {
> -            return Ok(&self.read_buffer[0..0]);
> +            return Ok(0);
>           }
>   
> -        let buffer_len = self.read_buffer.len();
> -        let index = &self.index;
> +        let idx = (offset / self.index.chunk_size as u64) as usize;
> +
> +        let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
> +            let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
> +            let data_len = min(from.len() - buffer_offset, size);

instead of using std::cmp::min, you can simply call min on the number itself:
let data_len = size.min(from.len() - buffer_offset);

> +            unsafe {
> +                std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
> +            }

here you can avoid unsafe code by writing:

buf[0..data_len].copy_from_slice(&from[buffer_offset..buffer_offset + data_len]);

(this does internally exactly the same, but checks the lengths and panics
in the error case instead of corrupting memory/segfaulting)

> +            data_len as _
> +        };
>   
> -        // optimization for sequential read
> -        if buffer_len > 0
> -            && ((self.buffered_chunk_idx + 1) < index.index_length)
> -            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
>           {
> -            let next_idx = self.buffered_chunk_idx + 1;
> -            let next_end = index.chunk_end(next_idx);
> -            if offset < next_end {
> -                self.buffer_chunk(next_idx)?;
> -                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> -                return Ok(&self.read_buffer[buffer_offset..]);
> +            let buffer = self.buffer.read().unwrap();
> +            if buffer.data.len() != 0 && buffer.chunk_idx == idx {
> +                return Ok(copy_to_buf(&buffer.data));
>               }
>           }
>   
> -        if (buffer_len == 0)
> -            || (offset < self.buffered_chunk_start)
> -            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
> -        {
> -            let idx = (offset / index.chunk_size as u64) as usize;
> -            self.buffer_chunk(idx)?;
> -        }
> -
> -        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> -        Ok(&self.read_buffer[buffer_offset..])
> -    }
> -}
> -
> -impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
> -    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
> -        use crate::tools::BufferedRead;
> -        use std::io::{Error, ErrorKind};
> -
> -        let data = match self.buffered_read(self.read_offset) {
> -            Ok(v) => v,
> -            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
> -        };
> -
> -        let n = if data.len() > buf.len() {
> -            buf.len()
> -        } else {
> -            data.len()
> -        };
> -
> -        unsafe {
> -            std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
> -        }
> -
> -        self.read_offset += n as u64;
> -
> -        Ok(n)
> -    }
> -}
> -
> -impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
> -    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
> -        let new_offset = match pos {
> -            SeekFrom::Start(start_offset) => start_offset as i64,
> -            SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
> -            SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
> -        };
> -
> -        use std::io::{Error, ErrorKind};
> -        if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
> -            return Err(Error::new(
> -                ErrorKind::Other,
> -                format!(
> -                    "seek is out of range {} ([0..{}])",
> -                    new_offset, self.archive_size
> -                ),
> -            ));
> -        }
> -        self.read_offset = new_offset as u64;
> -
> -        Ok(self.read_offset)
> +        let new_data = self.buffer_chunk(idx).await?;
> +        Ok(copy_to_buf(&new_data))
>       }
>   }
> 






More information about the pbs-devel mailing list