[pbs-devel] [PATCH proxmox-backup 2/4] pull: add logic for local pull

Fabian Grünbichler f.gruenbichler at proxmox.com
Tue Feb 14 15:33:32 CET 2023


On February 13, 2023 4:45 pm, Hannes Laimer wrote:
> ... since creating a HttpClient(which would be needed
> to reuse existing pull logic) without a remote was not
> possible. This also improves the speed for local
> sync-jobs.
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>

a high-level remark - for all the parts below where I suggest not going over the
API as root for local pulling, you need to properly filter the results by the
"owner" from PullParameters (or maybe that doesn't make sense for the source
side, and we instead need to pass in the "local" authid of the "caller" from the
API to replace the "remote" authid we currently get). the
src/backup/hierarchy.rs module might be of help for that ;)
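
for example, filtering a locally generated group list could look roughly like
this (untested sketch - `source_store`, `source_ns` and `local_owner` are
placeholder names for the source datastore/namespace and the calling auth id,
and the hierarchy.rs helpers should replace the plain owner check wherever
privileges are involved):

let list: Vec<pbs_api_types::BackupGroup> = list
    .into_iter()
    .filter(|group| {
        // only keep groups that are actually owned by / readable for the caller
        match source_store.get_owner(&source_ns, group) {
            Ok(owner) => check_backup_owner(&owner, &local_owner).is_ok(),
            Err(_) => false,
        }
    })
    .collect();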

> ---
>  pbs-client/src/backup_reader.rs |   5 +
>  src/api2/admin/datastore.rs     |  10 +
>  src/server/pull.rs              | 499 ++++++++++++++++++++------------
>  3 files changed, 335 insertions(+), 179 deletions(-)
> 
> diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs
> index 2cd4dc27..9dacef74 100644
> --- a/pbs-client/src/backup_reader.rs
> +++ b/pbs-client/src/backup_reader.rs
> @@ -20,6 +20,11 @@ use pbs_tools::sha::sha256;
>  
>  use super::{H2Client, HttpClient};
>  
> +pub enum BackupSource {
> +    Remote(Arc<BackupReader>),
> +    Local(pbs_datastore::BackupDir),

the Local variant could also take a SnapshotReader instead of BackupDir, like
pbs-tape does. not sure whether that would simplify/improve code - did you
evaluate it?

this is also only used by server::pull, so it should likely go there.. the name
is also rather generic - something like `PullReader` or even just `Reader` would
fit better, since it's a pull-internal enum that doesn't need to be pub anyway.

but see below for some suggestions!

> +}
> +
>  /// Backup Reader
>  pub struct BackupReader {
>      h2: H2Client,
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 8d3a6146..8ad78f29 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -445,6 +445,16 @@ pub async fn list_snapshots(
>      _param: Value,
>      _info: &ApiMethod,
>      rpcenv: &mut dyn RpcEnvironment,
> +) -> Result<Vec<SnapshotListItem>, Error> {
> +    do_list_snapshots(store, ns, backup_type, backup_id, rpcenv).await
> +}
> +
> +pub async fn do_list_snapshots(
> +    store: String,
> +    ns: Option<BackupNamespace>,
> +    backup_type: Option<BackupType>,
> +    backup_id: Option<String>,
> +    rpcenv: &mut dyn RpcEnvironment,

this change is not needed (see below)

>  ) -> Result<Vec<SnapshotListItem>, Error> {
>      let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>  
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 65eedf2c..81df00c3 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,7 +1,7 @@
>  //! Sync datastore from remote server
>  
>  use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::{Seek, SeekFrom, Write};
>  use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::SystemTime;
> @@ -9,6 +9,7 @@ use std::time::SystemTime;
>  use anyhow::{bail, format_err, Error};
>  use http::StatusCode;
>  use pbs_config::CachedUserInfo;
> +use pbs_datastore::read_chunk::AsyncReadChunk;
>  use serde_json::json;
>  
>  use proxmox_router::HttpError;
> @@ -21,7 +22,7 @@ use pbs_api_types::{
>  };
>  
>  use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> +    BackupReader, BackupRepository, BackupSource, HttpClient, HttpClientOptions, RemoteChunkReader,
>  };
>  use pbs_datastore::data_blob::DataBlob;
>  use pbs_datastore::dynamic_index::DynamicIndexReader;
> @@ -30,7 +31,7 @@ use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{
>      archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>  };
> -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> +use pbs_datastore::{check_backup_owner, DataStore, LocalChunkReader, StoreProgress};
>  use pbs_tools::sha::sha256;
>  use proxmox_rest_server::WorkerTask;
>  
> @@ -40,7 +41,7 @@ use crate::tools::parallel_handler::ParallelHandler;
>  /// Parameters for a pull operation.
>  pub(crate) struct PullParameters {
>      /// Remote that is pulled from
> -    remote: Remote,
> +    remote: Option<Remote>,
>      /// Full specification of remote datastore
>      source: BackupRepository,

might make more sense to refactor this type to properly differentiate between
remote source and local source?

e.g., we could have

pub(crate) struct LocalSource {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

pub(crate) struct RemoteSource {
    remote: Remote,
    repo: BackupRepository,
    ns: BackupNamespace,
}

pub(crate) enum PullSource {
    Local(LocalSource),
    Remote(RemoteSource),
}

pub(crate) struct PullTarget {
    store: Arc<DataStore>,
    ns: BackupNamespace,
}

pub(crate) struct PullParameters {
    source: PullSource,
    target: PullTarget,
    /// Owner of synced groups (needs to match local owner of pre-existing groups)
    owner: Authid,
    /// Whether to remove groups which exist locally, but not on the remote end
    remove_vanished: bool,
    /// How many levels of sub-namespaces to pull (0 == no recursion, None == maximum recursion)
    max_depth: Option<usize>,
    /// Filters for reducing the pull scope
    group_filter: Option<Vec<GroupFilter>>,
    /// Rate limits for all transfers from `remote`
    limit: RateLimitConfig,
}

and as a first refactoring step, just do this switch (without RemoteSource ;))
and drop the Client from the signatures.

just a suggestion, feel free to name/structure things differently if that makes
more sense!

>      /// Local store that is pulled into
> @@ -70,7 +71,7 @@ impl PullParameters {
>      pub(crate) fn new(
>          store: &str,
>          ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>          remote_store: &str,
>          remote_ns: BackupNamespace,

same here - there could be a fn new_remote(..) and a fn new_local(..) to keep
the signatures and parameter names sensible..
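
e.g., assuming the PullSource/PullTarget split from above, a local constructor
could look roughly like this (sketch only - whether `limit` stays in
PullParameters or moves into a RemoteSource-specific place is up for debate):

impl PullParameters {
    /// prepare a local pull - no remote and no HttpClient involved
    pub(crate) fn new_local(
        store: &str,
        ns: BackupNamespace,
        source_store: &str,
        source_ns: BackupNamespace,
        owner: Authid,
        remove_vanished: Option<bool>,
        max_depth: Option<usize>,
        group_filter: Option<Vec<GroupFilter>>,
    ) -> Result<Self, Error> {
        if let Some(max_depth) = max_depth {
            ns.check_max_depth(max_depth)?;
            source_ns.check_max_depth(max_depth)?;
        }
        Ok(Self {
            source: PullSource::Local(LocalSource {
                store: DataStore::lookup_datastore(source_store, Some(Operation::Read))?,
                ns: source_ns,
            }),
            target: PullTarget {
                store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
                ns,
            },
            owner,
            remove_vanished: remove_vanished.unwrap_or(false),
            max_depth,
            group_filter,
            limit: RateLimitConfig::default(),
        })
    }
}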

>          owner: Authid,
> @@ -86,18 +87,24 @@ impl PullParameters {
>              remote_ns.check_max_depth(max_depth)?;
>          }
>  
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> +        let (remote, source): (Option<Remote>, BackupRepository) = if let Some(remote_str) = remote
> +        {
> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote = remote_config.lookup::<Remote>("remote", remote_str)?;
> +            let source = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
> +            (Some(remote), source)
> +        } else {
> +            let source = BackupRepository::new(None, None, None, remote_store.to_string());
> +            (None, source)
> +        };
>  
>          let remove_vanished = remove_vanished.unwrap_or(false);
>  
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> -
>          Ok(Self {
>              remote,
>              remote_ns,
> @@ -114,13 +121,17 @@ impl PullParameters {
>  
>      /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
>      pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +        if let Some(remote) = &self.remote {
> +            crate::api2::config::remote::remote_client(remote, Some(self.limit.clone())).await
> +        } else {
> +            bail!("No remote specified. Do not use a HttpClient for a local sync.")
> +        }
>      }
>  }
>  
> -async fn pull_index_chunks<I: IndexFile>(
> +async fn pull_index_chunks<I: IndexFile, C: AsyncReadChunk + Clone>(
>      worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: C,
>      target: Arc<DataStore>,
>      index: I,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -256,10 +267,10 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>  /// - Verify tmp file checksum
>  /// - if archive is an index, pull referenced chunks
>  /// - Rename tmp file into real path
> -async fn pull_single_archive(
> +async fn pull_single_archive<C: AsyncReadChunk + Clone>(
>      worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> +    source: &BackupSource,
> +    chunk_reader: C,
>      snapshot: &pbs_datastore::BackupDir,
>      archive_info: &FileInfo,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -279,7 +290,15 @@ async fn pull_single_archive(
>          .read(true)
>          .open(&tmp_path)?;
>  
> -    reader.download(archive_name, &mut tmpfile).await?;
> +    match source {
> +        BackupSource::Remote(reader) => reader.download(archive_name, &mut tmpfile).await?,
> +        BackupSource::Local(dir) => {
> +            let mut source_path = dir.full_path();
> +            source_path.push(archive_name);
> +            let data = std::fs::read(source_path)?;
> +            tmpfile.write_all(&data)?;

could use std::fs::copy()
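
i.e. roughly (sketch - this also avoids reading the whole archive into memory;
whether the already-opened tmpfile handle can still be reused afterwards, given
that std::fs::copy() truncates the destination in place, would need to be
double-checked):

BackupSource::Local(dir) => {
    let mut source_path = dir.full_path();
    source_path.push(archive_name);
    // copy the archive file directly instead of read() + write_all()
    std::fs::copy(&source_path, &tmp_path)?;
}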

> +        }
> +    };
>  
>      match archive_type(archive_name)? {
>          ArchiveType::DynamicIndex => {
> @@ -289,14 +308,23 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            match source {
> +                BackupSource::Local(ref dir)
> +                    if dir.datastore().name() == snapshot.datastore().name() =>
> +                {
> +                    task_log!(worker, "skipping chunk sync for same datatsore");
> +                }

not sure if this is a good idea - I think pulling locally should check that the
referenced chunks are all there and fail otherwise.. we can always re-introduce
skipping here (possibly opt-in?) later on if we think it's a good performance improvement.
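
e.g. instead of skipping, something along these lines (rough sketch - assuming
DataStore::stat_chunk is the right helper for a cheap existence check):

for pos in 0..index.index_count() {
    let digest = index.index_digest(pos).unwrap();
    // fail the sync if a referenced chunk is missing, instead of silently
    // assuming it exists in the shared datastore
    snapshot.datastore().stat_chunk(digest).map_err(|err| {
        format_err!(
            "chunk {} referenced by {} is missing - {}",
            hex::encode(digest),
            archive_name,
            err
        )
    })?;
}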

> +                _ => {
> +                    pull_index_chunks(
> +                        worker,
> +                        chunk_reader,
> +                        snapshot.datastore().clone(),
> +                        index,
> +                        downloaded_chunks,
> +                    )
> +                    .await?;
> +                }
> +            };
>          }
>          ArchiveType::FixedIndex => {
>              let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -304,15 +332,23 @@ async fn pull_single_archive(
>              })?;
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
> -
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            match source {
> +                BackupSource::Local(ref dir)
> +                    if dir.datastore().name() == snapshot.datastore().name() =>
> +                {
> +                    task_log!(worker, "skipping chunk sync for same datatsore");
> +                }

same here

> +                _ => {
> +                    pull_index_chunks(
> +                        worker,
> +                        chunk_reader,
> +                        snapshot.datastore().clone(),
> +                        index,
> +                        downloaded_chunks,
> +                    )
> +                    .await?;
> +                }
> +            };
>          }
>          ArchiveType::Blob => {
>              tmpfile.seek(SeekFrom::Start(0))?;
> @@ -321,6 +357,9 @@ async fn pull_single_archive(
>          }
>      }
>      if let Err(err) = std::fs::rename(&tmp_path, &path) {
> +        task_log!(worker, "sync archive {}", archive_name);
> +        task_log!(worker, "tmpfile path {:?}", tmp_path.as_os_str());
> +        task_log!(worker, "path path {:?}", path.as_os_str());

left-over debug logs? ;)

>          bail!("Atomic rename file {:?} failed - {}", path, err);
>      }
>      Ok(())
> @@ -364,7 +403,7 @@ async fn try_client_log_download(
>  /// - Download log if not already existing
>  async fn pull_snapshot(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> +    source: BackupSource,
>      snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> @@ -377,31 +416,45 @@ async fn pull_snapshot(
>      let mut tmp_manifest_name = manifest_name.clone();
>      tmp_manifest_name.set_extension("tmp");

this whole part here could be refactored so that the new/refreshed manifest is
not stored in a tmpfile at all, but only kept in memory
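
i.e. something like (rough sketch - the vanished-snapshot/404 handling for the
remote case is omitted here for brevity):

let tmp_manifest_blob = match source {
    BackupSource::Remote(ref reader) => {
        // download into an in-memory buffer instead of a tmpfile
        let mut buf: Vec<u8> = Vec::new();
        reader.download(MANIFEST_BLOB_NAME, &mut buf).await?;
        DataBlob::load_from_reader(&mut std::io::Cursor::new(buf))?
    }
    BackupSource::Local(ref dir) => dir.load_blob(MANIFEST_BLOB_NAME)?,
};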

> -    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
> -    let mut tmp_manifest_file = match download_res {
> -        Ok(manifest_file) => manifest_file,
> -        Err(err) => {
> -            match err.downcast_ref::<HttpError>() {
> -                Some(HttpError { code, message }) => match *code {
> -                    StatusCode::NOT_FOUND => {
> -                        task_log!(
> -                            worker,
> -                            "skipping snapshot {} - vanished since start of sync",
> -                            snapshot.dir(),
> -                        );
> -                        return Ok(());
> -                    }
> -                    _ => {
> -                        bail!("HTTP error {code} - {message}");
> -                    }
> -                },
> -                None => {
> -                    return Err(err);
> +    let tmp_manifest_blob = match source {
> +        BackupSource::Remote(ref reader) => {
> +            let mut tmp_manifest_file = match download_manifest(reader, &tmp_manifest_name).await {
> +                Ok(manifest_file) => manifest_file,
> +                Err(err) => {
> +                    match err.downcast_ref::<HttpError>() {
> +                        Some(HttpError { code, message }) => match *code {
> +                            StatusCode::NOT_FOUND => {
> +                                task_log!(
> +                                    worker,
> +                                    "skipping snapshot {} - vanished since start of sync",
> +                                    snapshot.dir(),
> +                                );
> +                                return Ok(());
> +                            }
> +                            _ => {
> +                                bail!("HTTP error {code} - {message}");
> +                            }
> +                        },
> +                        None => {
> +                            return Err(err);
> +                        }
> +                    };
>                  }
>              };
> +            DataBlob::load_from_reader(&mut tmp_manifest_file)?
> +        }
> +        BackupSource::Local(ref dir) => {
> +            let data = dir.load_blob(MANIFEST_BLOB_NAME)?;
> +            let mut tmp_manifest_file = std::fs::OpenOptions::new()
> +                .write(true)
> +                .create(true)
> +                .truncate(true)
> +                .read(true)
> +                .open(&tmp_manifest_name)?;
> +            tmp_manifest_file.write_all(data.raw_data())?;
> +            data
>          }
>      };
> -    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
>  
>      if manifest_name.exists() {
>          let manifest_blob = proxmox_lang::try_block!({
> @@ -417,13 +470,17 @@ async fn pull_snapshot(
>          })?;
>  
>          if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
> -            if !client_log_name.exists() {
> -                try_client_log_download(worker, reader, &client_log_name).await?;
> +            if let BackupSource::Remote(ref reader) = source {
> +                if !client_log_name.exists() {
> +                    try_client_log_download(worker, reader.clone(), &client_log_name).await?;
> +                };

logs should also be fetched for local operations..
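
for the local case this could just copy the blob over, e.g. (sketch):

if let BackupSource::Local(ref dir) = source {
    if !client_log_name.exists() {
        let mut source_log = dir.full_path();
        source_log.push(CLIENT_LOG_BLOB_NAME);
        // the client log is optional - only copy it if the source snapshot has one
        if source_log.exists() {
            std::fs::copy(&source_log, &client_log_name)?;
        }
    }
}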

> +            }
> +            if tmp_manifest_name.exists() {
> +                let _ = std::fs::remove_file(&tmp_manifest_name);
>              }
>              task_log!(worker, "no data changes");
> -            let _ = std::fs::remove_file(&tmp_manifest_name);
>              return Ok(()); // nothing changed
> -        }
> +        };
>      }
>  
>      let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
> @@ -467,32 +524,49 @@ async fn pull_snapshot(
>              }
>          }
>  
> -        let mut chunk_reader = RemoteChunkReader::new(
> -            reader.clone(),
> -            None,
> -            item.chunk_crypt_mode(),
> -            HashMap::new(),
> -        );
> -
> -        pull_single_archive(
> -            worker,
> -            &reader,
> -            &mut chunk_reader,
> -            snapshot,
> -            item,
> -            downloaded_chunks.clone(),
> -        )
> -        .await?;
> +        match source {
> +            BackupSource::Remote(ref reader) => {
> +                let chunk_reader = RemoteChunkReader::new(
> +                    reader.clone(),
> +                    None,
> +                    item.chunk_crypt_mode(),
> +                    HashMap::new(),
> +                );
> +                pull_single_archive(
> +                    worker,
> +                    &source,
> +                    chunk_reader,
> +                    snapshot,
> +                    item,
> +                    downloaded_chunks.clone(),
> +                )
> +                .await?
> +            }
> +            BackupSource::Local(ref dir) => {
> +                let chunk_reader =
> +                    LocalChunkReader::new(dir.datastore().clone(), None, item.chunk_crypt_mode());
> +                pull_single_archive(
> +                    worker,
> +                    &source,
> +                    chunk_reader,
> +                    snapshot,
> +                    item,
> +                    downloaded_chunks.clone(),
> +                )
> +                .await?
> +            }
> +        }
>      }
>  
>      if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
>          bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
>      }

this could become a call to update_manifest that replaces the old one with the
new one instead, if the manifest is kept in-memory only..

>  
> -    if !client_log_name.exists() {
> -        try_client_log_download(worker, reader, &client_log_name).await?;
> +    if let BackupSource::Remote(reader) = source {
> +        if !client_log_name.exists() {
> +            try_client_log_download(worker, reader, &client_log_name).await?;
> +        };

same as above - the log should also be fetched for local operations..

>      }
> -
>      snapshot
>          .cleanup_unreferenced_files(&manifest)
>          .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
> @@ -506,7 +580,7 @@ async fn pull_snapshot(
>  /// pointing to the local datastore and target namespace.
>  async fn pull_snapshot_from(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> +    source: BackupSource,
>      snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> @@ -517,7 +591,7 @@ async fn pull_snapshot_from(
>      if is_new {
>          task_log!(worker, "sync snapshot {}", snapshot.dir());
>  
> -        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
> +        if let Err(err) = pull_snapshot(worker, source, snapshot, downloaded_chunks).await {
>              if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
>                  snapshot.backup_ns(),
>                  snapshot.as_ref(),
> @@ -530,7 +604,7 @@ async fn pull_snapshot_from(
>          task_log!(worker, "sync snapshot {} done", snapshot.dir());
>      } else {
>          task_log!(worker, "re-sync snapshot {}", snapshot.dir());
> -        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
> +        pull_snapshot(worker, source, snapshot, downloaded_chunks).await?;
>          task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
>      }
>  
> @@ -600,36 +674,52 @@ impl std::fmt::Display for SkipInfo {
>  /// - local group owner is already checked by pull_store
>  async fn pull_group(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> +    client: Option<&HttpClient>,

the client here is redundant anyway, since for the remote case we can always get
it from params..

>      params: &PullParameters,
>      group: &pbs_api_types::BackupGroup,
>      remote_ns: BackupNamespace,
>      progress: &mut StoreProgress,
>  ) -> Result<(), Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/snapshots",
> -        params.source.store()
> -    );
> -
> -    let mut args = json!({
> -        "backup-type": group.ty,
> -        "backup-id": group.id,
> -    });
> -
> -    if !remote_ns.is_root() {
> -        args["ns"] = serde_json::to_value(&remote_ns)?;
> -    }
> -
>      let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
> +    let mut list: Vec<SnapshotListItem> = if let Some(client) = client {
> +        let path = format!(
> +            "api2/json/admin/datastore/{}/snapshots",
> +            params.source.store()
> +        );
>  
> -    let mut result = client.get(&path, Some(args)).await?;
> -    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> +        let mut args = json!({
> +            "backup-type": group.ty,
> +            "backup-id": group.id,
> +        });
>  
> -    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> +        if !remote_ns.is_root() {
> +            args["ns"] = serde_json::to_value(&remote_ns)?;
> +        }
>  
> -    client.login().await?; // make sure auth is complete
> +        let mut result = client.get(&path, Some(args)).await?;
> +        serde_json::from_value(result["data"].take())?
> +    } else {
> +        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
> +        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root at pam")));

not needed (and would be wrong, since this would elevate logical privileges to root!)

> +        let source_ns = if remote_ns.is_root() {
> +            None
> +        } else {
> +            Some(remote_ns.clone())
> +        };
> +        crate::api2::admin::datastore::do_list_snapshots(
> +            params.source.store().to_string(),
> +            source_ns,
> +            Some(group.ty),
> +            Some(group.id.clone()),
> +            &mut rpcenv,
> +        )
> +        .await?

this could simply list the local snapshots via the local datastore's
store.backup_group().list_backups(), with a bit of refactoring:
- we need BackupDir and in-progress status for both remote and Local
- BackupDir is returned by both local and remote list
- skipping can be done based on size being None (remote) or manifest being
missing (local)

for example, this could be a Vec<(BackupDir, bool)> or skipping could be done
up front, and this could just become a Vec<BackupDir>
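
e.g. the local arm could then boil down to something like (untested sketch -
`source_store` being the already looked-up source datastore, with the skipping
done up front):

let list: Vec<pbs_api_types::BackupDir> = source_store
    .backup_group(remote_ns.clone(), group.clone())
    .list_backups()?
    .into_iter()
    // no manifest yet means the snapshot is still in progress - skip it,
    // just like entries without a size are skipped on the remote side
    .filter(|info| info.files.iter().any(|file| file.as_str() == MANIFEST_BLOB_NAME))
    .map(|info| info.backup_dir.dir().to_owned())
    .collect();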

> +    };
> +    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
>  
> -    let fingerprint = client.fingerprint();
> +    if let Some(client) = client {
> +        client.login().await?; // make sure auth is complete
> +    }
>  
>      let last_sync = params.store.last_successful_backup(&target_ns, group)?;
>  
> @@ -646,6 +736,13 @@ async fn pull_group(
>          count: 0,
>      };
>  
> +    let datastore: Option<Arc<DataStore>> = match client {
> +        None => Some(DataStore::lookup_datastore(
> +            params.source.store(),
> +            Some(Operation::Read),
> +        )?),
> +        _ => None,
> +    };
>      for (pos, item) in list.into_iter().enumerate() {
>          let snapshot = item.backup;
>  
> @@ -668,33 +765,47 @@ async fn pull_group(
>              }
>          }
>  
> -        // get updated auth_info (new tickets)
> -        let auth_info = client.login().await?;
> -
> -        let options =
> -            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
> -                .rate_limit(params.limit.clone());
> +        let backup_source = if let Some(client) = client {

with client possibly dropped, this should rather match on the refactored param
field that tells us whether we are pulling from a local or remote source..

> +            // get updated auth_info (new tickets)
> +            let auth_info = client.login().await?;
> +            let fingerprint = client.fingerprint();
>  
> -        let new_client = HttpClient::new(
> -            params.source.host(),
> -            params.source.port(),
> -            params.source.auth_id(),
> -            options,
> -        )?;
> -
> -        let reader = BackupReader::start(
> -            new_client,
> -            None,
> -            params.source.store(),
> -            &remote_ns,
> -            &snapshot,
> -            true,
> -        )
> -        .await?;
> +            let options = HttpClientOptions::new_non_interactive(
> +                auth_info.ticket.clone(),
> +                fingerprint.clone(),
> +            )
> +            .rate_limit(params.limit.clone());
> +
> +            let new_client = HttpClient::new(
> +                params.source.host(),
> +                params.source.port(),
> +                params.source.auth_id(),
> +                options,
> +            )?;
> +
> +            BackupSource::Remote(
> +                BackupReader::start(
> +                    new_client,
> +                    None,
> +                    params.source.store(),
> +                    &remote_ns,
> +                    &snapshot,
> +                    true,
> +                )
> +                .await?,
> +            )
> +        } else {
> +            if let Some(datastore) = datastore.clone() {

this would then be the second match arm

> +                BackupSource::Local(datastore.backup_dir(remote_ns.clone(), snapshot.clone())?)
> +            } else {

and this would no longer exist, since the match can be exhaustive ;)

> +                unreachable!("if there is no client and no datastore, then the ds lookup would have failed earlier")
> +            }
> +        };

otherwise, this } else { if ... else ... } can be collapsed (there are a few
more clippy lints triggered by this series that might be worth cleaning up as
well).

>  
>          let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
>  
> -        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
> +        let result =
> +            pull_snapshot_from(worker, backup_source, &snapshot, downloaded_chunks.clone()).await;
>  
>          progress.done_snapshots = pos as u64 + 1;
>          task_log!(worker, "percentage done: {}", progress);
> @@ -735,49 +846,64 @@ async fn pull_group(
>  // will modify params if switching to backwards mode for lack of NS support on remote end
>  async fn query_namespaces(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> +    client: Option<&HttpClient>,

the client here is redundant anyway, since for the remote case we can always get
it from params..

>      params: &mut PullParameters,
>  ) -> Result<Vec<BackupNamespace>, Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/namespace",
> -        params.source.store()
> -    );
> -    let mut data = json!({});
> -    if let Some(max_depth) = params.max_depth {
> -        data["max-depth"] = json!(max_depth);
> -    }
> +    let mut list: Vec<NamespaceListItem> = if let Some(client) = client {

same here again - this should match on params.source being Local or Remote

> +        let path = format!(
> +            "api2/json/admin/datastore/{}/namespace",
> +            params.source.store()
> +        );
> +        let mut data = json!({});
> +        if let Some(max_depth) = params.max_depth {
> +            data["max-depth"] = json!(max_depth);
> +        }
>  
> -    if !params.remote_ns.is_root() {
> -        data["parent"] = json!(params.remote_ns);
> -    }
> +        if !params.remote_ns.is_root() {
> +            data["parent"] = json!(params.remote_ns);
> +        }
>  
> -    let mut result = match client.get(&path, Some(data)).await {
> -        Ok(res) => res,
> -        Err(err) => match err.downcast_ref::<HttpError>() {
> -            Some(HttpError { code, message }) => match *code {
> -                StatusCode::NOT_FOUND => {
> -                    if params.remote_ns.is_root() && params.max_depth.is_none() {
> -                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> -                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> -                        params.max_depth = Some(0);
> -                    } else {
> -                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> -                    }
> +        let mut result = match client.get(&path, Some(data)).await {
> +            Ok(res) => res,
> +            Err(err) => match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match *code {
> +                    StatusCode::NOT_FOUND => {
> +                        if params.remote_ns.is_root() && params.max_depth.is_none() {
> +                            task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                            task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> +                            params.max_depth = Some(0);
> +                        } else {
> +                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                        }
>  
> -                    return Ok(vec![params.remote_ns.clone()]);
> -                }
> -                _ => {
> -                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                        return Ok(vec![params.remote_ns.clone()]);
> +                    }
> +                    _ => {
> +                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    bail!("Querying namespaces failed - {err}");
>                  }
>              },
> -            None => {
> -                bail!("Querying namespaces failed - {err}");
> -            }
> -        },
> -    };
> -
> -    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
> +        };
>  
> +        serde_json::from_value(result["data"].take())?
> +    } else {
> +        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
> +        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root at pam")));
> +        let parent_ns = if params.remote_ns.is_root() {
> +            None
> +        } else {
> +            Some(params.remote_ns.clone())
> +        };
> +        crate::api2::admin::namespace::list_namespaces(
> +            params.source.store().to_string(),
> +            parent_ns,
> +            params.max_depth,
> +            &mut rpcenv,
> +        )?

and here as well - instead of pretending to be root at pam and querying over the
API, this could just query directly via pbs_datastore.. we only need a
Vec<BackupNamespace> after all.
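
e.g. (sketch - assuming recursive_iter_backup_ns_ok fits here and `source_store`
is the local source datastore):

let list: Vec<BackupNamespace> = source_store
    .recursive_iter_backup_ns_ok(params.remote_ns.clone(), params.max_depth)?
    .collect();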

> +    };
>      // parents first
>      list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
>  
> @@ -897,7 +1023,7 @@ fn check_and_remove_vanished_ns(
>  /// - access to sub-NS checked here
>  pub(crate) async fn pull_store(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> +    client: Option<&HttpClient>,

the client here is redundant anyway, since for the remote case we can always get
it from params..

>      mut params: PullParameters,
>  ) -> Result<(), Error> {
>      // explicit create shared lock to prevent GC on newly created chunks
> @@ -1000,27 +1126,42 @@ pub(crate) async fn pull_store(
>  /// - owner check for vanished groups done here
>  pub(crate) async fn pull_ns(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> +    client: Option<&HttpClient>,
>      params: &PullParameters,
>      source_ns: BackupNamespace,
>      target_ns: BackupNamespace,
>  ) -> Result<(StoreProgress, bool), Error> {
> -    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
> +    let mut list: Vec<GroupListItem> = if let Some(client) = client {

and here we have the same pattern again
- should match on params.source
- we are actually only interested in a Vec<BackupGroup> down below
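
e.g. (sketch - `source_store` again being the local source datastore; a real
version should log or propagate the per-entry errors instead of silently
dropping them):

let list: Vec<pbs_api_types::BackupGroup> = source_store
    .iter_backup_groups(source_ns.clone())?
    .filter_map(Result::ok)
    .map(|group| group.group().to_owned())
    .collect();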

> +        let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
>  
> -    let args = if !source_ns.is_root() {
> -        Some(json!({
> -            "ns": source_ns,
> -        }))
> -    } else {
> -        None
> -    };
> +        let args = if !source_ns.is_root() {
> +            Some(json!({
> +                "ns": source_ns,
> +            }))
> +        } else {
> +            None
> +        };
>  
> -    let mut result = client
> -        .get(&path, args)
> -        .await
> -        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
> +        let mut result = client
> +            .get(&path, args)
> +            .await
> +            .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
>  
> -    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> +        serde_json::from_value(result["data"].take())?
> +    } else {
> +        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
> +        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root at pam")));
> +        let source_ns = if source_ns.is_root() {
> +            None
> +        } else {
> +            Some(source_ns.clone())
> +        };
> +        crate::api2::admin::datastore::list_groups(
> +            params.source.store().to_string(),
> +            source_ns,
> +            &mut rpcenv,
> +        )?

so this doesn't need to query over the API as pretend-root, but can just do a local query of the store+NS.

> +    };
>  
>      let total_count = list.len();
>      list.sort_unstable_by(|a, b| {
> -- 
> 2.30.2
> 