[pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async
Dominik Csapak
d.csapak at proxmox.com
Tue Jul 21 11:37:41 CEST 2020
On 7/20/20 5:02 PM, Stefan Reiter wrote:
> It's currently only used in proxmox-backup-qemu, so no users in this
> package need to be changed.
>
> This also allows simplifying the interface to just what is needed in
> proxmox-backup-qemu, i.e. by making it async we need to remove the
> std::io::Read trait, and that allows to get rid of the seeking
> workaround.
>
> Cache locking is done with a reader-writer lock, though multiple
> requests for one non-cached chunk are still possible (cannot hold a lock
> during async HTTP request) - though in testing, performance did not
> regress.
>
> The "sequential read" optimization is removed, since it didn't serve any
> purpose AFAICT. chunk_end was not used anywhere else (gave a warning).
>
> Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
> ---
>
> I'm unsure about the approach here, happy to hear feedback from anyone more
> familiar with Rust. I couldn't find a way to combine the "read" trait impl with
> async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
> user I don't think this is an issue, but I don't know if this interface was put
> here deliberately, even if unused elsewhere.
high-level comment:

maybe it would be nicer to implement AsyncSeek [0] for our existing
AsyncIndexReader (which also buffers a single chunk, but works for both
the fixed and the dynamic index). then we could simply keep the
behaviour we have in proxmox-backup-qemu (just add '.await').

to implement this, we would have to extend the Index trait with a
function like

fn get_chunk_from_offset(offset) -> (chunk_idx, offset_in_chunk)

since a 'seek' would then be 'instant' (at least there is nothing we
could await), we would only have a 'pseudo-async' interface to
implement, and it would then 'magically' work for a dynamic index as
well. a rough sketch of that idea follows below the link.

alternatively, we could also implement 'async_buffered_read' for our
AsyncIndexReader and still drop this here entirely.

more comments inline

0: https://docs.rs/tokio/0.2.9/tokio/io/trait.AsyncSeek.html
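
to make the get_chunk_from_offset idea a bit more concrete, here is a
rough standalone sketch (trait and type names are made up for
illustration only, not the real proxmox-backup types):

// hypothetical extension of the Index trait: map a byte offset in the
// archive to (chunk index, offset inside that chunk)
trait IndexSeek {
    fn get_chunk_from_offset(&self, offset: u64) -> (usize, u64);
}

// stand-in for FixedIndexReader: all chunks have the same size
struct FixedIndex {
    chunk_size: u64,
}

impl IndexSeek for FixedIndex {
    fn get_chunk_from_offset(&self, offset: u64) -> (usize, u64) {
        // pure arithmetic, nothing to await, so an AsyncSeek built on
        // top of this is only 'pseudo-async'
        ((offset / self.chunk_size) as usize, offset % self.chunk_size)
    }
}

fn main() {
    let index = FixedIndex { chunk_size: 4 * 1024 * 1024 };
    let (chunk_idx, offset_in_chunk) = index.get_chunk_from_offset(9_000_000);
    println!("chunk {} at offset {}", chunk_idx, offset_in_chunk);
    // a dynamic index would instead do a binary search over its chunk
    // end offsets here, but the interface stays the same for callers
}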
>
> src/backup/fixed_index.rs | 145 ++++++++++----------------------------
> 1 file changed, 37 insertions(+), 108 deletions(-)
>
> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
> index 73d0dad0..3f5fde2a 100644
> --- a/src/backup/fixed_index.rs
> +++ b/src/backup/fixed_index.rs
> @@ -1,5 +1,6 @@
> use anyhow::{bail, format_err, Error};
> use std::io::{Seek, SeekFrom};
> +use std::cmp::min;
no need to use this, see the comment further below
>
> use super::chunk_stat::*;
> use super::chunk_store::*;
> @@ -11,7 +12,7 @@ use std::fs::File;
> use std::io::Write;
> use std::os::unix::io::AsRawFd;
> use std::path::{Path, PathBuf};
> -use std::sync::Arc;
> +use std::sync::{Arc,RwLock};
>
> use super::read_chunk::*;
> use super::ChunkInfo;
> @@ -146,20 +147,6 @@ impl FixedIndexReader {
> Ok(())
> }
>
> - #[inline]
> - fn chunk_end(&self, pos: usize) -> u64 {
> - if pos >= self.index_length {
> - panic!("chunk index out of range");
> - }
> -
> - let end = ((pos + 1) * self.chunk_size) as u64;
> - if end > self.size {
> - self.size
> - } else {
> - end
> - }
> - }
> -
> pub fn print_info(&self) {
> println!("Size: {}", self.size);
> println!("ChunkSize: {}", self.chunk_size);
> @@ -466,27 +453,29 @@ impl FixedIndexWriter {
> }
> }
>
> +struct ChunkBuffer {
> + data: Vec<u8>,
> + chunk_idx: usize,
> +}
> +
> pub struct BufferedFixedReader<S> {
> store: S,
> index: FixedIndexReader,
> archive_size: u64,
> - read_buffer: Vec<u8>,
> - buffered_chunk_idx: usize,
> - buffered_chunk_start: u64,
> - read_offset: u64,
> + buffer: RwLock<ChunkBuffer>,
> }
>
> -impl<S: ReadChunk> BufferedFixedReader<S> {
> +impl<S: AsyncReadChunk> BufferedFixedReader<S> {
> pub fn new(index: FixedIndexReader, store: S) -> Self {
> let archive_size = index.size;
> Self {
> store,
> index,
> archive_size,
> - read_buffer: Vec::with_capacity(1024 * 1024),
> - buffered_chunk_idx: 0,
> - buffered_chunk_start: 0,
> - read_offset: 0,
> + buffer: RwLock::new(ChunkBuffer {
> + data: Vec::with_capacity(1024 * 1024 * 4),
> + chunk_idx: 0,
> + }),
> }
> }
>
> @@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
> self.archive_size
> }
>
> - fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
> + async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
> let index = &self.index;
> let info = match index.chunk_info(idx) {
> Some(info) => info,
> @@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
>
> // fixme: avoid copy
>
> - let data = self.store.read_chunk(&info.digest)?;
> + let data = self.store.read_chunk(&info.digest).await?;
> let size = info.range.end - info.range.start;
> if size != data.len() as u64 {
> bail!("read chunk with wrong size ({} != {}", size, data.len());
> }
>
> - self.read_buffer.clear();
> - self.read_buffer.extend_from_slice(&data);
> + let mut buffer = self.buffer.write().unwrap();
> + buffer.data.clear();
> + buffer.data.extend_from_slice(&data);
> + buffer.chunk_idx = idx;
>
> - self.buffered_chunk_idx = idx;
> -
> - self.buffered_chunk_start = info.range.start as u64;
> - Ok(())
> + Ok(data)
> }
> -}
>
> -impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
> - fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
> + pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
> if offset == self.archive_size {
> - return Ok(&self.read_buffer[0..0]);
> + return Ok(0);
> }
>
> - let buffer_len = self.read_buffer.len();
> - let index = &self.index;
> + let idx = (offset / self.index.chunk_size as u64) as usize;
> +
> + let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
> + let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
> + let data_len = min(from.len() - buffer_offset, size);
instead of importing std::cmp::min, you can simply call min on the
number itself:
let data_len = size.min(from.len() - buffer_offset);
> + unsafe {
> + std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
> + }
here you can avoid the unsafe code by writing:
buf[0..data_len].copy_from_slice(&from[buffer_offset..buffer_offset + data_len]);
(internally this does exactly the same, but it checks the lengths and
panics on a mismatch instead of corrupting memory/segfaulting)
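
just to illustrate the difference, a minimal standalone version of that
bounds-checked copy (names are placeholders, not the actual variables
from the patch):

fn copy_chunk_slice(buf: &mut [u8], from: &[u8], buffer_offset: usize, data_len: usize) {
    // copy_from_slice panics if the two slices differ in length,
    // instead of silently reading/writing out of bounds like the raw
    // pointer copy would
    buf[0..data_len].copy_from_slice(&from[buffer_offset..buffer_offset + data_len]);
}

fn main() {
    let chunk = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
    let mut buf = [0u8; 4];
    copy_chunk_slice(&mut buf, &chunk, 2, 4);
    assert_eq!(buf, [3, 4, 5, 6]);
}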
> + data_len as _
> + };
>
> - // optimization for sequential read
> - if buffer_len > 0
> - && ((self.buffered_chunk_idx + 1) < index.index_length)
> - && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
> {
> - let next_idx = self.buffered_chunk_idx + 1;
> - let next_end = index.chunk_end(next_idx);
> - if offset < next_end {
> - self.buffer_chunk(next_idx)?;
> - let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> - return Ok(&self.read_buffer[buffer_offset..]);
> + let buffer = self.buffer.read().unwrap();
> + if buffer.data.len() != 0 && buffer.chunk_idx == idx {
> + return Ok(copy_to_buf(&buffer.data));
> }
> }
>
> - if (buffer_len == 0)
> - || (offset < self.buffered_chunk_start)
> - || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
> - {
> - let idx = (offset / index.chunk_size as u64) as usize;
> - self.buffer_chunk(idx)?;
> - }
> -
> - let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> - Ok(&self.read_buffer[buffer_offset..])
> - }
> -}
> -
> -impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
> - fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
> - use crate::tools::BufferedRead;
> - use std::io::{Error, ErrorKind};
> -
> - let data = match self.buffered_read(self.read_offset) {
> - Ok(v) => v,
> - Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
> - };
> -
> - let n = if data.len() > buf.len() {
> - buf.len()
> - } else {
> - data.len()
> - };
> -
> - unsafe {
> - std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
> - }
> -
> - self.read_offset += n as u64;
> -
> - Ok(n)
> - }
> -}
> -
> -impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
> - fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
> - let new_offset = match pos {
> - SeekFrom::Start(start_offset) => start_offset as i64,
> - SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
> - SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
> - };
> -
> - use std::io::{Error, ErrorKind};
> - if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
> - return Err(Error::new(
> - ErrorKind::Other,
> - format!(
> - "seek is out of range {} ([0..{}])",
> - new_offset, self.archive_size
> - ),
> - ));
> - }
> - self.read_offset = new_offset as u64;
> -
> - Ok(self.read_offset)
> + let new_data = self.buffer_chunk(idx).await?;
> + Ok(copy_to_buf(&new_data))
> }
> }
>