[pbs-devel] [RFC v2 proxmox-backup 29/36] client: pxar: add previous reference to archiver

Fabian Grünbichler f.gruenbichler at proxmox.com
Tue Mar 12 13:12:04 CET 2024


On March 5, 2024 10:26 am, Christian Ebner wrote:
> Read the previous snapshot's manifest and check if a split archive
> with the same name is given. If so, create the accessor instance to
> read the previous archive entries, making it possible to look up and
> compare the metadata for the entries and to decide whether an entry
> is reusable or not.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 1:
> - refactor payload target archive name generation
> 
>  pbs-client/src/pxar/create.rs                 | 45 ++++++++++++---
>  proxmox-backup-client/src/main.rs             | 57 +++++++++++++++++--
>  .../src/proxmox_restore_daemon/api.rs         |  1 +
>  pxar-bin/src/main.rs                          |  1 +
>  4 files changed, 92 insertions(+), 12 deletions(-)
> 
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 66bdbce8..7d627079 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -138,7 +138,7 @@ impl ReusedChunks {
>  }
>  
>  /// Pxar options for creating a pxar archive/stream
> -#[derive(Default, Clone)]
> +#[derive(Default)]
>  pub struct PxarCreateOptions {
>      /// Device/mountpoint st_dev numbers that should be included. None for no limitation.
>      pub device_set: Option<HashSet<u64>>,
> @@ -150,6 +150,8 @@ pub struct PxarCreateOptions {
>      pub skip_lost_and_found: bool,
>      /// Skip xattrs of files that return E2BIG error
>      pub skip_e2big_xattr: bool,
> +    /// Reference state for partial backups
> +    pub previous_ref: Option<PxarPrevRef>,

the new previous_ref option goes here, into the public PxarCreateOptions..

>  }
>  
>  /// Statefull information of previous backups snapshots for partial backups
> @@ -249,6 +251,7 @@ struct Archiver {
>      file_copy_buffer: Vec<u8>,
>      skip_e2big_xattr: bool,
>      reused_chunks: ReusedChunks,
> +    previous_payload_index: Option<DynamicIndexReader>,
>      forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,

..but the derived previous_payload_index goes here, directly into the
Archiver.

couldn't these be combined with the forced_boundaries/.. part into a
single parameter/field?
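
e.g. an untested sketch (type and field names made up) bundling the
whole payload re-use state, so Archiver and create_archive only grow a
single optional value instead of two loosely coupled ones:

    /// everything needed to re-use payload chunks of the previous snapshot
    struct PayloadReuseState {
        /// index over the previous snapshot's payload archive
        previous_payload_index: DynamicIndexReader,
        /// boundaries of re-used chunks that need to be injected
        forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
    }

    // Archiver then holds `payload_reuse: Option<PayloadReuseState>`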

>  }
>  
> @@ -305,6 +308,14 @@ where
>              MatchType::Exclude,
>          )?);
>      }
> +    let (previous_payload_index, accessor) = if let Some(refs) = options.previous_ref {

"accessor" is a bit broad, maybe sneak in the fact what is accessed ;)

> +        (
> +            Some(refs.payload_index),
> +            refs.accessor.open_root().await.ok(),
> +        )
> +    } else {
> +        (None, None)
> +    };
>  
>      let mut archiver = Archiver {
>          feature_flags,
> @@ -322,11 +333,12 @@ where
>          file_copy_buffer: vec::undefined(4 * 1024 * 1024),
>          skip_e2big_xattr: options.skip_e2big_xattr,
>          reused_chunks: ReusedChunks::new(),
> +        previous_payload_index,
>          forced_boundaries,
>      };
>  
>      archiver
> -        .archive_dir_contents(&mut encoder, source_dir, true)
> +        .archive_dir_contents(&mut encoder, accessor, source_dir, true)
>          .await?;
>      encoder.finish().await?;
>      Ok(())
> @@ -356,6 +368,7 @@ impl Archiver {
>      fn archive_dir_contents<'a, T: SeqWrite + Send>(
>          &'a mut self,
>          encoder: &'a mut Encoder<'_, T>,
> +        mut accessor: Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
>          mut dir: Dir,
>          is_root: bool,
>      ) -> BoxFuture<'a, Result<(), Error>> {
> @@ -390,9 +403,15 @@ impl Archiver {
>  
>                  (self.callback)(&file_entry.path)?;
>                  self.path = file_entry.path;
> -                self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
> -                    .await
> -                    .map_err(|err| self.wrap_err(err))?;
> +                self.add_entry(
> +                    encoder,
> +                    &mut accessor,
> +                    dir_fd,
> +                    &file_entry.name,
> +                    &file_entry.stat,
> +                )
> +                .await
> +                .map_err(|err| self.wrap_err(err))?;
>              }
>              self.path = old_path;
>              self.entry_counter = entry_counter;
> @@ -640,6 +659,7 @@ impl Archiver {
>      async fn add_entry<T: SeqWrite + Send>(
>          &mut self,
>          encoder: &mut Encoder<'_, T>,
> +        accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
>          parent: RawFd,
>          c_file_name: &CStr,
>          stat: &FileStat,
> @@ -729,7 +749,7 @@ impl Archiver {
>                      catalog.lock().unwrap().start_directory(c_file_name)?;
>                  }
>                  let result = self
> -                    .add_directory(encoder, dir, c_file_name, &metadata, stat)
> +                    .add_directory(encoder, accessor, dir, c_file_name, &metadata, stat)
>                      .await;
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().end_directory()?;
> @@ -782,6 +802,7 @@ impl Archiver {
>      async fn add_directory<T: SeqWrite + Send>(
>          &mut self,
>          encoder: &mut Encoder<'_, T>,
> +        accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
>          dir: Dir,
>          dir_name: &CStr,
>          metadata: &Metadata,
> @@ -812,7 +833,17 @@ impl Archiver {
>              log::info!("skipping mount point: {:?}", self.path);
>              Ok(())
>          } else {
> -            self.archive_dir_contents(encoder, dir, false).await
> +            let mut dir_accessor = None;
> +            if let Some(accessor) = accessor.as_mut() {
> +                if let Some(file_entry) = accessor.lookup(dir_name).await? {
> +                    if file_entry.entry().is_dir() {
> +                        let dir = file_entry.enter_directory().await?;
> +                        dir_accessor = Some(dir);
> +                    }
> +                }
> +            }
> +            self.archive_dir_contents(encoder, dir_accessor, dir, false)
> +                .await
>          };
>  
>          self.fs_magic = old_fs_magic;
> diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
> index 8d657c15..7c2c6983 100644
> --- a/proxmox-backup-client/src/main.rs
> +++ b/proxmox-backup-client/src/main.rs
> @@ -44,10 +44,10 @@ use pbs_client::tools::{
>      CHUNK_SIZE_SCHEMA, REPO_URL_SCHEMA,
>  };
>  use pbs_client::{
> -    delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
> -    BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
> -    FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
> -    BACKUP_SOURCE_SCHEMA,
> +    delete_ticket_info, parse_backup_detection_mode_specification, parse_backup_specification,
> +    view_task_result, BackupReader, BackupRepository, BackupSpecificationType, BackupStats,
> +    BackupWriter, ChunkStream, FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader,
> +    UploadOptions, BACKUP_DETECTION_MODE_SPEC, BACKUP_SOURCE_SCHEMA,
>  };
>  use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
>  use pbs_datastore::chunk_store::verify_chunk_size;
> @@ -699,6 +699,10 @@ fn spawn_catalog_upload(
>                 schema: TRAFFIC_CONTROL_BURST_SCHEMA,
>                 optional: true,
>             },
> +           "change-detection-mode": {
> +               schema: BACKUP_DETECTION_MODE_SPEC,
> +               optional: true,
> +           },
>             "exclude": {
>                 type: Array,
>                 description: "List of paths or patterns for matching files to exclude.",
> @@ -893,6 +897,9 @@ async fn create_backup(
>  
>      let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
>  
> +    let detection_mode = param["change-detection-mode"].as_str().unwrap_or("data");
> +    let detection_mode = parse_backup_detection_mode_specification(detection_mode)?;
> +
>      let client = connect_rate_limited(&repo, rate_limit)?;
>      record_repository(&repo);
>  
> @@ -944,6 +951,28 @@ async fn create_backup(
>          }
>      };
>  
> +    let backup_reader = if detection_mode.is_metadata() {
> +        if let Ok(backup_dir) =
> +            api_datastore_latest_snapshot(&client, repo.store(), &backup_ns, snapshot.group.clone())
> +                .await
> +        {
> +            BackupReader::start(
> +                &client,
> +                crypt_config.clone(),
> +                repo.store(),
> +                &backup_ns,
> +                &backup_dir,
> +                true,
> +            )
> +            .await
> +            .ok()
> +        } else {
> +            None
> +        }
> +    } else {
> +        None
> +    };
> +

this reader should be started after the writer, else somebody else
might be - however unlikely - faster, and the reader would no longer
refer to the previous snapshot.

upside: it can then be moved into the download_previous_manifest arm -
if the previous manifest was not downloadable using the writer, or the
key changed, or something else went wrong, we already know based on
those facts that re-using it can be skipped.
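
something like this untested sketch - "http_client" stands for a second
connection here, since BackupWriter::start consumes the original client;
the remaining names are as in the patch, the point is just the ordering:

    let client = BackupWriter::start(/* ... unchanged ... */).await?;

    let (previous_manifest, backup_reader) = match client.download_previous_manifest().await {
        Ok(manifest) => {
            // the writer session is established at this point, so the
            // previous snapshot can no longer change under us - only now
            // look it up and start the reader
            let backup_dir = api_datastore_latest_snapshot(
                &http_client,
                repo.store(),
                &backup_ns,
                snapshot.group.clone(),
            )
            .await?;
            let reader = BackupReader::start(
                &http_client,
                crypt_config.clone(),
                repo.store(),
                &backup_ns,
                &backup_dir,
                true,
            )
            .await
            .ok();
            (Some(Arc::new(manifest)), reader)
        }
        // no previous manifest (or key changed, ...) -> nothing to re-use
        Err(_) => (None, None),
    };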

>      let client = BackupWriter::start(
>          client,
>          crypt_config.clone(),
> @@ -1040,7 +1069,10 @@ async fn create_backup(
>                  manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
>              }
>              (BackupSpecificationType::PXAR, false) => {
> -                let metadata_mode = false; // Until enabled via param
> +                let archives = detection_mode.metadata_archive_names();
> +                let metadata_mode = detection_mode.is_metadata()
> +                    && (archives.contains(&target_base) || archives.is_empty());

I wonder - do we really need such fine-grained, per-archive control
here? wouldn't a simple per-backup-job switch between metadata-based
detection and the regular mode be enough?
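
i.e. just

    let metadata_mode = detection_mode.is_metadata();

instead of additionally matching target_base against a per-archive name
list.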

> +
>                  let (target, payload_target) = if metadata_mode {
>                      (
>                          format!("{target_base}.meta.{extension}"),
> @@ -1065,12 +1097,27 @@ async fn create_backup(
>                      .unwrap()
>                      .start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
>  
> +                let previous_ref = if metadata_mode {
> +                    prepare_reference(
> +                        &target_base,
> +                        extension,
> +                        previous_manifest.clone(),
> +                        &client,
> +                        backup_reader.clone(),
> +                        crypt_config.clone(),
> +                    )
> +                    .await?
> +                } else {
> +                    None
> +                };
> +
>                  let pxar_options = pbs_client::pxar::PxarCreateOptions {
>                      device_set: devices.clone(),
>                      patterns: pattern_list.clone(),
>                      entries_max: entries_max as usize,
>                      skip_lost_and_found,
>                      skip_e2big_xattr,
> +                    previous_ref,
>                  };
>  
>                  let upload_options = UploadOptions {
> diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> index d912734c..449a7e4c 100644
> --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> @@ -355,6 +355,7 @@ fn extract(
>                          patterns,
>                          skip_lost_and_found: false,
>                          skip_e2big_xattr: false,
> +                        previous_ref: None,
>                      };
>  
>                      let pxar_writer = TokioWriter::new(writer);
> diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
> index 74ee04f7..f3945801 100644
> --- a/pxar-bin/src/main.rs
> +++ b/pxar-bin/src/main.rs
> @@ -336,6 +336,7 @@ async fn create_archive(
>          patterns,
>          skip_lost_and_found: false,
>          skip_e2big_xattr: false,
> +        previous_ref: None,
>      };
>  
>      let source = PathBuf::from(source);
> -- 
> 2.39.2