[pbs-devel] [RFC proxmox-backup 34/36] fix #3174: client: pxar: enable caching and meta comparison
Christian Ebner
c.ebner at proxmox.com
Wed Feb 28 15:02:24 CET 2024
Add the final glue logic to enable the look-ahead caching and
metadata comparison introduced in the preparatory patches.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
pbs-client/src/pxar/create.rs | 107 ++++++++++++++++++++++++++++++++--
1 file changed, 102 insertions(+), 5 deletions(-)
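Note (not part of the patch): a minimal, self-contained sketch of the cache
bookkeeping this change wires up, for illustration only. The `Entry` and
`Cache` types, their methods and the `main` driver are hypothetical stand-ins;
only the two constants and the flush-on-threshold / trailing-DirEnd handling
follow the diff below.

// Hypothetical stand-ins for the real CacheEntry/CacheEntryData, which also
// carry the open file handle, stat data and pxar metadata.
enum Entry {
    DirEnd,
    Reg { size: u64 },
}

// Same limits as introduced by the patch.
const MAX_CACHE_SIZE: usize = 1024;
const CACHED_PAYLOAD_THRESHOLD: u64 = 2 * 1024 * 1024;

#[derive(Default)]
struct Cache {
    entries: Vec<Entry>,
    payload_size: u64,
}

impl Cache {
    // Cache an entry and report whether the caller should flush, analogous to
    // the size checks done around add_entry()/cache_or_flush_entries().
    fn push(&mut self, entry: Entry) -> bool {
        if let Entry::Reg { size } = &entry {
            self.payload_size += *size;
        }
        self.entries.push(entry);
        self.entries.len() > MAX_CACHE_SIZE || self.payload_size > CACHED_PAYLOAD_THRESHOLD
    }

    // In the real archiver a flush either injects the previously uploaded
    // payload chunks (metadata unchanged) or re-encodes the cached entries.
    fn flush(&mut self) {
        println!("flushing {} cached entries", self.entries.len());
        self.entries.clear();
        self.payload_size = 0;
    }
}

fn main() {
    let mut cache = Cache::default();
    for _ in 0..8 {
        if cache.push(Entry::Reg { size: 512 * 1024 }) {
            cache.flush();
        }
    }
    // Like the end of create_archive(): drop a trailing DirEnd marker and
    // flush whatever is still cached.
    cache.push(Entry::DirEnd);
    if let Some(last) = cache.entries.pop() {
        if !matches!(last, Entry::DirEnd) {
            cache.entries.push(last);
        }
    }
    cache.flush();
}
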
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index fbcccb2e..ad76890c 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -32,10 +32,14 @@ use pbs_datastore::dynamic_index::{
};
use crate::inject_reused_chunks::InjectChunks;
+use crate::pxar::lookahead_cache::{CacheEntry, CacheEntryData};
use crate::pxar::metadata::errno_is_unsupported;
use crate::pxar::tools::assert_single_path_component;
use crate::pxar::Flags;
+const MAX_CACHE_SIZE: usize = 1024;
+const CACHED_PAYLOAD_THRESHOLD: u64 = 2 * 1024 * 1024;
+
#[derive(Default)]
struct ReusedChunks {
start_boundary: PayloadOffset,
@@ -253,6 +257,9 @@ struct Archiver {
reused_chunks: ReusedChunks,
previous_payload_index: Option<DynamicIndexReader>,
forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
+ cached_entries: Vec<CacheEntry>,
+ caching_enabled: bool,
+ cached_payload_size: u64,
}
type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -335,11 +342,26 @@ where
reused_chunks: ReusedChunks::new(),
previous_payload_index,
forced_boundaries,
+ cached_entries: Vec::new(),
+ caching_enabled: false,
+ cached_payload_size: 0,
};
archiver
.archive_dir_contents(&mut encoder, accessor, source_dir, true)
.await?;
+
+ if let Some(last) = archiver.cached_entries.pop() {
+ match last {
+ CacheEntry::DirEnd => {}
+ _ => archiver.cached_entries.push(last),
+ }
+ }
+
+ archiver
+ .flush_cached_to_archive(&mut encoder, false)
+ .await?;
+
encoder.finish().await?;
Ok(())
}
@@ -413,6 +435,11 @@ impl Archiver {
.await
.map_err(|err| self.wrap_err(err))?;
}
+
+ if self.caching_enabled {
+ self.cached_entries.push(CacheEntry::DirEnd);
+ }
+
self.path = old_path;
self.entry_counter = entry_counter;
self.patterns.truncate(old_patterns_count);
@@ -693,8 +720,6 @@ impl Archiver {
c_file_name: &CStr,
stat: &FileStat,
) -> Result<(), Error> {
- use pxar::format::mode;
-
let file_mode = stat.st_mode & libc::S_IFMT;
let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
OFlag::empty()
@@ -732,6 +757,71 @@ impl Archiver {
self.skip_e2big_xattr,
)?;
+ if self.previous_payload_index.is_none() {
+ return self
+ .add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata)
+ .await;
+ }
+
+ // Avoid having too many open file handles in cached entries
+ if self.cached_entries.len() > MAX_CACHE_SIZE {
+ self.flush_cached_to_archive(encoder, false).await?;
+ }
+
+ if metadata.is_regular_file() {
+ self.cache_or_flush_entries(encoder, accessor, c_file_name, stat, fd, &metadata)
+ .await
+ } else {
+ if self.caching_enabled {
+ if stat.st_mode & libc::S_IFMT == libc::S_IFDIR {
+ let fd_clone = fd.try_clone()?;
+ let cache_entry = CacheEntry::DirEntry(CacheEntryData::new(
+ fd,
+ c_file_name.into(),
+ stat.clone(),
+ metadata.clone(),
+ PayloadOffset::default(),
+ ));
+ self.cached_entries.push(cache_entry);
+
+ let dir = Dir::from_fd(fd_clone.into_raw_fd())?;
+ self.add_directory(encoder, accessor, dir, c_file_name, &metadata, stat)
+ .await?;
+
+ if let Some(ref catalog) = self.catalog {
+ if !self.caching_enabled {
+ catalog.lock().unwrap().end_directory()?;
+ }
+ }
+ } else {
+ let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+ fd,
+ c_file_name.into(),
+ stat.clone(),
+ metadata,
+ PayloadOffset::default(),
+ ));
+ self.cached_entries.push(cache_entry);
+ }
+ Ok(())
+ } else {
+ self.add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata)
+ .await
+ }
+ }
+ }
+
+ async fn add_entry_to_archive<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+ c_file_name: &CStr,
+ stat: &FileStat,
+ fd: OwnedFd,
+ metadata: &Metadata,
+ ) -> Result<(), Error> {
+ use pxar::format::mode;
+
let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
match metadata.file_type() {
mode::IFREG => {
@@ -781,7 +871,9 @@ impl Archiver {
.add_directory(encoder, accessor, dir, c_file_name, &metadata, stat)
.await;
if let Some(ref catalog) = self.catalog {
- catalog.lock().unwrap().end_directory()?;
+ if !self.caching_enabled {
+ catalog.lock().unwrap().end_directory()?;
+ }
}
result
}
@@ -1123,7 +1215,9 @@ impl Archiver {
) -> Result<(), Error> {
let dir_name = OsStr::from_bytes(dir_name.to_bytes());
- encoder.create_directory(dir_name, metadata).await?;
+ if !self.caching_enabled {
+ encoder.create_directory(dir_name, metadata).await?;
+ }
let old_fs_magic = self.fs_magic;
let old_fs_feature_flags = self.fs_feature_flags;
@@ -1163,7 +1257,10 @@ impl Archiver {
self.fs_feature_flags = old_fs_feature_flags;
self.current_st_dev = old_st_dev;
- encoder.finish().await?;
+ if !self.caching_enabled {
+ encoder.finish().await?;
+ }
+
result
}
--
2.39.2