[pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async
Stefan Reiter
s.reiter at proxmox.com
Mon Jul 20 17:02:18 CEST 2020
It's currently only used in proxmox-backup-qemu, so no users in this
package need to be changed.
This also allows simplifying the interface to just what is needed in
proxmox-backup-qemu, i.e. by making it async we need to remove the
std::io::Read trait, and that allows to get rid of the seeking
workaround.
Cache locking is done with a reader-writer lock. Multiple concurrent
requests for the same non-cached chunk are still possible, since the lock
cannot be held across the async HTTP request; in testing, however,
performance did not regress.
The "sequential read" optimization is removed, since as far as I can tell
it didn't serve any purpose. chunk_end was then no longer used anywhere
else (it produced an unused-function warning).
Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
---
I'm unsure about the approach here, happy to hear feedback from anyone more
familiar with Rust. I couldn't find a way to combine the "read" trait impl with
async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
user, I don't think this is an issue — but I don't know whether this interface was
put here deliberately, even though it is unused elsewhere.
src/backup/fixed_index.rs | 145 ++++++++++----------------------------
1 file changed, 37 insertions(+), 108 deletions(-)
diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index 73d0dad0..3f5fde2a 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -1,5 +1,6 @@
use anyhow::{bail, format_err, Error};
use std::io::{Seek, SeekFrom};
+use std::cmp::min;
use super::chunk_stat::*;
use super::chunk_store::*;
@@ -11,7 +12,7 @@ use std::fs::File;
use std::io::Write;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::sync::{Arc,RwLock};
use super::read_chunk::*;
use super::ChunkInfo;
@@ -146,20 +147,6 @@ impl FixedIndexReader {
Ok(())
}
- #[inline]
- fn chunk_end(&self, pos: usize) -> u64 {
- if pos >= self.index_length {
- panic!("chunk index out of range");
- }
-
- let end = ((pos + 1) * self.chunk_size) as u64;
- if end > self.size {
- self.size
- } else {
- end
- }
- }
-
pub fn print_info(&self) {
println!("Size: {}", self.size);
println!("ChunkSize: {}", self.chunk_size);
@@ -466,27 +453,29 @@ impl FixedIndexWriter {
}
}
+struct ChunkBuffer {
+ data: Vec<u8>,
+ chunk_idx: usize,
+}
+
pub struct BufferedFixedReader<S> {
store: S,
index: FixedIndexReader,
archive_size: u64,
- read_buffer: Vec<u8>,
- buffered_chunk_idx: usize,
- buffered_chunk_start: u64,
- read_offset: u64,
+ buffer: RwLock<ChunkBuffer>,
}
-impl<S: ReadChunk> BufferedFixedReader<S> {
+impl<S: AsyncReadChunk> BufferedFixedReader<S> {
pub fn new(index: FixedIndexReader, store: S) -> Self {
let archive_size = index.size;
Self {
store,
index,
archive_size,
- read_buffer: Vec::with_capacity(1024 * 1024),
- buffered_chunk_idx: 0,
- buffered_chunk_start: 0,
- read_offset: 0,
+ buffer: RwLock::new(ChunkBuffer {
+ data: Vec::with_capacity(1024 * 1024 * 4),
+ chunk_idx: 0,
+ }),
}
}
@@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
self.archive_size
}
- fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
+ async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
let index = &self.index;
let info = match index.chunk_info(idx) {
Some(info) => info,
@@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
// fixme: avoid copy
- let data = self.store.read_chunk(&info.digest)?;
+ let data = self.store.read_chunk(&info.digest).await?;
let size = info.range.end - info.range.start;
if size != data.len() as u64 {
bail!("read chunk with wrong size ({} != {}", size, data.len());
}
- self.read_buffer.clear();
- self.read_buffer.extend_from_slice(&data);
+ let mut buffer = self.buffer.write().unwrap();
+ buffer.data.clear();
+ buffer.data.extend_from_slice(&data);
+ buffer.chunk_idx = idx;
- self.buffered_chunk_idx = idx;
-
- self.buffered_chunk_start = info.range.start as u64;
- Ok(())
+ Ok(data)
}
-}
-impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
- fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
+ pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
if offset == self.archive_size {
- return Ok(&self.read_buffer[0..0]);
+ return Ok(0);
}
- let buffer_len = self.read_buffer.len();
- let index = &self.index;
+ let idx = (offset / self.index.chunk_size as u64) as usize;
+
+ let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
+ let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
+ let data_len = min(from.len() - buffer_offset, size);
+ unsafe {
+ std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
+ }
+ data_len as _
+ };
- // optimization for sequential read
- if buffer_len > 0
- && ((self.buffered_chunk_idx + 1) < index.index_length)
- && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
{
- let next_idx = self.buffered_chunk_idx + 1;
- let next_end = index.chunk_end(next_idx);
- if offset < next_end {
- self.buffer_chunk(next_idx)?;
- let buffer_offset = (offset - self.buffered_chunk_start) as usize;
- return Ok(&self.read_buffer[buffer_offset..]);
+ let buffer = self.buffer.read().unwrap();
+ if buffer.data.len() != 0 && buffer.chunk_idx == idx {
+ return Ok(copy_to_buf(&buffer.data));
}
}
- if (buffer_len == 0)
- || (offset < self.buffered_chunk_start)
- || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
- {
- let idx = (offset / index.chunk_size as u64) as usize;
- self.buffer_chunk(idx)?;
- }
-
- let buffer_offset = (offset - self.buffered_chunk_start) as usize;
- Ok(&self.read_buffer[buffer_offset..])
- }
-}
-
-impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
- use crate::tools::BufferedRead;
- use std::io::{Error, ErrorKind};
-
- let data = match self.buffered_read(self.read_offset) {
- Ok(v) => v,
- Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
- };
-
- let n = if data.len() > buf.len() {
- buf.len()
- } else {
- data.len()
- };
-
- unsafe {
- std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
- }
-
- self.read_offset += n as u64;
-
- Ok(n)
- }
-}
-
-impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
- fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
- let new_offset = match pos {
- SeekFrom::Start(start_offset) => start_offset as i64,
- SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
- SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
- };
-
- use std::io::{Error, ErrorKind};
- if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
- return Err(Error::new(
- ErrorKind::Other,
- format!(
- "seek is out of range {} ([0..{}])",
- new_offset, self.archive_size
- ),
- ));
- }
- self.read_offset = new_offset as u64;
-
- Ok(self.read_offset)
+ let new_data = self.buffer_chunk(idx).await?;
+ Ok(copy_to_buf(&new_data))
}
}
--
2.20.1
More information about the pbs-devel
mailing list