[pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling

Wolfgang Bumiller w.bumiller at proxmox.com
Tue Feb 28 12:25:05 CET 2023


On Thu, Feb 23, 2023 at 01:55:40PM +0100, Hannes Laimer wrote:
> ... and rewrite pull logic.
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
>  pbs-api-types/src/datastore.rs  |    2 +-
>  pbs-datastore/src/read_chunk.rs |    2 +-
>  src/api2/pull.rs                |   13 +-
>  src/server/pull.rs              | 1023 +++++++++++++++++++------------
>  4 files changed, 648 insertions(+), 392 deletions(-)
> 
> diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs
> index 72e8d1ee..9a692b08 100644
> --- a/pbs-api-types/src/datastore.rs
> +++ b/pbs-api-types/src/datastore.rs
> @@ -931,7 +931,7 @@ impl std::str::FromStr for BackupGroup {
>  /// Uniquely identify a Backup (relative to data store)
>  ///
>  /// We also call this a backup snaphost.
> -#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
> +#[derive(Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
>  #[serde(rename_all = "kebab-case")]
>  pub struct BackupDir {
>      /// Backup group.
> diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
> index c04a7431..29ee2d4c 100644
> --- a/pbs-datastore/src/read_chunk.rs
> +++ b/pbs-datastore/src/read_chunk.rs
> @@ -14,7 +14,7 @@ pub trait ReadChunk {
>      fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
>  }
>  
> -pub trait AsyncReadChunk: Send {
> +pub trait AsyncReadChunk: Send + Sync {
>      /// Returns the encoded chunk data
>      fn read_raw_chunk<'a>(
>          &'a self,
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index bb8f6fe1..2966190c 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -121,8 +121,8 @@ pub fn do_sync_job(
>              let sync_job2 = sync_job.clone();
>  
>              let worker_future = async move {
> -                let pull_params = PullParameters::try_from(&sync_job)?;
> -                let client = pull_params.client().await?;
> +                let mut pull_params = PullParameters::try_from(&sync_job)?;
> +                pull_params.init_source(sync_job.limit).await?;
>  
>                  task_log!(worker, "Starting datastore sync job '{}'", job_id);
>                  if let Some(event_str) = schedule {
> @@ -137,7 +137,7 @@ pub fn do_sync_job(
>                  );
>  
>                  if sync_job.remote.is_some() {
> -                    pull_store(&worker, &client, pull_params).await?;
> +                    pull_store(&worker, pull_params).await?;
>                  } else {
>                      match (sync_job.ns, sync_job.remote_ns) {
>                          (Some(target_ns), Some(source_ns))
> @@ -280,7 +280,7 @@ async fn pull(
>          delete,
>      )?;
>  
> -    let pull_params = PullParameters::new(
> +    let mut pull_params = PullParameters::new(
>          &store,
>          ns,
>          remote.as_deref(),
> @@ -290,9 +290,8 @@ async fn pull(
>          remove_vanished,
>          max_depth,
>          group_filter,
> -        limit,
>      )?;
> -    let client = pull_params.client().await?;
> +    pull_params.init_source(limit).await?;
>  
>      // fixme: set to_stdout to false?
>      // FIXME: add namespace to worker id?
> @@ -310,7 +309,7 @@ async fn pull(
>                  remote_store,
>              );
>  
> -            let pull_future = pull_store(&worker, &client, pull_params);
> +            let pull_future = pull_store(&worker, pull_params);
>              (select! {
>                  success = pull_future.fuse() => success,
>                  abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 65eedf2c..d3be39da 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
>  //! Sync datastore from remote server
>  
>  use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::{Seek, SeekFrom, Write};
> +use std::path::PathBuf;
>  use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::SystemTime;
>  
>  use anyhow::{bail, format_err, Error};
>  use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
>  use proxmox_router::HttpError;
>  use proxmox_sys::task_log;
> +use serde_json::json;
>  
>  use pbs_api_types::{
> -    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> +    print_store_and_ns, Authid, BackupDir, BackupNamespace, CryptMode, GroupFilter, GroupListItem,
>      Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
>      PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
>  };
> -
> -use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
>  use pbs_datastore::data_blob::DataBlob;
>  use pbs_datastore::dynamic_index::DynamicIndexReader;
>  use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,21 @@ use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{
>      archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>  };
> -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> +use pbs_datastore::read_chunk::AsyncReadChunk;
> +use pbs_datastore::{
> +    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
> +};
>  use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>  
> -use crate::backup::{check_ns_modification_privs, check_ns_privs};
> +use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
>  use crate::tools::parallel_handler::ParallelHandler;
>  
>  /// Parameters for a pull operation.
>  pub(crate) struct PullParameters {
> -    /// Remote that is pulled from
> -    remote: Remote,
> -    /// Full specification of remote datastore
> -    source: BackupRepository,
> -    /// Local store that is pulled into
> -    store: Arc<DataStore>,
> -    /// Remote namespace
> -    remote_ns: BackupNamespace,
> -    /// Local namespace (anchor)
> -    ns: BackupNamespace,
> +    /// Where data is pulled from
> +    source: PullSource,
> +    /// Where data should be pulled into
> +    target: PullTarget,
>      /// Owner of synced groups (needs to match local owner of pre-existing groups)
>      owner: Authid,
>      /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,70 +51,459 @@ pub(crate) struct PullParameters {
>      max_depth: Option<usize>,
>      /// Filters for reducing the pull scope
>      group_filter: Option<Vec<GroupFilter>>,
> -    /// Rate limits for all transfers from `remote`
> -    limit: RateLimitConfig,
> +}
> +
> +pub(crate) enum PullSource {
> +    Remote(RemoteSource),
> +    Local(LocalSource),
> +}
> +
> +pub(crate) struct PullTarget {
> +    store: Arc<DataStore>,
> +    ns: BackupNamespace,
> +}
> +
> +pub(crate) struct LocalSource {
> +    store: Arc<DataStore>,
> +    ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> +    remote: Remote,
> +    repo: BackupRepository,
> +    ns: BackupNamespace,
> +    client: Option<HttpClient>,
> +    backup_reader: HashMap<pbs_api_types::BackupDir, Arc<BackupReader>>,
> +}
> +
> +impl PullSource {
> +    pub(crate) async fn init(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
> +        match self {
> +            PullSource::Remote(source) => {
> +                source.client.replace(
> +                    crate::api2::config::remote::remote_client(&source.remote, Some(limit)).await?,
> +                );
> +            }
> +            PullSource::Local(_) => {}
> +        };
> +        Ok(())
> +    }
> +
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error> {

I'd very much prefer it if all these functions that mainly `match` on
`self` were factored into impl blocks for `RemoteSource` and `LocalSource`,
with the `PullSource` methods just forwarding the call.

Including this one for consistency, even if size-wise it seems
excessive.

> +        match &self {
> +            PullSource::Remote(source) => list_remote_namespaces(source, max_depth, worker).await,
> +            PullSource::Local(source) => ListNamespacesRecursive::new_max_depth(
> +                source.store.clone(),
> +                source.ns.clone(),
> +                max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
> +            )?
> +            .collect(),
> +        }
> +    }
> +
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        owner: &Authid,
> +    ) -> Result<Vec<pbs_api_types::BackupGroup>, Error> {

But here and below the indentation and code start to get bigger and
it'll just be easier to maintain these separately.
This is basically like a common API surface for "sources".

> +        match &self {
> +            PullSource::Remote(source) => {
> +                let path = format!("api2/json/admin/datastore/{}/groups", source.repo.store());

(Note for future work: we should really factor calls like this into a
`PbsClient` within the pbs-client crate. A mere "HttpClient" really
isn't something I'd look for in a `pbs-client` crate, and this (and most
code in `proxmox-backup-client`) should actually just be a call to
`client.list_groups(store, namespace).await` (or equivalent) in the
future.)

> +
> +                let args = if !namespace.is_root() {
> +                    Some(json!({ "ns": namespace.clone() }))
> +                } else {
> +                    None
> +                };
> +
> +                let client = source.get_client()?;
> +                client.login().await?;
> +                let mut result = client.get(&path, args).await.map_err(|err| {
> +                    format_err!("Failed to retrieve backup groups from remote - {}", err)
> +                })?;
> +
> +                Ok(
> +                    serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> +                        .map_err(|e| Error::from(e))?
> +                        .into_iter()
> +                        .map(|item| item.backup)
> +                        .collect::<Vec<pbs_api_types::BackupGroup>>(),
> +                )
> +            }
> +            PullSource::Local(source) => Ok(ListAccessibleBackupGroups::new_with_privs(
> +                &source.store,
> +                namespace.clone(),
> +                MAX_NAMESPACE_DEPTH,
> +                None,
> +                None,
> +                Some(owner),
> +            )?
> +            .filter_map(Result::ok)

^ Should we not log errors (or fail)?

> +            .map(|backup_group| backup_group.group().clone())
> +            .collect::<Vec<pbs_api_types::BackupGroup>>()),
> +        }
> +    }
> +
> +    async fn list_backup_dirs(
> +        &self,
> +        namespace: &BackupNamespace,
> +        group: &pbs_api_types::BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<pbs_api_types::BackupDir>, Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let path = format!(
> +                    "api2/json/admin/datastore/{}/snapshots",
> +                    source.repo.store()
> +                );
> +
> +                let mut args = json!({
> +                    "backup-type": group.ty,
> +                    "backup-id": group.id,
> +                });
> +
> +                if !source.ns.is_root() {
> +                    args["ns"] = serde_json::to_value(&source.ns)?;
> +                }
> +
> +                let client = source.get_client()?;
> +                client.login().await?;
> +
> +                let mut result = client.get(&path, Some(args)).await?;
> +                let snapshot_list: Vec<SnapshotListItem> =
> +                    serde_json::from_value(result["data"].take())?;
> +                Ok(snapshot_list
> +                    .into_iter()
> +                    .filter_map(|item: SnapshotListItem| {
> +                        let snapshot = item.backup;
> +                        // in-progress backups can't be synced
> +                        if item.size.is_none() {
> +                            task_log!(
> +                                worker,
> +                                "skipping snapshot {} - in-progress backup",
> +                                snapshot
> +                            );
> +                            return None;
> +                        }
> +
> +                        Some(snapshot)
> +                    })
> +                    .collect::<Vec<BackupDir>>())
> +            }
> +            PullSource::Local(source) => Ok(source
> +                .store
> +                .backup_group(namespace.clone(), group.clone())
> +                .iter_snapshots()?
> +                .filter_map(Result::ok)

^ Should we not log errors (or fail)?

> +                .map(|snapshot| snapshot.dir().to_owned())
> +                .collect::<Vec<BackupDir>>()),
> +        }
> +    }
> +
> +    /// Load file from source namespace and BackupDir into file
> +    async fn load_file_into(
> +        &mut self,
> +        namespace: &BackupNamespace,
> +        snapshot: &pbs_api_types::BackupDir,
> +        filename: &str,
> +        into: &PathBuf,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error> {
> +        let mut tmp_file = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .truncate(true)
> +            .read(true)
> +            .open(into)?;
> +        match self {
> +            PullSource::Remote(ref mut source) => {
> +                let client = source.get_client()?;
> +                client.login().await?;
> +
> +                let reader = if let Some(reader) = source.backup_reader.get(snapshot) {
> +                    reader.clone()
> +                } else {
> +                    let backup_reader = BackupReader::start(
> +                        client,
> +                        None,
> +                        source.repo.store(),
> +                        namespace,
> +                        snapshot,
> +                        true,
> +                    )
> +                    .await?;
> +                    source
> +                        .backup_reader
> +                        .insert(snapshot.clone(), backup_reader.clone());
> +                    backup_reader
> +                };
> +
> +                let download_result = reader.download(filename, &mut tmp_file).await;
> +
> +                if let Err(err) = download_result {
> +                    match err.downcast_ref::<HttpError>() {
> +                        Some(HttpError { code, message }) => match *code {
> +                            StatusCode::NOT_FOUND => {
> +                                task_log!(
> +                                    worker,
> +                                    "skipping snapshot {} - vanished since start of sync",
> +                                    snapshot,
> +                                );
> +                                return Ok(None);
> +                            }
> +                            _ => {
> +                                bail!("HTTP error {code} - {message}");
> +                            }
> +                        },
> +                        None => {
> +                            return Err(err);
> +                        }
> +                    };
> +                };
> +            }
> +            PullSource::Local(source) => {
> +                let dir = source
> +                    .store
> +                    .backup_dir(namespace.clone(), snapshot.clone())?;
> +                let mut from_path = dir.full_path();
> +                from_path.push(filename);
> +                tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;

Opening the source file and using `std::io::copy()` might be nicer. Then
again, if most of this code is about loading a file, why not just have it
return a `Vec<u8>`?
In fact, maybe that would be the nicer interface after all, given that
we're currently creating unwritten tmpfiles and leaving them empty on
error, making it the caller's responsibility to clean them up, but not to
create them. (Not too bad for internal code, but IMO still nicer to avoid
this...)

(Another possibility would be to just make a hardlink here and skip the
creation on top altogether.)

> +            }
> +        }
> +
> +        tmp_file.seek(SeekFrom::Start(0))?;
> +        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())

You're ignoring an error here. I suppose that's because you now use this
twice and one case doesn't actually use or care about the result from
this function and didn't call `DataBlob::load_from_reader` on the
contents either.
I think it makes more sense to return the `tmp_file` and have the caller
decide whether to use `DataBlob::load_from_reader` on it and then not
discard the error there.

> +    }
> +
> +    // Note: The client.log.blob is uploaded after the backup, so it is
> +    // not mentioned in the manifest.
> +    async fn try_download_client_log(
> +        &self,
> +        from_snapshot: &pbs_api_types::BackupDir,
> +        to_path: &std::path::Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let reader = source
> +                    .backup_reader
> +                    .get(from_snapshot)
> +                    .ok_or(format_err!("Can't download chunks without a BackupReader"))?;
> +                let mut tmp_path = to_path.to_owned();
> +                tmp_path.set_extension("tmp");
> +
> +                let tmpfile = std::fs::OpenOptions::new()
> +                    .write(true)
> +                    .create(true)
> +                    .read(true)
> +                    .open(&tmp_path)?;
> +
> +                // Note: be silent if there is no log - only log successful download
> +                if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> +                    if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> +                        bail!("Atomic rename file {:?} failed - {}", to_path, err);
> +                    }
> +                    task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> +                }
> +
> +                Ok(())
> +            }
> +            PullSource::Local(_) => Ok(()),
> +        }
> +    }
> +
> +    fn get_chunk_reader(
> +        &self,
> +        snapshot: &pbs_api_types::BackupDir,
> +        crypt_mode: CryptMode,
> +    ) -> Result<Arc<dyn AsyncReadChunk>, Error> {
> +        Ok(match &self {
> +            PullSource::Remote(source) => {
> +                if let Some(reader) = source.backup_reader.get(snapshot) {
> +                    Arc::new(RemoteChunkReader::new(
> +                        reader.clone(),
> +                        None,
> +                        crypt_mode,
> +                        HashMap::new(),
> +                    ))
> +                } else {
> +                    bail!("No initialized BackupReader!")
> +                }
> +            }
> +            PullSource::Local(source) => Arc::new(LocalChunkReader::new(
> +                source.store.clone(),
> +                None,
> +                crypt_mode,
> +            )),
> +        })
> +    }
> +
> +    fn get_ns(&self) -> BackupNamespace {
> +        match &self {
> +            PullSource::Remote(source) => source.ns.clone(),
> +            PullSource::Local(source) => source.ns.clone(),
> +        }
> +    }
> +
> +    fn print_store_and_ns(&self) -> String {
> +        match &self {
> +            PullSource::Remote(source) => print_store_and_ns(source.repo.store(), &source.ns),
> +            PullSource::Local(source) => print_store_and_ns(source.store.name(), &source.ns),
> +        }
> +    }
> +}
> +
> +impl RemoteSource {
> +    fn get_client(&self) -> Result<&HttpClient, Error> {
> +        if let Some(client) = &self.client {
> +            Ok(client)
> +        } else {
> +            bail!("RemoteSource not initialized")
> +        }
> +    }
>  }
>  
>  impl PullParameters {
>      /// Creates a new instance of `PullParameters`.
> -    ///
> -    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> -    /// [BackupRepository] with `remote_store`.
> -    #[allow(clippy::too_many_arguments)]
>      pub(crate) fn new(
>          store: &str,
>          ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>          remote_store: &str,
>          remote_ns: BackupNamespace,
>          owner: Authid,
>          remove_vanished: Option<bool>,
>          max_depth: Option<usize>,
>          group_filter: Option<Vec<GroupFilter>>,
> -        limit: RateLimitConfig,
>      ) -> Result<Self, Error> {
> -        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
>          if let Some(max_depth) = max_depth {
>              ns.check_max_depth(max_depth)?;
>              remote_ns.check_max_depth(max_depth)?;
> -        }
> +        };
> +        let remove_vanished = remove_vanished.unwrap_or(false);
>  
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> +        let source: PullSource = if let Some(remote) = remote {

Since I'm already suggesting moving some code into dedicated impl
blocks, it would make sense here as well, so that this becomes:
    let source = match remote {
        Some(remote) => PullSource::Remote(RemoteSource::new(remote)),
        None => PullSource::Local(LocalSource::new(remote_store, remote_ns)),
    };


> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote: Remote = remote_config.lookup("remote", remote)?;
>  
> -        let remove_vanished = remove_vanished.unwrap_or(false);
> +            let repo = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
>  
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> +            PullSource::Remote(RemoteSource {
> +                remote,
> +                repo,
> +                ns: remote_ns.clone(),
> +                client: None,
> +                backup_reader: HashMap::new(),
> +            })
> +        } else {
> +            PullSource::Local(LocalSource {
> +                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
> +                ns: remote_ns,
> +            })
> +        };
> +        let target = PullTarget {
> +            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
> +            ns,
> +        };
>  
>          Ok(Self {
> -            remote,
> -            remote_ns,
> -            ns,
>              source,
> -            store,
> +            target,
>              owner,
>              remove_vanished,
>              max_depth,
>              group_filter,
> -            limit,
>          })
>      }
>  
> -    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> -    pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +    pub(crate) async fn init_source(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
> +        self.source.init(limit).await
> +    }
> +
> +    pub(crate) fn skip_chunk_sync(&self) -> bool {

^ Minor nit: sounds a bit like an action, maybe `should_..` or just
`is_same_datastore()`?

> +        match &self.source {
> +            PullSource::Local(source) => source.store.name() == self.target.store.name(),
> +            PullSource::Remote(_) => false,
> +        }
> +    }
> +
> +    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> +        let source_ns = self.source.get_ns();

Taking `/some/source`

> +        source_ns.map_prefix(&source_ns, &self.target.ns)

and replacing its `/some/source` by `/other/thing` should always yield
`/other/thing`, no?

>      }
>  }
>  
> +async fn list_remote_namespaces(
> +    source: &RemoteSource,
> +    max_depth: &mut Option<usize>,
> +    worker: &WorkerTask,
> +) -> Result<Vec<BackupNamespace>, Error> {
> +    if source.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> +        vec![source.ns.clone()];

^ Missing a `return` here maybe?

> +    }
> +
> +    let path = format!(
> +        "api2/json/admin/datastore/{}/namespace",
> +        source.repo.store()
> +    );
> +    let mut data = json!({});
> +    if let Some(max_depth) = max_depth {
> +        data["max-depth"] = json!(max_depth);
> +    }
> +
> +    if !source.ns.is_root() {
> +        data["parent"] = json!(source.ns);
> +    }
> +
> +    let client = source.get_client()?;
> +    client.login().await?;
> +
> +    let mut result = match client.get(&path, Some(data)).await {
> +        Ok(res) => res,
> +        Err(err) => match err.downcast_ref::<HttpError>() {
> +            Some(HttpError { code, message }) => match code {
> +                &StatusCode::NOT_FOUND => {
> +                    if source.ns.is_root() && max_depth.is_none() {
> +                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> +                        max_depth.replace(0);
> +                    } else {
> +                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                    }
> +
> +                    return Ok(vec![source.ns.clone()]);
> +                }
> +                _ => {
> +                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                }
> +            },
> +            None => {
> +                bail!("Querying namespaces failed - {err}");
> +            }
> +        },
> +    };
> +
> +    let list: Vec<pbs_api_types::BackupNamespace> =

^ BackupNamespace is in scope.

> +        serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> +            .iter()
> +            .map(|list_item| list_item.ns.clone())
> +            .collect();
> +
> +    Ok(list)
> +}
> +
>  async fn pull_index_chunks<I: IndexFile>(
>      worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: Arc<dyn AsyncReadChunk>,
>      target: Arc<DataStore>,
>      index: I,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -211,26 +594,6 @@ async fn pull_index_chunks<I: IndexFile>(
>      Ok(())
>  }
>  
> -async fn download_manifest(
> -    reader: &BackupReader,
> -    filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> -    let mut tmp_manifest_file = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .truncate(true)
> -        .read(true)
> -        .open(filename)?;
> -
> -    reader
> -        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> -        .await?;
> -
> -    tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> -    Ok(tmp_manifest_file)
> -}
> -
>  fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
>      if size != info.size {
>          bail!(
> @@ -251,21 +614,21 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>  /// Pulls a single file referenced by a manifest.
>  ///
>  /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> +/// - Load archive file into tmp file
>  /// - Verify tmp file checksum
>  /// - if archive is an index, pull referenced chunks
>  /// - Rename tmp file into real path
>  async fn pull_single_archive(
>      worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    from_namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      archive_info: &FileInfo,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
>      let archive_name = &archive_info.filename;
> -    let mut path = snapshot.full_path();
> +    let mut path = to_snapshot.full_path();
>      path.push(archive_name);
>  
>      let mut tmp_path = path.clone();
> @@ -273,13 +636,18 @@ async fn pull_single_archive(
>  
>      task_log!(worker, "sync archive {}", archive_name);
>  
> -    let mut tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> +    params
> +        .source
> +        .load_file_into(
> +            from_namespace,
> +            from_snapshot,
> +            archive_name,
> +            &tmp_path,
> +            worker,
> +        )
> +        .await?;
>  
> -    reader.download(archive_name, &mut tmpfile).await?;
> +    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>  
>      match archive_type(archive_name)? {
>          ArchiveType::DynamicIndex => {
> @@ -289,14 +657,20 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if params.skip_chunk_sync() {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

^ typo: data{ts}ore

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    params
> +                        .source
> +                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
> +                    params.target.store.clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::FixedIndex => {
>              let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -305,14 +679,20 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if params.skip_chunk_sync() {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

^ typo: data{ts}ore

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    params
> +                        .source
> +                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
> +                    params.target.store.clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::Blob => {
>              tmpfile.seek(SeekFrom::Start(0))?;
> @@ -326,33 +706,6 @@ async fn pull_single_archive(
>      Ok(())
>  }
>  
> -// Note: The client.log.blob is uploaded after the backup, so it is
> -// not mentioned in the manifest.
> -async fn try_client_log_download(
> -    worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    path: &std::path::Path,
> -) -> Result<(), Error> {
> -    let mut tmp_path = path.to_owned();
> -    tmp_path.set_extension("tmp");
> -
> -    let tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> -
> -    // Note: be silent if there is no log - only log successful download
> -    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> -        if let Err(err) = std::fs::rename(&tmp_path, path) {
> -            bail!("Atomic rename file {:?} failed - {}", path, err);
> -        }
> -        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> -    }
> -
> -    Ok(())
> -}
> -
>  /// Actual implementation of pulling a snapshot.
>  ///
>  /// Pulling a snapshot consists of the following steps:
> @@ -364,44 +717,37 @@ async fn try_client_log_download(
>  /// - Download log if not already existing
>  async fn pull_snapshot(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> -    let mut manifest_name = snapshot.full_path();
> +    let mut manifest_name = to_snapshot.full_path();
>      manifest_name.push(MANIFEST_BLOB_NAME);
>  
> -    let mut client_log_name = snapshot.full_path();
> +    let mut client_log_name = to_snapshot.full_path();
>      client_log_name.push(CLIENT_LOG_BLOB_NAME);
>  
>      let mut tmp_manifest_name = manifest_name.clone();
>      tmp_manifest_name.set_extension("tmp");
>  
> -    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
> -    let mut tmp_manifest_file = match download_res {
> -        Ok(manifest_file) => manifest_file,
> -        Err(err) => {
> -            match err.downcast_ref::<HttpError>() {
> -                Some(HttpError { code, message }) => match *code {
> -                    StatusCode::NOT_FOUND => {
> -                        task_log!(
> -                            worker,
> -                            "skipping snapshot {} - vanished since start of sync",
> -                            snapshot.dir(),
> -                        );
> -                        return Ok(());
> -                    }
> -                    _ => {
> -                        bail!("HTTP error {code} - {message}");
> -                    }
> -                },
> -                None => {
> -                    return Err(err);
> -                }
> -            };
> -        }
> -    };
> -    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
> +    let tmp_manifest_blob;
> +    if let Some(data) = params
> +        .source
> +        .load_file_into(
> +            namespace,
> +            from_snapshot,
> +            MANIFEST_BLOB_NAME,
> +            &tmp_manifest_name,
> +            worker,
> +        )
> +        .await?
> +    {
> +        tmp_manifest_blob = data;
> +    } else {
> +        return Ok(());
> +    }
>  
>      if manifest_name.exists() {
>          let manifest_blob = proxmox_lang::try_block!({
> @@ -418,8 +764,11 @@ async fn pull_snapshot(
>  
>          if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
>              if !client_log_name.exists() {
> -                try_client_log_download(worker, reader, &client_log_name).await?;
> -            }
> +                params
> +                    .source
> +                    .try_download_client_log(from_snapshot, &client_log_name, worker)
> +                    .await?;
> +            };
>              task_log!(worker, "no data changes");
>              let _ = std::fs::remove_file(&tmp_manifest_name);
>              return Ok(()); // nothing changed
> @@ -429,7 +778,7 @@ async fn pull_snapshot(
>      let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
>  
>      for item in manifest.files() {
> -        let mut path = snapshot.full_path();
> +        let mut path = to_snapshot.full_path();
>          path.push(&item.filename);
>  
>          if path.exists() {
> @@ -467,18 +816,12 @@ async fn pull_snapshot(
>              }
>          }
>  
> -        let mut chunk_reader = RemoteChunkReader::new(
> -            reader.clone(),
> -            None,
> -            item.chunk_crypt_mode(),
> -            HashMap::new(),
> -        );
> -
>          pull_single_archive(
>              worker,
> -            &reader,
> -            &mut chunk_reader,
> -            snapshot,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
>              item,
>              downloaded_chunks.clone(),
>          )
> @@ -490,10 +833,12 @@ async fn pull_snapshot(
>      }
>  
>      if !client_log_name.exists() {
> -        try_client_log_download(worker, reader, &client_log_name).await?;
> -    }
> -
> -    snapshot
> +        params
> +            .source
> +            .try_download_client_log(from_snapshot, &client_log_name, worker)
> +            .await?;
> +    };
> +    to_snapshot
>          .cleanup_unreferenced_files(&manifest)
>          .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
>  
> @@ -501,37 +846,53 @@ async fn pull_snapshot(
>  }
>  
>  /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
> -///
> -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
> -/// pointing to the local datastore and target namespace.
>  async fn pull_snapshot_from(

Maybe add a `do_` prefix to the other one (`pull_snapshot`) and drop the
`_from` here: we have both from & to here now, so this name doesn't make
sense anymore.

>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> -    let (_path, is_new, _snap_lock) = snapshot
> +    let (_path, is_new, _snap_lock) = to_snapshot
>          .datastore()
> -        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
> +        .create_locked_backup_dir(to_snapshot.backup_ns(), to_snapshot.as_ref())?;
>  
>      if is_new {
> -        task_log!(worker, "sync snapshot {}", snapshot.dir());
> +        task_log!(worker, "sync snapshot {}", to_snapshot.dir());
>  
> -        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
> -            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
> -                snapshot.backup_ns(),
> -                snapshot.as_ref(),
> +        if let Err(err) = pull_snapshot(
> +            worker,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
> +            downloaded_chunks,
> +        )
> +        .await
> +        {
> +            if let Err(cleanup_err) = to_snapshot.datastore().remove_backup_dir(
> +                to_snapshot.backup_ns(),
> +                to_snapshot.as_ref(),
>                  true,
>              ) {
>                  task_log!(worker, "cleanup error - {}", cleanup_err);
>              }
>              return Err(err);
>          }
> -        task_log!(worker, "sync snapshot {} done", snapshot.dir());
> +        task_log!(worker, "sync snapshot {} done", to_snapshot.dir());
>      } else {
> -        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
> -        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
> -        task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
> +        task_log!(worker, "re-sync snapshot {}", to_snapshot.dir());
> +        pull_snapshot(
> +            worker,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
> +            downloaded_chunks,
> +        )
> +        .await?;
> +        task_log!(worker, "re-sync snapshot {} done", to_snapshot.dir());
>      }
>  
>      Ok(())
> @@ -587,7 +948,6 @@ impl std::fmt::Display for SkipInfo {
>  /// - Sort by snapshot time
>  /// - Get last snapshot timestamp on local datastore
>  /// - Iterate over list of snapshots
> -/// -- Recreate client/BackupReader
>  /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
>  /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
>  ///
> @@ -600,101 +960,63 @@ impl std::fmt::Display for SkipInfo {
>  /// - local group owner is already checked by pull_store
>  async fn pull_group(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &PullParameters,
> +    params: &mut PullParameters,
> +    source_namespace: &BackupNamespace,
>      group: &pbs_api_types::BackupGroup,
> -    remote_ns: BackupNamespace,
>      progress: &mut StoreProgress,
>  ) -> Result<(), Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/snapshots",
> -        params.source.store()
> -    );
> -
> -    let mut args = json!({
> -        "backup-type": group.ty,
> -        "backup-id": group.id,
> -    });
> -
> -    if !remote_ns.is_root() {
> -        args["ns"] = serde_json::to_value(&remote_ns)?;
> -    }
> -
> -    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;

So previously we took the passed `remote_ns` and replaced its
`params.remote_ns` prefix with the *params* local ns (`params.ns`).

> -
> -    let mut result = client.get(&path, Some(args)).await?;
> -    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> -
> -    client.login().await?; // make sure auth is complete
> -
> -    let fingerprint = client.fingerprint();
> -
> -    let last_sync = params.store.last_successful_backup(&target_ns, group)?;
> -
> -    let mut remote_snapshots = std::collections::HashSet::new();
> -
> -    // start with 65536 chunks (up to 256 GiB)
> -    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
> -
> -    progress.group_snapshots = list.len() as u64;
> +    let target_ns = params.get_target_ns()?;

Whereas this will just give us the *params* target namespace.

Aren't we syncing to different namespaces now?
It looks to me as if multiple namespaces would get merged to one now.

I find the use of `target_ns` and `params.target.ns` a bit confusing
below, too.

>  
> +    let mut source_snapshots = HashSet::new();
> +    let last_sync = params
> +        .target
> +        .store
> +        .last_successful_backup(&target_ns, group)?;
>      let mut skip_info = SkipInfo {
>          oldest: i64::MAX,
>          newest: i64::MIN,
>          count: 0,
>      };
>  
> -    for (pos, item) in list.into_iter().enumerate() {
> -        let snapshot = item.backup;
> -
> -        // in-progress backups can't be synced
> -        if item.size.is_none() {
> -            task_log!(
> -                worker,
> -                "skipping snapshot {} - in-progress backup",
> -                snapshot
> -            );
> -            continue;
> -        }
> +    let mut list: Vec<BackupDir> = params
> +        .source
> +        .list_backup_dirs(source_namespace, group, worker)
> +        .await?
> +        .into_iter()
> +        .filter(|dir| {
> +            source_snapshots.insert(dir.time);
> +            if let Some(last_sync_time) = last_sync {
> +                if last_sync_time > dir.time {
> +                    skip_info.update(dir.time);
> +                    return false;
> +                }
> +            }
> +            true
> +        })
> +        .collect();
>  
> -        remote_snapshots.insert(snapshot.time);
> +    list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
>  
> -        if let Some(last_sync_time) = last_sync {
> -            if last_sync_time > snapshot.time {
> -                skip_info.update(snapshot.time);
> -                continue;
> -            }
> -        }
> +    // start with 65536 chunks (up to 256 GiB)
> +    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>  
> -        // get updated auth_info (new tickets)
> -        let auth_info = client.login().await?;
> -
> -        let options =
> -            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
> -                .rate_limit(params.limit.clone());
> -
> -        let new_client = HttpClient::new(
> -            params.source.host(),
> -            params.source.port(),
> -            params.source.auth_id(),
> -            options,
> -        )?;
> -
> -        let reader = BackupReader::start(
> -            new_client,
> -            None,
> -            params.source.store(),
> -            &remote_ns,
> -            &snapshot,
> -            true,
> -        )
> -        .await?;
> +    progress.group_snapshots = list.len() as u64;
>  
> -        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
> +    for (pos, from_snapshot) in list.into_iter().enumerate() {
> +        let to_snapshot = params
> +            .target
> +            .store
> +            .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;

^ should this not be target_ns?

>  
> -        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
> +        let result = pull_snapshot_from(
> +            worker,
> +            params,
> +            source_namespace,
> +            &from_snapshot,
> +            &to_snapshot,
> +            downloaded_chunks.clone(),
> +        )
> +        .await;
>  
>          progress.done_snapshots = pos as u64 + 1;
>          task_log!(worker, "percentage done: {}", progress);
> @@ -703,11 +1025,14 @@ async fn pull_group(
>      }
>  
>      if params.remove_vanished {
> -        let group = params.store.backup_group(target_ns.clone(), group.clone());
> +        let group = params
> +            .target
> +            .store
> +            .backup_group(target_ns.clone(), group.clone());
>          let local_list = group.list_backups()?;
>          for info in local_list {
>              let snapshot = info.backup_dir;
> -            if remote_snapshots.contains(&snapshot.backup_time()) {
> +            if source_snapshots.contains(&snapshot.backup_time()) {
>                  continue;
>              }
>              if snapshot.is_protected() {
> @@ -720,6 +1045,7 @@ async fn pull_group(
>              }
>              task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
>              params
> +                .target
>                  .store
>                  .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
>          }
> @@ -732,64 +1058,12 @@ async fn pull_group(
>      Ok(())
>  }
>  
> -// will modify params if switching to backwards mode for lack of NS support on remote end
> -async fn query_namespaces(
> -    worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &mut PullParameters,
> -) -> Result<Vec<BackupNamespace>, Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/namespace",
> -        params.source.store()
> -    );
> -    let mut data = json!({});
> -    if let Some(max_depth) = params.max_depth {
> -        data["max-depth"] = json!(max_depth);
> -    }
> -
> -    if !params.remote_ns.is_root() {
> -        data["parent"] = json!(params.remote_ns);
> -    }
> -
> -    let mut result = match client.get(&path, Some(data)).await {
> -        Ok(res) => res,
> -        Err(err) => match err.downcast_ref::<HttpError>() {
> -            Some(HttpError { code, message }) => match *code {
> -                StatusCode::NOT_FOUND => {
> -                    if params.remote_ns.is_root() && params.max_depth.is_none() {
> -                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> -                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> -                        params.max_depth = Some(0);
> -                    } else {
> -                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> -                    }
> -
> -                    return Ok(vec![params.remote_ns.clone()]);
> -                }
> -                _ => {
> -                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> -                }
> -            },
> -            None => {
> -                bail!("Querying namespaces failed - {err}");
> -            }
> -        },
> -    };
> -
> -    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    // parents first
> -    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
> -
> -    Ok(list.iter().map(|item| item.ns.clone()).collect())
> -}
> -
>  fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
>      let mut created = false;
> -    let store_ns_str = print_store_and_ns(params.store.name(), ns);
> +    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
>  
> -    if !ns.is_root() && !params.store.namespace_path(ns).exists() {
> -        check_ns_modification_privs(params.store.name(), ns, &params.owner)
> +    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
> +        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
>              .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
>  
>          let name = match ns.components().last() {
> @@ -799,14 +1073,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>              }
>          };
>  
> -        if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
> +        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
>              bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
>          }
>          created = true;
>      }
>  
>      check_ns_privs(
> -        params.store.name(),
> +        params.target.store.name(),
>          ns,
>          &params.owner,
>          PRIV_DATASTORE_BACKUP,
> @@ -817,10 +1091,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>  }
>  
>  fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
> -    check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
> +    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
>          .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
>  
> -    params.store.remove_namespace_recursive(local_ns, true)
> +    params
> +        .target
> +        .store
> +        .remove_namespace_recursive(local_ns, true)
>  }
>  
>  fn check_and_remove_vanished_ns(
> @@ -834,14 +1111,15 @@ fn check_and_remove_vanished_ns(
>      // clamp like remote does so that we don't list more than we can ever have synced.
>      let max_depth = params
>          .max_depth
> -        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
> +        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
>  
>      let mut local_ns_list: Vec<BackupNamespace> = params
> +        .target
>          .store
> -        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
> +        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
>          .filter(|ns| {
>              let user_privs =
> -                user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
> +                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
>              user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
>          })
>          .collect();
> @@ -850,7 +1128,7 @@ fn check_and_remove_vanished_ns(
>      local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
>  
>      for local_ns in local_ns_list {
> -        if local_ns == params.ns {
> +        if local_ns == params.target.ns {
>              continue;
>          }
>  
> @@ -897,29 +1175,28 @@ fn check_and_remove_vanished_ns(
>  /// - access to sub-NS checked here
>  pub(crate) async fn pull_store(
>      worker: &WorkerTask,
> -    client: &HttpClient,
>      mut params: PullParameters,
>  ) -> Result<(), Error> {
>      // explicit create shared lock to prevent GC on newly created chunks
> -    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
> +    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
>      let mut errors = false;
>  
>      let old_max_depth = params.max_depth;
> -    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
> -        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
> -    } else {
> -        query_namespaces(worker, client, &mut params).await?
> -    };
> +    let mut namespaces = params
> +        .source
> +        .list_namespaces(&mut params.max_depth, worker)
> +        .await?;
>      errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
> +    namespaces.sort_unstable_by(|a, b| a.name_len().cmp(&b.name_len()));
>  
>      let (mut groups, mut snapshots) = (0, 0);
>      let mut synced_ns = HashSet::with_capacity(namespaces.len());
>  
>      for namespace in namespaces {
> -        let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
> +        let source_store_ns_str = params.source.print_store_and_ns();
>  
> -        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
> -        let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
> +        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
> +        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
>  
>          task_log!(worker, "----");
>          task_log!(
> @@ -947,7 +1224,7 @@ pub(crate) async fn pull_store(
>              }
>          }
>  
> -        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
> +        match pull_ns(worker, &namespace, &mut params).await {
>              Ok((ns_progress, ns_errors)) => {
>                  errors |= ns_errors;
>  
> @@ -968,7 +1245,7 @@ pub(crate) async fn pull_store(
>                  task_log!(
>                      worker,
>                      "Encountered errors while syncing namespace {} - {}",
> -                    namespace,
> +                    &namespace,
>                      err,
>                  );
>              }
> @@ -1000,33 +1277,17 @@ pub(crate) async fn pull_store(
>  /// - owner check for vanished groups done here
>  pub(crate) async fn pull_ns(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &PullParameters,
> -    source_ns: BackupNamespace,
> -    target_ns: BackupNamespace,
> +    namespace: &BackupNamespace,

^ It's not clear to me which namespace this is supposed to be now.
And don't we need both?
`pull_store` creates the `target_ns` which is a source-to-target
prefix-mapped namespace, but further down we then again just use
`get_target_ns()` which is always the same.

> +    params: &mut PullParameters,
>  ) -> Result<(StoreProgress, bool), Error> {
> -    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
> -
> -    let args = if !source_ns.is_root() {
> -        Some(json!({
> -            "ns": source_ns,
> -        }))
> -    } else {
> -        None
> -    };
> -
> -    let mut result = client
> -        .get(&path, args)
> -        .await
> -        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
> -
> -    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> +    let mut list: Vec<pbs_api_types::BackupGroup> =
> +        params.source.list_groups(namespace, &params.owner).await?;
>  
>      let total_count = list.len();
>      list.sort_unstable_by(|a, b| {
> -        let type_order = a.backup.ty.cmp(&b.backup.ty);
> +        let type_order = a.ty.cmp(&b.ty);
>          if type_order == std::cmp::Ordering::Equal {
> -            a.backup.id.cmp(&b.backup.id)
> +            a.id.cmp(&b.id)
>          } else {
>              type_order
>          }
> @@ -1036,9 +1297,6 @@ pub(crate) async fn pull_ns(
>          filters.iter().any(|filter| group.matches(filter))
>      };
>  
> -    // Get groups with target NS set
> -    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
> -
>      let list = if let Some(ref group_filter) = &params.group_filter {
>          let unfiltered_count = list.len();
>          let list: Vec<pbs_api_types::BackupGroup> = list
> @@ -1066,6 +1324,7 @@ pub(crate) async fn pull_ns(
>  
>      let mut progress = StoreProgress::new(list.len() as u64);
>  
> +    let target_ns = params.get_target_ns()?;

^ here

>      for (done, group) in list.into_iter().enumerate() {
>          progress.done_groups = done as u64;
>          progress.done_snapshots = 0;
> @@ -1073,6 +1332,7 @@ pub(crate) async fn pull_ns(
>  
>          let (owner, _lock_guard) =
>              match params
> +                .target
>                  .store
>                  .create_locked_backup_group(&target_ns, &group, &params.owner)
>              {
> @@ -1085,6 +1345,7 @@ pub(crate) async fn pull_ns(
>                          err
>                      );
>                      errors = true; // do not stop here, instead continue
> +                    task_log!(worker, "create_locked_backup_group failed");
>                      continue;
>                  }
>              };
> @@ -1100,15 +1361,7 @@ pub(crate) async fn pull_ns(
>                  owner
>              );
>              errors = true; // do not stop here, instead continue
> -        } else if let Err(err) = pull_group(
> -            worker,
> -            client,
> -            params,
> -            &group,
> -            source_ns.clone(),
> -            &mut progress,
> -        )
> -        .await
> +        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
>          {
>              task_log!(worker, "sync group {} failed - {}", &group, err,);
>              errors = true; // do not stop here, instead continue
> @@ -1117,13 +1370,13 @@ pub(crate) async fn pull_ns(
>  
>      if params.remove_vanished {
>          let result: Result<(), Error> = proxmox_lang::try_block!({
> -            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
> +            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
>                  let local_group = local_group?;
>                  let local_group = local_group.group();
>                  if new_groups.contains(local_group) {
>                      continue;
>                  }
> -                let owner = params.store.get_owner(&target_ns, local_group)?;
> +                let owner = params.target.store.get_owner(&target_ns, local_group)?;
>                  if check_backup_owner(&owner, &params.owner).is_err() {
>                      continue;
>                  }
> @@ -1133,7 +1386,11 @@ pub(crate) async fn pull_ns(
>                      }
>                  }
>                  task_log!(worker, "delete vanished group '{local_group}'",);
> -                match params.store.remove_backup_group(&target_ns, local_group) {
> +                match params
> +                    .target
> +                    .store
> +                    .remove_backup_group(&target_ns, local_group)
> +                {
>                      Ok(true) => {}
>                      Ok(false) => {
>                          task_log!(
> -- 
> 2.30.2





More information about the pbs-devel mailing list