[pbs-devel] [PATCH v8 proxmox-backup 46/69] client: implement prepare reference method

Fabian Grünbichler f.gruenbichler at proxmox.com
Tue Jun 4 11:24:16 CEST 2024


On May 28, 2024 11:42 am, Christian Ebner wrote:
> Implement a method that prepares the decoder instance to access a
> previous snapshots metadata index and payload index in order to
> pass it to the pxar archiver. The archiver then can utilize these
> to compare the metadata for files to the previous state and gather
> reusable chunks.

I think either here or when downloading the previous manifest would be a
good place to implement a check for the encryption mode changing from
"none" to "encrypted", since that invalidates any previous reference that
would otherwise exist?

Currently it is broken and not caught by anything else, i.e., I can
enable encryption from one snapshot to the next while still re-using
unencrypted injected chunks. The resulting snapshot is then unusable,
since the *reader* actually enforces only reading encrypted chunks if
the manifest says so.

For "regular" backups, this wasn't an issue, since the chunks cannot
collide and be re-used. It still might be an optimization to also
skip "none" -> "encrypted" manifests in general, since they are useless
in that case anyway.

> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - fix incorrectly rebased PxarVariant adaption
> 
> changes since version 6:
> - adapt to PxarVariant pxar interface
> - adapt to `change_detection_mode` now being an api type
> - adapt to pxar -> mpxar mapping helper changes
> 
>  pbs-client/src/pxar/create.rs                 |  67 +++++++++-
>  pbs-client/src/pxar/mod.rs                    |   4 +-
>  proxmox-backup-client/src/main.rs             | 121 +++++++++++++++---
>  .../src/proxmox_restore_daemon/api.rs         |   1 +
>  pxar-bin/src/main.rs                          |   1 +
>  5 files changed, 170 insertions(+), 24 deletions(-)
> 
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 7667348d4..678ad768f 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -18,6 +18,8 @@ use nix::sys::stat::{FileStat, Mode};
>  
>  use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
>  use proxmox_sys::error::SysError;
> +use pxar::accessor::aio::{Accessor, Directory};
> +use pxar::accessor::ReadAt;
>  use pxar::encoder::{LinkOffset, SeqWrite};
>  use pxar::{Metadata, PxarVariant};
>  
> @@ -35,7 +37,7 @@ use crate::pxar::tools::assert_single_path_component;
>  use crate::pxar::Flags;
>  
>  /// Pxar options for creating a pxar archive/stream
> -#[derive(Default, Clone)]
> +#[derive(Default)]
>  pub struct PxarCreateOptions {
>      /// Device/mountpoint st_dev numbers that should be included. None for no limitation.
>      pub device_set: Option<HashSet<u64>>,
> @@ -47,6 +49,20 @@ pub struct PxarCreateOptions {
>      pub skip_lost_and_found: bool,
>      /// Skip xattrs of files that return E2BIG error
>      pub skip_e2big_xattr: bool,
> +    /// Reference state for partial backups
> +    pub previous_ref: Option<PxarPrevRef>,
> +}
> +
> +pub type MetadataArchiveReader = Arc<dyn ReadAt + Send + Sync + 'static>;
> +
> +/// Statefull information of previous backups snapshots for partial backups
> +pub struct PxarPrevRef {
> +    /// Reference accessor for metadata comparison
> +    pub accessor: Accessor<MetadataArchiveReader>,
> +    /// Reference index for reusing payload chunks
> +    pub payload_index: DynamicIndexReader,
> +    /// Reference archive name for partial backups
> +    pub archive_name: String,
>  }
>  
>  fn detect_fs_type(fd: RawFd) -> Result<i64, Error> {
> @@ -136,6 +152,7 @@ struct Archiver {
>      file_copy_buffer: Vec<u8>,
>      skip_e2big_xattr: bool,
>      forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
> +    previous_payload_index: Option<DynamicIndexReader>,
>  }
>  
>  type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -200,6 +217,15 @@ where
>              MatchType::Exclude,
>          )?);
>      }
> +    let (previous_payload_index, previous_metadata_accessor) =
> +        if let Some(refs) = options.previous_ref {
> +            (
> +                Some(refs.payload_index),
> +                refs.accessor.open_root().await.ok(),
> +            )
> +        } else {
> +            (None, None)
> +        };
>  
>      let mut archiver = Archiver {
>          feature_flags,
> @@ -217,10 +243,11 @@ where
>          file_copy_buffer: vec::undefined(4 * 1024 * 1024),
>          skip_e2big_xattr: options.skip_e2big_xattr,
>          forced_boundaries,
> +        previous_payload_index,
>      };
>  
>      archiver
> -        .archive_dir_contents(&mut encoder, source_dir, true)
> +        .archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true)
>          .await?;
>      encoder.finish().await?;
>      encoder.close().await?;
> @@ -252,6 +279,7 @@ impl Archiver {
>      fn archive_dir_contents<'a, T: SeqWrite + Send>(
>          &'a mut self,
>          encoder: &'a mut Encoder<'_, T>,
> +        mut previous_metadata_accessor: Option<Directory<MetadataArchiveReader>>,
>          mut dir: Dir,
>          is_root: bool,
>      ) -> BoxFuture<'a, Result<(), Error>> {
> @@ -286,9 +314,15 @@ impl Archiver {
>  
>                  (self.callback)(&file_entry.path)?;
>                  self.path = file_entry.path;
> -                self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
> -                    .await
> -                    .map_err(|err| self.wrap_err(err))?;
> +                self.add_entry(
> +                    encoder,
> +                    &mut previous_metadata_accessor,
> +                    dir_fd,
> +                    &file_entry.name,
> +                    &file_entry.stat,
> +                )
> +                .await
> +                .map_err(|err| self.wrap_err(err))?;
>              }
>              self.path = old_path;
>              self.entry_counter = entry_counter;
> @@ -536,6 +570,7 @@ impl Archiver {
>      async fn add_entry<T: SeqWrite + Send>(
>          &mut self,
>          encoder: &mut Encoder<'_, T>,
> +        previous_metadata: &mut Option<Directory<MetadataArchiveReader>>,
>          parent: RawFd,
>          c_file_name: &CStr,
>          stat: &FileStat,
> @@ -625,7 +660,14 @@ impl Archiver {
>                      catalog.lock().unwrap().start_directory(c_file_name)?;
>                  }
>                  let result = self
> -                    .add_directory(encoder, dir, c_file_name, &metadata, stat)
> +                    .add_directory(
> +                        encoder,
> +                        previous_metadata,
> +                        dir,
> +                        c_file_name,
> +                        &metadata,
> +                        stat,
> +                    )
>                      .await;
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().end_directory()?;
> @@ -678,6 +720,7 @@ impl Archiver {
>      async fn add_directory<T: SeqWrite + Send>(
>          &mut self,
>          encoder: &mut Encoder<'_, T>,
> +        previous_metadata_accessor: &mut Option<Directory<MetadataArchiveReader>>,
>          dir: Dir,
>          dir_name: &CStr,
>          metadata: &Metadata,
> @@ -708,7 +751,17 @@ impl Archiver {
>              log::info!("skipping mount point: {:?}", self.path);
>              Ok(())
>          } else {
> -            self.archive_dir_contents(encoder, dir, false).await
> +            let mut dir_accessor = None;
> +            if let Some(accessor) = previous_metadata_accessor.as_mut() {
> +                if let Some(file_entry) = accessor.lookup(dir_name).await? {
> +                    if file_entry.entry().is_dir() {
> +                        let dir = file_entry.enter_directory().await?;
> +                        dir_accessor = Some(dir);
> +                    }
> +                }
> +            }
> +            self.archive_dir_contents(encoder, dir_accessor, dir, false)
> +                .await
>          };
>  
>          self.fs_magic = old_fs_magic;
> diff --git a/pbs-client/src/pxar/mod.rs b/pbs-client/src/pxar/mod.rs
> index b7dcf8362..5248a1956 100644
> --- a/pbs-client/src/pxar/mod.rs
> +++ b/pbs-client/src/pxar/mod.rs
> @@ -56,7 +56,9 @@ pub(crate) mod tools;
>  mod flags;
>  pub use flags::Flags;
>  
> -pub use create::{create_archive, PxarCreateOptions, PxarWriters};
> +pub use create::{
> +    create_archive, MetadataArchiveReader, PxarCreateOptions, PxarPrevRef, PxarWriters,
> +};
>  pub use extract::{
>      create_tar, create_zip, extract_archive, extract_sub_dir, extract_sub_dir_seq, ErrorHandler,
>      OverwriteFlags, PxarExtractContext, PxarExtractOptions,
> diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
> index f93f9c851..32e5f9b81 100644
> --- a/proxmox-backup-client/src/main.rs
> +++ b/proxmox-backup-client/src/main.rs
> @@ -21,6 +21,7 @@ use proxmox_router::{cli::*, ApiMethod, RpcEnvironment};
>  use proxmox_schema::api;
>  use proxmox_sys::fs::{file_get_json, image_size, replace_file, CreateOptions};
>  use proxmox_time::{epoch_i64, strftime_local};
> +use pxar::accessor::aio::Accessor;
>  use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
>  
>  use pbs_api_types::{
> @@ -30,7 +31,7 @@ use pbs_api_types::{
>      BACKUP_TYPE_SCHEMA, TRAFFIC_CONTROL_BURST_SCHEMA, TRAFFIC_CONTROL_RATE_SCHEMA,
>  };
>  use pbs_client::catalog_shell::Shell;
> -use pbs_client::pxar::ErrorHandler as PxarErrorHandler;
> +use pbs_client::pxar::{ErrorHandler as PxarErrorHandler, MetadataArchiveReader, PxarPrevRef};
>  use pbs_client::tools::{
>      complete_archive_name, complete_auth_id, complete_backup_group, complete_backup_snapshot,
>      complete_backup_source, complete_chunk_size, complete_group_or_snapshot,
> @@ -43,14 +44,14 @@ use pbs_client::tools::{
>      CHUNK_SIZE_SCHEMA, REPO_URL_SCHEMA,
>  };
>  use pbs_client::{
> -    delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
> -    BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
> -    FixedChunkStream, HttpClient, InjectionData, PxarBackupStream, RemoteChunkReader,
> +    delete_ticket_info, parse_backup_specification, view_task_result, BackupDetectionMode,
> +    BackupReader, BackupRepository, BackupSpecificationType, BackupStats, BackupWriter,
> +    ChunkStream, FixedChunkStream, HttpClient, InjectionData, PxarBackupStream, RemoteChunkReader,
>      UploadOptions, BACKUP_SOURCE_SCHEMA,
>  };
>  use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
>  use pbs_datastore::chunk_store::verify_chunk_size;
> -use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader};
> +use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader, LocalDynamicReadAt};
>  use pbs_datastore::fixed_index::FixedIndexReader;
>  use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{
> @@ -687,6 +688,10 @@ fn spawn_catalog_upload(
>                 schema: TRAFFIC_CONTROL_BURST_SCHEMA,
>                 optional: true,
>             },
> +           "change-detection-mode": {
> +               type: BackupDetectionMode,
> +               optional: true,
> +           },
>             "exclude": {
>                 type: Array,
>                 description: "List of paths or patterns for matching files to exclude.",
> @@ -722,6 +727,7 @@ async fn create_backup(
>      param: Value,
>      all_file_systems: bool,
>      skip_lost_and_found: bool,
> +    change_detection_mode: Option<BackupDetectionMode>,
>      dry_run: bool,
>      skip_e2big_xattr: bool,
>      _info: &ApiMethod,
> @@ -881,6 +887,8 @@ async fn create_backup(
>  
>      let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
>  
> +    let detection_mode = change_detection_mode.unwrap_or_default();
> +
>      let http_client = connect_rate_limited(&repo, rate_limit)?;
>      record_repository(&repo);
>  
> @@ -981,7 +989,7 @@ async fn create_backup(
>          None
>      };
>  
> -    let mut manifest = BackupManifest::new(snapshot);
> +    let mut manifest = BackupManifest::new(snapshot.clone());
>  
>      let mut catalog = None;
>      let mut catalog_result_rx = None;
> @@ -1028,22 +1036,21 @@ async fn create_backup(
>                  manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
>              }
>              (BackupSpecificationType::PXAR, false) => {
> -                let metadata_mode = false; // Until enabled via param
> -
>                  let target_base = if let Some(base) = target_base.strip_suffix(".pxar") {
>                      base.to_string()
>                  } else {
>                      bail!("unexpected suffix in target: {target_base}");
>                  };
>  
> -                let (target, payload_target) = if metadata_mode {
> -                    (
> -                        format!("{target_base}.mpxar.{extension}"),
> -                        Some(format!("{target_base}.ppxar.{extension}")),
> -                    )
> -                } else {
> -                    (target, None)
> -                };
> +                let (target, payload_target) =
> +                    if detection_mode.is_metadata() || detection_mode.is_data() {
> +                        (
> +                            format!("{target_base}.mpxar.{extension}"),
> +                            Some(format!("{target_base}.ppxar.{extension}")),
> +                        )
> +                    } else {
> +                        (target, None)
> +                    };
>  
>                  // start catalog upload on first use
>                  if catalog.is_none() {
> @@ -1060,12 +1067,41 @@ async fn create_backup(
>                      .unwrap()
>                      .start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
>  
> +                let mut previous_ref = None;
> +                if detection_mode.is_metadata() {
> +                    if let Some(ref manifest) = previous_manifest {
> +                        // BackupWriter::start created a new snapshot, get the one before
> +                        if let Some(backup_time) = client.previous_backup_time().await? {
> +                            let backup_dir: BackupDir =
> +                                (snapshot.group.clone(), backup_time).into();
> +                            let backup_reader = BackupReader::start(
> +                                &http_client,
> +                                crypt_config.clone(),
> +                                repo.store(),
> +                                &backup_ns,
> +                                &backup_dir,
> +                                true,
> +                            )
> +                            .await?;
> +                            previous_ref = prepare_reference(
> +                                &target,
> +                                manifest.clone(),
> +                                &client,
> +                                backup_reader.clone(),
> +                                crypt_config.clone(),
> +                            )
> +                            .await?
> +                        }
> +                    }
> +                }
> +
>                  let pxar_options = pbs_client::pxar::PxarCreateOptions {
>                      device_set: devices.clone(),
>                      patterns: pattern_list.clone(),
>                      entries_max: entries_max as usize,
>                      skip_lost_and_found,
>                      skip_e2big_xattr,
> +                    previous_ref,
>                  };
>  
>                  let upload_options = UploadOptions {
> @@ -1177,6 +1213,59 @@ async fn create_backup(
>      Ok(Value::Null)
>  }
>  
> +async fn prepare_reference(
> +    target: &str,
> +    manifest: Arc<BackupManifest>,
> +    backup_writer: &BackupWriter,
> +    backup_reader: Arc<BackupReader>,
> +    crypt_config: Option<Arc<CryptConfig>>,
> +) -> Result<Option<PxarPrevRef>, Error> {
> +    let (target, payload_target) = helper::get_pxar_archive_names(target, &manifest);
> +    let payload_target = payload_target.unwrap_or_default();
> +
> +    let metadata_ref_index = if let Ok(index) = backup_reader
> +        .download_dynamic_index(&manifest, &target)
> +        .await
> +    {
> +        index
> +    } else {
> +        log::info!("No previous metadata index, continue without reference");
> +        return Ok(None);
> +    };
> +
> +    if manifest.lookup_file_info(&payload_target).is_err() {
> +        log::info!("No previous payload index found in manifest, continue without reference");
> +        return Ok(None);
> +    }
> +
> +    let known_payload_chunks = Arc::new(Mutex::new(HashSet::new()));
> +    let payload_ref_index = backup_writer
> +        .download_previous_dynamic_index(&payload_target, &manifest, known_payload_chunks)
> +        .await?;
> +
> +    log::info!("Using previous index as metadata reference for '{target}'");
> +
> +    let most_used = metadata_ref_index.find_most_used_chunks(8);
> +    let file_info = manifest.lookup_file_info(&target)?;
> +    let chunk_reader = RemoteChunkReader::new(
> +        backup_reader.clone(),
> +        crypt_config.clone(),
> +        file_info.chunk_crypt_mode(),
> +        most_used,
> +    );
> +    let reader = BufferedDynamicReader::new(metadata_ref_index, chunk_reader);
> +    let archive_size = reader.archive_size();
> +    let reader: MetadataArchiveReader = Arc::new(LocalDynamicReadAt::new(reader));
> +    // only care about the metadata, therefore do not attach payload reader
> +    let accessor = Accessor::new(pxar::PxarVariant::Unified(reader), archive_size).await?;
> +
> +    Ok(Some(pbs_client::pxar::PxarPrevRef {
> +        accessor,
> +        payload_index: payload_ref_index,
> +        archive_name: target,
> +    }))
> +}
> +
>  async fn dump_image<W: Write>(
>      client: Arc<BackupReader>,
>      crypt_config: Option<Arc<CryptConfig>>,
> diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> index f7fbae093..681fa6db9 100644
> --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> @@ -360,6 +360,7 @@ fn extract(
>                          patterns,
>                          skip_lost_and_found: false,
>                          skip_e2big_xattr: false,
> +                        previous_ref: None,
>                      };
>  
>                      let pxar_writer = pxar::PxarVariant::Unified(TokioWriter::new(writer));
> diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
> index 73e7b92eb..2cf8def53 100644
> --- a/pxar-bin/src/main.rs
> +++ b/pxar-bin/src/main.rs
> @@ -363,6 +363,7 @@ async fn create_archive(
>          patterns,
>          skip_lost_and_found: false,
>          skip_e2big_xattr: false,
> +        previous_ref: None,
>      };
>  
>      let source = PathBuf::from(source);
> -- 
> 2.39.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




More information about the pbs-devel mailing list