[pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async

Stefan Reiter s.reiter at proxmox.com
Mon Jul 20 17:02:18 CEST 2020


It's currently only used in proxmox-backup-qemu, so no users in this
package need to be changed.

This also allows simplifying the interface to just what is needed in
proxmox-backup-qemu: making it async requires removing the
std::io::Read trait implementation, which in turn allows us to get rid
of the seeking workaround.

Cache locking is done with a reader-writer lock. Multiple concurrent
requests for the same non-cached chunk are still possible (the lock
cannot be held across the async HTTP request), but in testing this did
not cause a performance regression.

The "sequential read" optimization is removed, since as far as I can
tell it did not serve any purpose. chunk_end was not used anywhere else
afterwards (it produced an unused-function warning), so it is removed too.

Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---

I'm unsure about the approach here, happy to hear feedback from anyone more
familiar with Rust. I couldn't find a way to combine the "read" trait impl with
async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
user I don't think this is an issue, but I don't know if this interface was put
here deliberately, even if unused elsewhere.

 src/backup/fixed_index.rs | 145 ++++++++++----------------------------
 1 file changed, 37 insertions(+), 108 deletions(-)

diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index 73d0dad0..3f5fde2a 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -1,5 +1,6 @@
 use anyhow::{bail, format_err, Error};
 use std::io::{Seek, SeekFrom};
+use std::cmp::min;
 
 use super::chunk_stat::*;
 use super::chunk_store::*;
@@ -11,7 +12,7 @@ use std::fs::File;
 use std::io::Write;
 use std::os::unix::io::AsRawFd;
 use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::sync::{Arc,RwLock};
 
 use super::read_chunk::*;
 use super::ChunkInfo;
@@ -146,20 +147,6 @@ impl FixedIndexReader {
         Ok(())
     }
 
-    #[inline]
-    fn chunk_end(&self, pos: usize) -> u64 {
-        if pos >= self.index_length {
-            panic!("chunk index out of range");
-        }
-
-        let end = ((pos + 1) * self.chunk_size) as u64;
-        if end > self.size {
-            self.size
-        } else {
-            end
-        }
-    }
-
     pub fn print_info(&self) {
         println!("Size: {}", self.size);
         println!("ChunkSize: {}", self.chunk_size);
@@ -466,27 +453,29 @@ impl FixedIndexWriter {
     }
 }
 
+struct ChunkBuffer {
+    data: Vec<u8>,
+    chunk_idx: usize,
+}
+
 pub struct BufferedFixedReader<S> {
     store: S,
     index: FixedIndexReader,
     archive_size: u64,
-    read_buffer: Vec<u8>,
-    buffered_chunk_idx: usize,
-    buffered_chunk_start: u64,
-    read_offset: u64,
+    buffer: RwLock<ChunkBuffer>,
 }
 
-impl<S: ReadChunk> BufferedFixedReader<S> {
+impl<S: AsyncReadChunk> BufferedFixedReader<S> {
     pub fn new(index: FixedIndexReader, store: S) -> Self {
         let archive_size = index.size;
         Self {
             store,
             index,
             archive_size,
-            read_buffer: Vec::with_capacity(1024 * 1024),
-            buffered_chunk_idx: 0,
-            buffered_chunk_start: 0,
-            read_offset: 0,
+            buffer: RwLock::new(ChunkBuffer {
+                data: Vec::with_capacity(1024 * 1024 * 4),
+                chunk_idx: 0,
+            }),
         }
     }
 
@@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
         self.archive_size
     }
 
-    fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
+    async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
         let index = &self.index;
         let info = match index.chunk_info(idx) {
             Some(info) => info,
@@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
 
         // fixme: avoid copy
 
-        let data = self.store.read_chunk(&info.digest)?;
+        let data = self.store.read_chunk(&info.digest).await?;
         let size = info.range.end - info.range.start;
         if size != data.len() as u64 {
             bail!("read chunk with wrong size ({} != {}", size, data.len());
         }
 
-        self.read_buffer.clear();
-        self.read_buffer.extend_from_slice(&data);
+        let mut buffer = self.buffer.write().unwrap();
+        buffer.data.clear();
+        buffer.data.extend_from_slice(&data);
+        buffer.chunk_idx = idx;
 
-        self.buffered_chunk_idx = idx;
-
-        self.buffered_chunk_start = info.range.start as u64;
-        Ok(())
+        Ok(data)
     }
-}
 
-impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
-    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
+    pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
         if offset == self.archive_size {
-            return Ok(&self.read_buffer[0..0]);
+            return Ok(0);
         }
 
-        let buffer_len = self.read_buffer.len();
-        let index = &self.index;
+        let idx = (offset / self.index.chunk_size as u64) as usize;
+
+        let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
+            let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
+            let data_len = min(from.len() - buffer_offset, size);
+            unsafe {
+                std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
+            }
+            data_len as _
+        };
 
-        // optimization for sequential read
-        if buffer_len > 0
-            && ((self.buffered_chunk_idx + 1) < index.index_length)
-            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
         {
-            let next_idx = self.buffered_chunk_idx + 1;
-            let next_end = index.chunk_end(next_idx);
-            if offset < next_end {
-                self.buffer_chunk(next_idx)?;
-                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-                return Ok(&self.read_buffer[buffer_offset..]);
+            let buffer = self.buffer.read().unwrap();
+            if buffer.data.len() != 0 && buffer.chunk_idx == idx {
+                return Ok(copy_to_buf(&buffer.data));
             }
         }
 
-        if (buffer_len == 0)
-            || (offset < self.buffered_chunk_start)
-            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
-        {
-            let idx = (offset / index.chunk_size as u64) as usize;
-            self.buffer_chunk(idx)?;
-        }
-
-        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-        Ok(&self.read_buffer[buffer_offset..])
-    }
-}
-
-impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
-        use crate::tools::BufferedRead;
-        use std::io::{Error, ErrorKind};
-
-        let data = match self.buffered_read(self.read_offset) {
-            Ok(v) => v,
-            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
-        };
-
-        let n = if data.len() > buf.len() {
-            buf.len()
-        } else {
-            data.len()
-        };
-
-        unsafe {
-            std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
-        }
-
-        self.read_offset += n as u64;
-
-        Ok(n)
-    }
-}
-
-impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
-    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
-        let new_offset = match pos {
-            SeekFrom::Start(start_offset) => start_offset as i64,
-            SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
-            SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
-        };
-
-        use std::io::{Error, ErrorKind};
-        if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
-            return Err(Error::new(
-                ErrorKind::Other,
-                format!(
-                    "seek is out of range {} ([0..{}])",
-                    new_offset, self.archive_size
-                ),
-            ));
-        }
-        self.read_offset = new_offset as u64;
-
-        Ok(self.read_offset)
+        let new_data = self.buffer_chunk(idx).await?;
+        Ok(copy_to_buf(&new_data))
     }
 }
-- 
2.20.1






More information about the pbs-devel mailing list