[pbs-devel] [PATCH v6 proxmox-backup 20/29] fix #3174: archiver: reuse files with unchanged metadata
Christian Ebner
c.ebner at proxmox.com
Thu Jan 25 14:25:59 CET 2024
During pxar archive encoding, check regular files against the metadata
stored in the previous backup's catalog, if present.
Instead of re-encoding files whose metadata is unchanged and whose size
exceeds a given threshold, mark their entries as appendix references in
the pxar archive and append the chunks containing the file payloads to
the appendix.
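For illustration, a client opts in to this reuse by passing both the
previous snapshot reference and the catalog archive name when creating
the archive. A rough sketch, not taken verbatim from this patch:
prev_ref is assumed to be a populated PxarPrevRef for the last
snapshot, and "root.pxar" is a placeholder archive name:

    let options = pbs_client::pxar::PxarCreateOptions {
        device_set: None,
        patterns: Vec::new(),
        entries_max: 1024,
        skip_lost_and_found: false,
        // catalog, index and archive name of the previous snapshot;
        // None disables metadata-based reuse
        previous_ref: Some(prev_ref),
        // archive name used to look up entries in the previous catalog
        archive_name: Some(std::ffi::CString::new("root.pxar")?),
    };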
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
Changes since v5:
- fix formatting using `cargo fmt`
pbs-client/src/pxar/create.rs | 312 +++++++++++++++++-
proxmox-backup-client/src/main.rs | 8 +-
.../src/proxmox_restore_daemon/api.rs | 1 +
pxar-bin/src/main.rs | 1 +
src/tape/file_formats/snapshot_archive.rs | 2 +-
5 files changed, 303 insertions(+), 21 deletions(-)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 9980c393..a8e70651 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -17,21 +17,26 @@ use nix::sys::stat::{FileStat, Mode};
use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
use proxmox_sys::error::SysError;
-use pxar::encoder::{LinkOffset, SeqWrite};
+use pxar::encoder::{AppendixRefOffset, LinkOffset, SeqWrite};
use pxar::Metadata;
use proxmox_io::vec;
use proxmox_lang::c_str;
use proxmox_sys::fs::{self, acl, xattr};
-use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader};
-use pbs_datastore::dynamic_index::{DynamicEntry, DynamicIndexReader};
+use pbs_datastore::catalog::{
+ AppendixStartOffset, BackupCatalogWriter, CatalogReader, CatalogV2Extension, DirEntry,
+ DirEntryAttribute,
+};
+use pbs_datastore::dynamic_index::{AppendableDynamicEntry, DynamicIndexReader};
use crate::inject_reused_chunks::InjectChunks;
use crate::pxar::metadata::errno_is_unsupported;
use crate::pxar::tools::assert_single_path_component;
use crate::pxar::Flags;
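+// Size threshold in bytes: only files larger than this are considered
+// for reuse via appendix references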
+const MAX_FILE_SIZE: u64 = 1024;
+
/// Pxar options for creating a pxar archive/stream
#[derive(Default)]
pub struct PxarCreateOptions {
@@ -45,6 +50,8 @@ pub struct PxarCreateOptions {
pub skip_lost_and_found: bool,
/// Reference state for partial backups
pub previous_ref: Option<PxarPrevRef>,
+ /// Catalog archive name
+ pub archive_name: Option<CString>,
}
/// Contains stateful information about previous backup snapshots for partial backups
@@ -127,6 +134,93 @@ struct HardLinkInfo {
st_ino: u64,
}
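+/// Collects the chunks of reused file payloads, which are later appended
+/// to the archive as a single appendix section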
+struct Appendix {
+ total: AppendixRefOffset,
+ chunks: Vec<AppendableDynamicEntry>,
+}
+
+impl Appendix {
+ fn new() -> Self {
+ Self {
+ total: AppendixRefOffset::default(),
+ chunks: Vec::new(),
+ }
+ }
+
+ fn is_empty(&self) -> bool {
+ self.chunks.is_empty()
+ }
+
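+ /// Queue a chunk sequence for the appendix, deduplicating against
+ /// already queued chunks, and return the offset of the referenced file
+ /// payload relative to the appendix start (including the start padding
+ /// within its first chunk)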
+ fn insert(
+ &mut self,
+ indices: Vec<AppendableDynamicEntry>,
+ start_padding: u64,
+ ) -> AppendixRefOffset {
+ if let Some(offset) = self.digest_sequence_contained(&indices) {
+ AppendixRefOffset::default().add(offset + start_padding)
+ } else if let Some(offset) = self.last_digest_matched(&indices) {
+ for chunk in indices.into_iter().skip(1) {
+ self.total = self.total.add(chunk.size());
+ self.chunks.push(chunk);
+ }
+ AppendixRefOffset::default().add(offset + start_padding)
+ } else {
+ let offset = self.total;
+ for chunk in indices.into_iter() {
+ self.total = self.total.add(chunk.size());
+ self.chunks.push(chunk);
+ }
+ offset.add(start_padding)
+ }
+ }
+
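+ /// Check whether the given chunk sequence is already queued; if so,
+ /// return the byte offset of its first chunk within the appendix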
+ fn digest_sequence_contained(&self, indices: &[AppendableDynamicEntry]) -> Option<u64> {
+ let digest = if let Some(first) = indices.first() {
+ first.digest()
+ } else {
+ return None;
+ };
+
+ let mut offset = 0;
+ let mut iter = self.chunks.iter();
+ while let Some(position) = iter.position(|e| {
+ offset += e.size();
+ e.digest() == digest
+ }) {
+ if indices.len() + position > self.chunks.len() {
+ return None;
+ }
+
+ for (ind, chunk) in indices.iter().skip(1).enumerate() {
+ if chunk.digest() != self.chunks[ind + position].digest() {
+ return None;
+ }
+ }
+
+ offset -= self.chunks[position].size();
+ return Some(offset);
+ }
+
+ None
+ }
+
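+ /// Check whether the first chunk of the sequence is the last chunk
+ /// already queued; if so, return its start offset, so only the
+ /// remaining chunks need to be appended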
+ fn last_digest_matched(&self, indices: &[AppendableDynamicEntry]) -> Option<u64> {
+ let digest = if let Some(first) = indices.first() {
+ first.digest()
+ } else {
+ return None;
+ };
+
+ if let Some(last) = self.chunks.last() {
+ if last.digest() == digest {
+ return Some(self.total.raw() - last.size());
+ }
+ }
+
+ None
+ }
+}
+
struct Archiver {
feature_flags: Flags,
fs_feature_flags: Flags,
@@ -144,7 +238,8 @@ struct Archiver {
file_copy_buffer: Vec<u8>,
previous_ref: Option<PxarPrevRef>,
forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
- inject: (usize, Vec<DynamicEntry>),
+ appendix: Appendix,
+ prev_appendix: Option<AppendixStartOffset>,
}
type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -155,7 +250,7 @@ pub async fn create_archive<T, F>(
feature_flags: Flags,
callback: F,
catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
- options: PxarCreateOptions,
+ mut options: PxarCreateOptions,
forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
) -> Result<(), Error>
where
@@ -200,6 +295,22 @@ where
)?);
}
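+ // Look up the previous snapshot's catalog entry for this archive and the
+ // appendix start offset, both needed to resolve reusable file entries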
+ let (appendix_start, prev_cat_parent) = if let Some(ref mut prev_ref) = options.previous_ref {
+ let entry = prev_ref
+ .catalog
+ .lookup_recursive(prev_ref.archive_name.as_bytes())?;
+ let parent = match entry.attr {
+ DirEntryAttribute::Archive { .. } => Some(entry),
+ _ => None,
+ };
+ let appendix_start = prev_ref
+ .catalog
+ .appendix_offset(prev_ref.archive_name.as_bytes())?;
+ (appendix_start, parent)
+ } else {
+ (None, None)
+ };
+
let mut archiver = Archiver {
feature_flags,
fs_feature_flags,
@@ -216,13 +327,42 @@ where
file_copy_buffer: vec::undefined(4 * 1024 * 1024),
previous_ref: options.previous_ref,
forced_boundaries,
- inject: (0, Vec::new()),
+ appendix: Appendix::new(),
+ prev_appendix: appendix_start,
};
+ if let Some(ref mut catalog) = archiver.catalog {
+ if let Some(ref archive_name) = options.archive_name {
+ catalog
+ .lock()
+ .unwrap()
+ .start_archive(archive_name.as_c_str())?;
+ }
+ }
+
archiver
- .archive_dir_contents(&mut encoder, source_dir, true)
+ .archive_dir_contents(&mut encoder, source_dir, prev_cat_parent.as_ref(), true)
.await?;
- encoder.finish().await?;
+
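+ // If any file payloads were reused, flush the queued chunks and let the
+ // encoder write the appendix section; otherwise finish as before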
+ if archiver.appendix.is_empty() {
+ encoder.finish(None).await?;
+ if let Some(ref mut catalog) = archiver.catalog {
+ if options.archive_name.is_some() {
+ catalog.lock().unwrap().end_archive(None)?;
+ }
+ }
+ } else {
+ let (appendix_offset, appendix_size) = archiver.add_appendix(&mut encoder).await?;
+ encoder
+ .finish(Some((appendix_offset, appendix_size)))
+ .await?;
+ if let Some(catalog) = archiver.catalog {
+ if options.archive_name.is_some() {
+ catalog.lock().unwrap().end_archive(Some(appendix_offset))?;
+ }
+ }
+ }
+
Ok(())
}
@@ -251,6 +391,7 @@ impl Archiver {
&'a mut self,
encoder: &'a mut Encoder<'b, T>,
mut dir: Dir,
+ prev_cat_parent: Option<&'a DirEntry>,
is_root: bool,
) -> BoxFuture<'a, Result<(), Error>> {
async move {
@@ -273,6 +414,13 @@ impl Archiver {
let old_path = std::mem::take(&mut self.path);
+ let mut prev_catalog_dir_entries = Vec::new();
+ if let Some(ref mut prev_ref) = self.previous_ref {
+ if let Some(ref parent) = prev_cat_parent {
+ prev_catalog_dir_entries = prev_ref.catalog.read_dir(parent)?;
+ }
+ }
+
for file_entry in file_list {
let file_name = file_entry.name.to_bytes();
@@ -284,9 +432,15 @@ impl Archiver {
(self.callback)(&file_entry.path)?;
self.path = file_entry.path;
- self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
- .await
- .map_err(|err| self.wrap_err(err))?;
+ self.add_entry(
+ encoder,
+ dir_fd,
+ &file_entry.name,
+ &file_entry.stat,
+ &prev_catalog_dir_entries,
+ )
+ .await
+ .map_err(|err| self.wrap_err(err))?;
}
self.path = old_path;
self.entry_counter = entry_counter;
@@ -533,12 +687,119 @@ impl Archiver {
Ok(())
}
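+ /// Add the appendix section marker to the encoder and register the
+ /// queued chunks as forced boundaries, so they are injected into the
+ /// upload stream instead of being read and re-chunked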
+ async fn add_appendix<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ ) -> Result<(pxar::encoder::AppendixStartOffset, AppendixRefOffset), Error> {
+ let total = self.appendix.total;
+ let appendix_offset = encoder.add_appendix(total).await?;
+ let mut boundaries = self.forced_boundaries.lock().unwrap();
+ let mut position = unsafe { encoder.position_add(0) };
+
+ // Inject reused chunks in batches of 128 so as not to exceed the
+ // upload POST request size limit
+ for chunks in self.appendix.chunks.chunks(128) {
+ let size = chunks
+ .iter()
+ .fold(0, |sum, chunk| sum + chunk.size() as usize);
+ let inject_chunks = InjectChunks {
+ boundary: position,
+ chunks: chunks.to_vec(),
+ size,
+ };
+ boundaries.push_back(inject_chunks);
+ position = unsafe { encoder.position_add(size as u64) };
+ }
+
+ Ok((appendix_offset, total))
+ }
+
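+ /// Compare the file against its entry in the previous catalog; if size
+ /// and ctime are unchanged, reference the existing chunks in the
+ /// appendix and add an appendix reference entry instead of re-encoding
+ /// the file. Returns true if the file was reused.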
+ async fn reuse_if_metadata_unchanged<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ c_file_name: &CStr,
+ metadata: &Metadata,
+ stat: &FileStat,
+ catalog_entries: &[DirEntry],
+ ) -> Result<bool, Error> {
+ let prev_ref = match self.previous_ref {
+ None => return Ok(false),
+ Some(ref mut prev_ref) => prev_ref,
+ };
+
+ let catalog_entry = catalog_entries
+ .iter()
+ .find(|entry| entry.name == c_file_name.to_bytes());
+
+ let (size, ctime, start_offset) = match catalog_entry {
+ Some(&DirEntry {
+ attr:
+ DirEntryAttribute::File {
+ size,
+ mtime: _,
+ extension: Some(CatalogV2Extension { ctime, file_offset }),
+ },
+ ..
+ }) => (size, ctime, file_offset.raw()),
+ Some(&DirEntry {
+ attr:
+ DirEntryAttribute::AppendixRef {
+ size,
+ mtime: _,
+ ctime,
+ appendix_ref_offset,
+ },
+ ..
+ }) => (
+ size,
+ ctime,
+ self.prev_appendix.unwrap().raw() + appendix_ref_offset.raw(),
+ ),
+ // The entry type found in the catalog is not a regular file;
+ // do not reuse the entry but re-encode it instead.
+ _ => return Ok(false),
+ };
+
+ let file_size = stat.st_size as u64;
+ if ctime != stat.st_ctime || size != file_size {
+ return Ok(false);
+ }
+
+ // Since ctime is unchanged, use the current metadata to calculate the
+ // encoded size and thereby the end offset of the file payload in the
+ // reference archive. This is required to find the existing indices to
+ // be included in the new index file.
+ let mut bytes = pxar::encoder::encoded_size(c_file_name, metadata).await?;
+ // payload header size
+ bytes += std::mem::size_of::<pxar::format::Header>() as u64;
+
+ let end_offset = start_offset + bytes + file_size;
+ let (indices, start_padding, end_padding) =
+ prev_ref.index.indices(start_offset, end_offset)?;
+
+ let appendix_ref_offset = self.appendix.insert(indices, start_padding);
+ let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+ self.add_appendix_ref(encoder, file_name, appendix_ref_offset, file_size)
+ .await?;
+
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_appendix_ref(
+ c_file_name,
+ file_size,
+ stat.st_mtime,
+ stat.st_ctime,
+ appendix_ref_offset,
+ )?;
+ }
+
+ Ok(true)
+ }
+
async fn add_entry<T: SeqWrite + Send>(
&mut self,
encoder: &mut Encoder<'_, T>,
parent: RawFd,
c_file_name: &CStr,
stat: &FileStat,
+ prev_cat_entries: &[DirEntry],
) -> Result<(), Error> {
use pxar::format::mode;
@@ -599,6 +860,20 @@ impl Archiver {
}
let file_size = stat.st_size as u64;
+ if file_size > MAX_FILE_SIZE
+ && self
+ .reuse_if_metadata_unchanged(
+ encoder,
+ c_file_name,
+ &metadata,
+ stat,
+ prev_cat_entries,
+ )
+ .await?
+ {
+ return Ok(());
+ }
+
let offset: LinkOffset = self
.add_regular_file(encoder, fd, file_name, &metadata, file_size)
.await?;
@@ -626,8 +901,15 @@ impl Archiver {
if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().start_directory(c_file_name)?;
}
+ let prev_cat_entry = prev_cat_entries.iter().find(|entry| match entry {
+ DirEntry {
+ name,
+ attr: DirEntryAttribute::Directory { .. },
+ } => name == c_file_name.to_bytes(),
+ _ => false,
+ });
let result = self
- .add_directory(encoder, dir, c_file_name, &metadata, stat)
+ .add_directory(encoder, dir, c_file_name, &metadata, stat, prev_cat_entry)
.await;
if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().end_directory()?;
@@ -684,6 +966,7 @@ impl Archiver {
dir_name: &CStr,
metadata: &Metadata,
stat: &FileStat,
+ prev_cat_parent: Option<&DirEntry>,
) -> Result<(), Error> {
let dir_name = OsStr::from_bytes(dir_name.to_bytes());
@@ -710,14 +993,15 @@ impl Archiver {
log::info!("skipping mount point: {:?}", self.path);
Ok(())
} else {
- self.archive_dir_contents(&mut encoder, dir, false).await
+ self.archive_dir_contents(&mut encoder, dir, prev_cat_parent, false)
+ .await
};
self.fs_magic = old_fs_magic;
self.fs_feature_flags = old_fs_feature_flags;
self.current_st_dev = old_st_dev;
- encoder.finish().await?;
+ encoder.finish(None).await?;
result
}
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 442616b7..a087b43b 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -48,7 +48,7 @@ use pbs_client::{
FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
BACKUP_SOURCE_SCHEMA,
};
-use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
+use pbs_datastore::catalog::{CatalogReader, CatalogWriter};
use pbs_datastore::chunk_store::verify_chunk_size;
use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader};
use pbs_datastore::fixed_index::FixedIndexReader;
@@ -1005,10 +1005,6 @@ async fn create_backup(
let catalog = catalog.as_ref().unwrap();
log_file("directory", &filename, &target);
- catalog
- .lock()
- .unwrap()
- .start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
let pxar_options = pbs_client::pxar::PxarCreateOptions {
device_set: devices.clone(),
@@ -1016,6 +1012,7 @@ async fn create_backup(
entries_max: entries_max as usize,
skip_lost_and_found,
previous_ref: None,
+ archive_name: Some(std::ffi::CString::new(target.as_str())?),
};
let upload_options = UploadOptions {
@@ -1036,7 +1033,6 @@ async fn create_backup(
)
.await?;
manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
- catalog.lock().unwrap().end_directory()?;
}
(BackupSpecificationType::IMAGE, false) => {
log_file("image", &filename, &target);
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index 5eff673e..734fa976 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -359,6 +359,7 @@ fn extract(
patterns,
skip_lost_and_found: false,
previous_ref: None,
+ archive_name: None,
};
let pxar_writer = TokioWriter::new(writer);
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index c019f3e4..6d365115 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -371,6 +371,7 @@ async fn create_archive(
patterns,
skip_lost_and_found: false,
previous_ref: None,
+ archive_name: None,
};
let writer = pxar::encoder::sync::StandardWriter::new(writer);
diff --git a/src/tape/file_formats/snapshot_archive.rs b/src/tape/file_formats/snapshot_archive.rs
index abeefbd3..686eddc1 100644
--- a/src/tape/file_formats/snapshot_archive.rs
+++ b/src/tape/file_formats/snapshot_archive.rs
@@ -91,7 +91,7 @@ pub fn tape_write_snapshot_archive<'a>(
proxmox_lang::io_bail!("file '{}' shrunk while reading", filename);
}
}
- encoder.finish()?;
+ encoder.finish(None)?;
Ok(())
});
--
2.39.2