[pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore

Wolfgang Bumiller w.bumiller at proxmox.com
Thu Aug 24 15:09:58 CEST 2023


On Tue, Aug 08, 2023 at 02:13:43PM +0200, Hannes Laimer wrote:
> ... making the pull logic independent from the actual source
> using two traits.
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
>  Cargo.toml                      |   2 +
>  pbs-datastore/src/read_chunk.rs |   2 +-
>  src/api2/config/remote.rs       |  14 +-
>  src/api2/pull.rs                |  31 +-
>  src/server/pull.rs              | 943 +++++++++++++++++++-------------
>  5 files changed, 570 insertions(+), 422 deletions(-)
> 
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index e55452d1..e1a27a8c 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
>  //! Sync datastore from remote server
>  
>  use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::Seek;
> +use std::path::Path;
>  use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::SystemTime;
>  
>  use anyhow::{bail, format_err, Error};
>  use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
>  use proxmox_router::HttpError;
> -use proxmox_sys::task_log;
> +use proxmox_sys::{task_log, task_warn};
> +use serde_json::json;
>  
>  use pbs_api_types::{
> -    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> -    Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
> +    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
> +    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
>      PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
>  };
> -
> -use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
>  use pbs_datastore::data_blob::DataBlob;
>  use pbs_datastore::dynamic_index::DynamicIndexReader;
>  use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{
>      archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>  };
> +use pbs_datastore::read_chunk::AsyncReadChunk;
>  use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
>  use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>  
>  use crate::backup::{check_ns_modification_privs, check_ns_privs};
>  use crate::tools::parallel_handler::ParallelHandler;
>  
> -/// Parameters for a pull operation.
> -pub(crate) struct PullParameters {
> -    /// Remote that is pulled from
> -    remote: Remote,
> -    /// Full specification of remote datastore
> -    source: BackupRepository,
> -    /// Local store that is pulled into
> +struct RemoteReader {
> +    backup_reader: Arc<BackupReader>,
> +    dir: BackupDir,
> +}
> +
> +pub(crate) struct PullTarget {
>      store: Arc<DataStore>,
> -    /// Remote namespace
> -    remote_ns: BackupNamespace,
> -    /// Local namespace (anchor)
>      ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> +    repo: BackupRepository,
> +    ns: BackupNamespace,
> +    client: HttpClient,
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
> +/// The trait includes methods for listing namespaces, groups, and backup directories,
> +/// as well as retrieving a reader for reading data from the source
> +trait PullSource: Send + Sync {
> +    /// Lists namespaces from the source.
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error>;
> +
> +    /// Lists groups within a specific namespace from the source.
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        owner: &Authid,
> +    ) -> Result<Vec<BackupGroup>, Error>;
> +
> +    /// Lists backup directories for a specific group within a specific namespace from the source.
> +    async fn list_backup_dirs(
> +        &self,
> +        namespace: &BackupNamespace,
> +        group: &BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupDir>, Error>;
> +    fn get_ns(&self) -> BackupNamespace;
> +    fn print_store_and_ns(&self) -> String;
> +
> +    /// Returns a reader for reading data from a specific backup directory.
> +    async fn reader(
> +        &self,
> +        ns: &BackupNamespace,
> +        dir: &BackupDir,
> +    ) -> Result<Arc<dyn PullReader>, Error>;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullSource for RemoteSource {
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error> {
> +        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> +            vec![self.ns.clone()];

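This (still) does nothing, as mentioned in v2 ;-) The `vec![...]` is built and immediately discarded because the `return Ok(...)` is missing, so with max-depth 0 on the root namespace we still go on to query the remote.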

> +        }
> +
> +        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
> +        let mut data = json!({});
> +        if let Some(max_depth) = max_depth {
> +            data["max-depth"] = json!(max_depth);
> +        }
> +
> +        if !self.ns.is_root() {
> +            data["parent"] = json!(self.ns);
> +        }
> +        self.client.login().await?;
> +
> +        let mut result = match self.client.get(&path, Some(data)).await {
> +            Ok(res) => res,
> +            Err(err) => match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match code {
> +                    &StatusCode::NOT_FOUND => {
> +                        if self.ns.is_root() && max_depth.is_none() {
> +                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> +                            max_depth.replace(0);
> +                        } else {
> +                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                        }
> +
> +                        return Ok(vec![self.ns.clone()]);
> +                    }
> +                    _ => {
> +                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    bail!("Querying namespaces failed - {err}");
> +                }
> +            },
> +        };
> +
> +        let list: Vec<BackupNamespace> =
> +            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> +                .iter()
> +                .map(|list_item| list_item.ns.clone())

If you're already modifying this, use

    .into_iter()
    .map(|list_item| list_item.ns)

since we don't really need to clone() here: `into_iter()` yields owned list items, so `ns` can simply be moved out of each one.
  

> +                .collect();
> +
> +        Ok(list)
> +    }
> +
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        _owner: &Authid,
> +    ) -> Result<Vec<BackupGroup>, Error> {
> +        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
> +
> +        let args = if !namespace.is_root() {
> +            Some(json!({ "ns": namespace.clone() }))
> +        } else {
> +            None
> +        };
> +
> +        self.client.login().await?;
> +        let mut result =
> +            self.client.get(&path, args).await.map_err(|err| {
> +                format_err!("Failed to retrieve backup groups from remote - {}", err)
> +            })?;
> +
> +        Ok(
> +            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> +                .map_err(Error::from)?
> +                .into_iter()
> +                .map(|item| item.backup)
> +                .collect::<Vec<BackupGroup>>(),
> +        )
> +    }
> +
> +    async fn list_backup_dirs(
> +        &self,
> +        _namespace: &BackupNamespace,
> +        group: &BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupDir>, Error> {
> +        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
> +
> +        let mut args = json!({
> +            "backup-type": group.ty,
> +            "backup-id": group.id,
> +        });
> +
> +        if !self.ns.is_root() {
> +            args["ns"] = serde_json::to_value(&self.ns)?;
> +        }
> +
> +        self.client.login().await?;
> +
> +        let mut result = self.client.get(&path, Some(args)).await?;
> +        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> +        Ok(snapshot_list
> +            .into_iter()
> +            .filter_map(|item: SnapshotListItem| {
> +                let snapshot = item.backup;
> +                // in-progress backups can't be synced
> +                if item.size.is_none() {
> +                    task_log!(
> +                        worker,
> +                        "skipping snapshot {} - in-progress backup",
> +                        snapshot
> +                    );
> +                    return None;
> +                }
> +
> +                Some(snapshot)
> +            })
> +            .collect::<Vec<BackupDir>>())
> +    }
> +
> +    fn get_ns(&self) -> BackupNamespace {
> +        self.ns.clone()
> +    }
> +
> +    fn print_store_and_ns(&self) -> String {
> +        print_store_and_ns(self.repo.store(), &self.ns)
> +    }
> +
> +    async fn reader(
> +        &self,
> +        ns: &BackupNamespace,
> +        dir: &BackupDir,
> +    ) -> Result<Arc<dyn PullReader>, Error> {
> +        let backup_reader =
> +            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
> +        Ok(Arc::new(RemoteReader {
> +            backup_reader,
> +            dir: dir.clone(),
> +        }))
> +    }
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullReader` is a trait that provides an interface for reading data from a source.
> +/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
> +trait PullReader: Send + Sync {
> +    /// Returns a chunk reader with the specified encryption mode.
> +    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
> +
> +    /// Asynchronously loads a file from the source into a local file.
> +    /// `filename` is the name of the file to load from the source.
> +    /// `into` is the path of the local file to load the source file into.
> +    async fn load_file_into(
> +        &self,
> +        filename: &str,
> +        into: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error>;
> +
> +    /// Tries to download the client log from the source and save it into a local file.
> +    async fn try_download_client_log(
> +        &self,
> +        to_path: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error>;
> +
> +    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullReader for RemoteReader {
> +    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
> +        Arc::new(RemoteChunkReader::new(
> +            self.backup_reader.clone(),
> +            None,
> +            crypt_mode,
> +            HashMap::new(),
> +        ))
> +    }
> +
> +    async fn load_file_into(
> +        &self,
> +        filename: &str,
> +        into: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error> {
> +        let mut tmp_file = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .truncate(true)
> +            .read(true)
> +            .open(into)?;
> +        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
> +        if let Err(err) = download_result {
> +            match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match *code {
> +                    StatusCode::NOT_FOUND => {
> +                        task_log!(
> +                            worker,
> +                            "skipping snapshot {} - vanished since start of sync",
> +                            &self.dir,
> +                        );
> +                        return Ok(None);
> +                    }
> +                    _ => {
> +                        bail!("HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    return Err(err);
> +                }
> +            };
> +        };
> +        tmp_file.rewind()?;
> +        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
> +    }
> +
> +    async fn try_download_client_log(
> +        &self,
> +        to_path: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error> {
> +        let mut tmp_path = to_path.to_owned();
> +        tmp_path.set_extension("tmp");
> +
> +        let tmpfile = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .read(true)
> +            .open(&tmp_path)?;
> +
> +        // Note: be silent if there is no log - only log successful download
> +        if let Ok(()) = self
> +            .backup_reader
> +            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
> +            .await
> +        {
> +            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> +                bail!("Atomic rename file {:?} failed - {}", to_path, err);
> +            }
> +            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
> +        false
> +    }
> +}
> +
> +/// Parameters for a pull operation.
> +pub(crate) struct PullParameters {
> +    /// Where data is pulled from
> +    source: Arc<dyn PullSource>,
> +    /// Where data should be pulled into
> +    target: PullTarget,
>      /// Owner of synced groups (needs to match local owner of pre-existing groups)
>      owner: Authid,
>      /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
>      max_depth: Option<usize>,
>      /// Filters for reducing the pull scope
>      group_filter: Option<Vec<GroupFilter>>,
> -    /// Rate limits for all transfers from `remote`
> -    limit: RateLimitConfig,
>      /// How many snapshots should be transferred at most (taking the newest N snapshots)
>      transfer_last: Option<usize>,
>  }
>  
>  impl PullParameters {
>      /// Creates a new instance of `PullParameters`.
> -    ///
> -    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> -    /// [BackupRepository] with `remote_store`.
> -    #[allow(clippy::too_many_arguments)]
>      pub(crate) fn new(
>          store: &str,
>          ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>          remote_store: &str,
>          remote_ns: BackupNamespace,
>          owner: Authid,
> @@ -82,49 +376,56 @@ impl PullParameters {
>          limit: RateLimitConfig,
>          transfer_last: Option<usize>,
>      ) -> Result<Self, Error> {
> -        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
>          if let Some(max_depth) = max_depth {
>              ns.check_max_depth(max_depth)?;
>              remote_ns.check_max_depth(max_depth)?;
> -        }
> -
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> -
> +        };
>          let remove_vanished = remove_vanished.unwrap_or(false);
>  
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> +        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote: Remote = remote_config.lookup("remote", remote)?;
>  
> -        Ok(Self {
> -            remote,
> -            remote_ns,
> +            let repo = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
> +            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
> +            Arc::new(RemoteSource {
> +                repo,
> +                ns: remote_ns,
> +                client,
> +            })
> +        } else {
> +            bail!("local sync not implemented yet")
> +        };
> +        let target = PullTarget {
> +            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
>              ns,
> +        };
> +
> +        Ok(Self {
>              source,
> -            store,
> +            target,
>              owner,
>              remove_vanished,
>              max_depth,
>              group_filter,
> -            limit,
>              transfer_last,
>          })
>      }
>  
> -    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> -    pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> +        let source_ns = self.source.get_ns();
> +        source_ns.map_prefix(&source_ns, &self.target.ns)

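^ This code is still weird, again, as already mentioned in v2: `source_ns` is mapped using itself as the source prefix, so this should always just yield `self.target.ns`, i.e. it is a roundabout way of writing `Ok(self.target.ns.clone())`.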

>      }
>  }
>  
>  async fn pull_index_chunks<I: IndexFile>(
>      worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: Arc<dyn AsyncReadChunk>,
>      target: Arc<DataStore>,
>      index: I,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -215,26 +516,6 @@ async fn pull_index_chunks<I: IndexFile>(
>      Ok(())
>  }
>  
> -async fn download_manifest(
> -    reader: &BackupReader,
> -    filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> -    let mut tmp_manifest_file = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .truncate(true)
> -        .read(true)
> -        .open(filename)?;
> -
> -    reader
> -        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> -        .await?;
> -
> -    tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> -    Ok(tmp_manifest_file)
> -}
> -
>  fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
>      if size != info.size {
>          bail!(
> @@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>  /// Pulls a single file referenced by a manifest.
>  ///
>  /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> -/// - Verify tmp file checksum
> +/// - Load archive file into tmp file
> +/// -- Load file into tmp file
> +/// -- Verify tmp file checksum
>  /// - if archive is an index, pull referenced chunks
>  /// - Rename tmp file into real path
> -async fn pull_single_archive(
> -    worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> -    snapshot: &pbs_datastore::BackupDir,
> -    archive_info: &FileInfo,
> +async fn pull_single_archive<'a>(
> +    worker: &'a WorkerTask,
> +    reader: Arc<dyn PullReader + 'a>,
> +    snapshot: &'a pbs_datastore::BackupDir,
> +    archive_info: &'a FileInfo,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
>      let archive_name = &archive_info.filename;
> @@ -277,13 +557,11 @@ async fn pull_single_archive(
>  
>      task_log!(worker, "sync archive {}", archive_name);
>  
> -    let mut tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> +    reader
> +        .load_file_into(archive_name, &tmp_path, worker)
> +        .await?;
>  
> -    reader.download(archive_name, &mut tmpfile).await?;
> +    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>  
>      match archive_type(archive_name)? {
>          ArchiveType::DynamicIndex => {
> @@ -293,14 +571,18 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if reader.skip_chunk_sync(snapshot.datastore().name()) {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

The t<->s typo ("datatsore" instead of "datastore") is still there, as mentioned in v2.

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    reader.chunk_reader(archive_info.crypt_mode),
> +                    snapshot.datastore().clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::FixedIndex => {
>              let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -309,17 +591,21 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if reader.skip_chunk_sync(snapshot.datastore().name()) {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

The t<->s typo ("datatsore" instead of "datastore") is still there, as mentioned in v2.

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    reader.chunk_reader(archive_info.crypt_mode),
> +                    snapshot.datastore().clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::Blob => {
> -            tmpfile.seek(SeekFrom::Start(0))?;
> +            tmpfile.rewind()?;
>              let (csum, size) = sha256(&mut tmpfile)?;
>              verify_archive(archive_info, &csum, size)?;
>          }






More information about the pbs-devel mailing list