[pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling

Fabian Grünbichler f.gruenbichler at proxmox.com
Tue Feb 28 12:36:22 CET 2023


On February 23, 2023 1:55 pm, Hannes Laimer wrote:
> ... and rewrite pull logic.

that's a bit terse ;)

there's also general refactoring interleaved with the local pull support; it
would be easier to review if those two were split into separate patches.

general remarks (also partly repeated below):
- I don't like the way the readers are persisted in the parameters; it shouldn't be
needed and isn't a nice way to structure the code
-- instead, at the point where previously a BackupReader is created (the loop
body in pull_group), you can create the equivalent of PullSource but for the
reader, which in turn
-- contains a BackupReader + RemoteChunkReader for the remote case
-- contains a lock guard + LocalChunkReader for the local case
- there's now a mix of helpers that are moved to the source structs and helpers
that are not; it should be one or the other across the board. adapting the
helpers in-place probably makes it easier to tell what changed. if they all take
&source as a parameter, it's then trivial to move them to an impl block on
PullSource at the end and move longer parts to the local or remote impl block as
well (possibly as a follow-up to reduce the amount of rebasing you have to do).
- this patch would really benefit from being split into (at least) two patches
-- refactor the source handling with just the remote part (no semantic changes!)
-- add local pull support
- some of the comments above individual functions are wrong, please check them
carefully and adapt where needed
- please test recursive sync behaviour, the current version looks rather broken
to me (but I haven't done any in-depth testing)
- local reading doesn't lock the source, but it should (for remote this is
handled for us by the HTTP2 reader session)

> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
>  pbs-api-types/src/datastore.rs  |    2 +-
>  pbs-datastore/src/read_chunk.rs |    2 +-
>  src/api2/pull.rs                |   13 +-
>  src/server/pull.rs              | 1023 +++++++++++++++++++------------
>  4 files changed, 648 insertions(+), 392 deletions(-)
> 
> diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs
> index 72e8d1ee..9a692b08 100644
> --- a/pbs-api-types/src/datastore.rs
> +++ b/pbs-api-types/src/datastore.rs
> @@ -931,7 +931,7 @@ impl std::str::FromStr for BackupGroup {
>  /// Uniquely identify a Backup (relative to data store)
>  ///
>  /// We also call this a backup snaphost.
> -#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
> +#[derive(Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]

same as the Sync part below..

>  #[serde(rename_all = "kebab-case")]
>  pub struct BackupDir {
>      /// Backup group.
> diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
> index c04a7431..29ee2d4c 100644
> --- a/pbs-datastore/src/read_chunk.rs
> +++ b/pbs-datastore/src/read_chunk.rs
> @@ -14,7 +14,7 @@ pub trait ReadChunk {
>      fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
>  }
>  
> -pub trait AsyncReadChunk: Send {
> +pub trait AsyncReadChunk: Send + Sync {

I guess you need this since you moved the BackupReader into PullParams? this
should not be needed..

>      /// Returns the encoded chunk data
>      fn read_raw_chunk<'a>(
>          &'a self,
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index bb8f6fe1..2966190c 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -121,8 +121,8 @@ pub fn do_sync_job(
>              let sync_job2 = sync_job.clone();
>  
>              let worker_future = async move {
> -                let pull_params = PullParameters::try_from(&sync_job)?;
> -                let client = pull_params.client().await?;
> +                let mut pull_params = PullParameters::try_from(&sync_job)?;
> +                pull_params.init_source(sync_job.limit).await?;

see below

>  
>                  task_log!(worker, "Starting datastore sync job '{}'", job_id);
>                  if let Some(event_str) = schedule {
> @@ -137,7 +137,7 @@ pub fn do_sync_job(
>                  );
>  
>                  if sync_job.remote.is_some() {
> -                    pull_store(&worker, &client, pull_params).await?;
> +                    pull_store(&worker, pull_params).await?;
>                  } else {
>                      match (sync_job.ns, sync_job.remote_ns) {
>                          (Some(target_ns), Some(source_ns))
> @@ -280,7 +280,7 @@ async fn pull(
>          delete,
>      )?;
>  
> -    let pull_params = PullParameters::new(
> +    let mut pull_params = PullParameters::new(
>          &store,
>          ns,
>          remote.as_deref(),
> @@ -290,9 +290,8 @@ async fn pull(
>          remove_vanished,
>          max_depth,
>          group_filter,
> -        limit,

see below
>      )?;
> -    let client = pull_params.client().await?;
> +    pull_params.init_source(limit).await?;

see below
>  
>      // fixme: set to_stdout to false?
>      // FIXME: add namespace to worker id?
> @@ -310,7 +309,7 @@ async fn pull(
>                  remote_store,
>              );
>  
> -            let pull_future = pull_store(&worker, &client, pull_params);
> +            let pull_future = pull_store(&worker, pull_params);
>              (select! {
>                  success = pull_future.fuse() => success,
>                  abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 65eedf2c..d3be39da 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
>  //! Sync datastore from remote server
>  
>  use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::{Seek, SeekFrom, Write};
> +use std::path::PathBuf;
>  use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::SystemTime;
>  
>  use anyhow::{bail, format_err, Error};
>  use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
>  use proxmox_router::HttpError;
>  use proxmox_sys::task_log;
> +use serde_json::json;
>  
>  use pbs_api_types::{
> -    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> +    print_store_and_ns, Authid, BackupDir, BackupNamespace, CryptMode, GroupFilter, GroupListItem,
>      Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
>      PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
>  };
> -
> -use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
>  use pbs_datastore::data_blob::DataBlob;
>  use pbs_datastore::dynamic_index::DynamicIndexReader;
>  use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,21 @@ use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{
>      archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>  };
> -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> +use pbs_datastore::read_chunk::AsyncReadChunk;
> +use pbs_datastore::{
> +    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
> +};
>  use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>  
> -use crate::backup::{check_ns_modification_privs, check_ns_privs};
> +use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
>  use crate::tools::parallel_handler::ParallelHandler;
>  
>  /// Parameters for a pull operation.
>  pub(crate) struct PullParameters {
> -    /// Remote that is pulled from
> -    remote: Remote,
> -    /// Full specification of remote datastore
> -    source: BackupRepository,
> -    /// Local store that is pulled into
> -    store: Arc<DataStore>,
> -    /// Remote namespace
> -    remote_ns: BackupNamespace,
> -    /// Local namespace (anchor)
> -    ns: BackupNamespace,
> +    /// Where data is pulled from
> +    source: PullSource,
> +    /// Where data should be pulled into
> +    target: PullTarget,
>      /// Owner of synced groups (needs to match local owner of pre-existing groups)
>      owner: Authid,
>      /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,70 +51,459 @@ pub(crate) struct PullParameters {
>      max_depth: Option<usize>,
>      /// Filters for reducing the pull scope
>      group_filter: Option<Vec<GroupFilter>>,
> -    /// Rate limits for all transfers from `remote`
> -    limit: RateLimitConfig,
> +}
> +
> +pub(crate) enum PullSource {
> +    Remote(RemoteSource),
> +    Local(LocalSource),
> +}
> +
> +pub(crate) struct PullTarget {
> +    store: Arc<DataStore>,
> +    ns: BackupNamespace,
> +}
> +
> +pub(crate) struct LocalSource {
> +    store: Arc<DataStore>,
> +    ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> +    remote: Remote,
> +    repo: BackupRepository,
> +    ns: BackupNamespace,
> +    client: Option<HttpClient>,
> +    backup_reader: HashMap<pbs_api_types::BackupDir, Arc<BackupReader>>,

this is not needed at all - you never pull multiple snapshots in parallel, so
there is no need to store readers for each pulled snapshot.

there also shouldn't be state stored in PullParams at all, it should always be
possible to pass that down from caller to next layer..

> +}
> +
> +impl PullSource {
> +    pub(crate) async fn init(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
> +        match self {
> +            PullSource::Remote(source) => {
> +                source.client.replace(
> +                    crate::api2::config::remote::remote_client(&source.remote, Some(limit)).await?,
> +                );
> +            }
> +            PullSource::Local(_) => {}
> +        };
> +        Ok(())
> +    }

drop this, and keep the old client helper but move it to PullSource::Remote

> +
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error> {
> +        match &self {
> +            PullSource::Remote(source) => list_remote_namespaces(source, max_depth, worker).await,

this checks permissions (on the remote system)

> +            PullSource::Local(source) => ListNamespacesRecursive::new_max_depth(
> +                source.store.clone(),
> +                source.ns.clone(),
> +                max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
> +            )?
> +            .collect(),

this doesn't, but should

> +        }
> +    }
> +
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        owner: &Authid,
> +    ) -> Result<Vec<pbs_api_types::BackupGroup>, Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let path = format!("api2/json/admin/datastore/{}/groups", source.repo.store());
> +
> +                let args = if !namespace.is_root() {
> +                    Some(json!({ "ns": namespace.clone() }))
> +                } else {
> +                    None
> +                };
> +
> +                let client = source.get_client()?;
> +                client.login().await?;

this could just initialize a new client using the helper, instead of retrieving
a stored one that might have an expired ticket..

> +                let mut result = client.get(&path, args).await.map_err(|err| {
> +                    format_err!("Failed to retrieve backup groups from remote - {}", err)
> +                })?;
> +
> +                Ok(
> +                    serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> +                        .map_err(|e| Error::from(e))?

could just be .map_err(Error::from), but

> +                        .into_iter()
> +                        .map(|item| item.backup)
> +                        .collect::<Vec<pbs_api_types::BackupGroup>>(),
> +                )

the whole thing is probably easier to read when done like this:

  let list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
  let list: Vec<pbs_api_types::BackupGroup> =
    list.into_iter().map(|group| group.backup).collect();

  Ok(list)

> +            }
> +            PullSource::Local(source) => Ok(ListAccessibleBackupGroups::new_with_privs(
> +                &source.store,
> +                namespace.clone(),
> +                MAX_NAMESPACE_DEPTH,
> +                None,
> +                None,
> +                Some(owner),
> +            )?
> +            .filter_map(Result::ok)
> +            .map(|backup_group| backup_group.group().clone())
> +            .collect::<Vec<pbs_api_types::BackupGroup>>()),

this has two issues:
- it recurses over namespaces, while it should only list groups in the current
namespace without recursion
- it doesn't set the expected privs, so this will potentially list too few or
too many groups as well, even within a namespace where the user is supposed to
have access
-- too few if the user has PRIV_DATASTORE_READ and should be able to read
groups owned by other users/tokens
-- too many if the user only has PRIV_DATASTORE_AUDIT, since owned groups are
then readable despite missing PRIV_DATASTORE_BACKUP

> +        }
> +    }
> +
> +    async fn list_backup_dirs(
> +        &self,
> +        namespace: &BackupNamespace,
> +        group: &pbs_api_types::BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<pbs_api_types::BackupDir>, Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let path = format!(
> +                    "api2/json/admin/datastore/{}/snapshots",
> +                    source.repo.store()
> +                );
> +
> +                let mut args = json!({
> +                    "backup-type": group.ty,
> +                    "backup-id": group.id,
> +                });
> +
> +                if !source.ns.is_root() {
> +                    args["ns"] = serde_json::to_value(&source.ns)?;
> +                }
> +
> +                let client = source.get_client()?;
> +                client.login().await?;

this could probably also get a fresh client..

> +
> +                let mut result = client.get(&path, Some(args)).await?;
> +                let snapshot_list: Vec<SnapshotListItem> =
> +                    serde_json::from_value(result["data"].take())?;
> +                Ok(snapshot_list
> +                    .into_iter()
> +                    .filter_map(|item: SnapshotListItem| {
> +                        let snapshot = item.backup;
> +                        // in-progress backups can't be synced
> +                        if item.size.is_none() {
> +                            task_log!(
> +                                worker,
> +                                "skipping snapshot {} - in-progress backup",
> +                                snapshot
> +                            );
> +                            return None;
> +                        }
> +
> +                        Some(snapshot)
> +                    })
> +                    .collect::<Vec<BackupDir>>())
> +            }
> +            PullSource::Local(source) => Ok(source
> +                .store
> +                .backup_group(namespace.clone(), group.clone())
> +                .iter_snapshots()?
> +                .filter_map(Result::ok)

this hides errors when iterating..

> +                .map(|snapshot| snapshot.dir().to_owned())

but doesn't skip "in-progress" snapshots like the remote version does..

> +                .collect::<Vec<BackupDir>>()),
> +        }
> +    }
> +
> +    /// Load file from source namespace and BackupDir into file
> +    async fn load_file_into(
> +        &mut self,
> +        namespace: &BackupNamespace,
> +        snapshot: &pbs_api_types::BackupDir,
> +        filename: &str,
> +        into: &PathBuf,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error> {
> +        let mut tmp_file = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .truncate(true)
> +            .read(true)
> +            .open(into)?;
> +        match self {
> +            PullSource::Remote(ref mut source) => {
> +                let client = source.get_client()?;
> +                client.login().await?;
> +
> +                let reader = if let Some(reader) = source.backup_reader.get(snapshot) {
> +                    reader.clone()
> +                } else {
> +                    let backup_reader = BackupReader::start(
> +                        client,
> +                        None,
> +                        source.repo.store(),
> +                        namespace,
> +                        snapshot,
> +                        true,
> +                    )
> +                    .await?;
> +                    source
> +                        .backup_reader
> +                        .insert(snapshot.clone(), backup_reader.clone());
> +                    backup_reader
> +                };
> +
> +                let download_result = reader.download(filename, &mut tmp_file).await;
> +
> +                if let Err(err) = download_result {
> +                    match err.downcast_ref::<HttpError>() {
> +                        Some(HttpError { code, message }) => match *code {
> +                            StatusCode::NOT_FOUND => {
> +                                task_log!(
> +                                    worker,
> +                                    "skipping snapshot {} - vanished since start of sync",

this was previously only logged when the manifest went missing. when
downloading the indices later on this shouldn't happen since we have an active
reader session open which holds a lock, so nobody should be able to pull out the
snapshot under us and it should be a hard error..

> +                                    snapshot,
> +                                );
> +                                return Ok(None);
> +                            }
> +                            _ => {
> +                                bail!("HTTP error {code} - {message}");
> +                            }
> +                        },
> +                        None => {
> +                            return Err(err);
> +                        }
> +                    };
> +                };
> +            }
> +            PullSource::Local(source) => {
> +                let dir = source
> +                    .store
> +                    .backup_dir(namespace.clone(), snapshot.clone())?;
> +                let mut from_path = dir.full_path();
> +                from_path.push(filename);
> +                tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
> +            }
> +        }
> +
> +        tmp_file.seek(SeekFrom::Start(0))?;
> +        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())

hides errors!!

note that this seeks to the beginning and loads the blob (see [0] markers)

> +    }
> +
> +    // Note: The client.log.blob is uploaded after the backup, so it is
> +    // not mentioned in the manifest.
> +    async fn try_download_client_log(
> +        &self,
> +        from_snapshot: &pbs_api_types::BackupDir,

there is only a single snapshot involved

> +        to_path: &std::path::Path,

and a single path, so not sure whether we really need the from/to prefix?

> +        worker: &WorkerTask,

worker should come first..

> +    ) -> Result<(), Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let reader = source
> +                    .backup_reader
> +                    .get(from_snapshot)
> +                    .ok_or(format_err!("Can't download chunks without a BackupReader"))?;
> +                let mut tmp_path = to_path.to_owned();
> +                tmp_path.set_extension("tmp");
> +
> +                let tmpfile = std::fs::OpenOptions::new()
> +                    .write(true)
> +                    .create(true)
> +                    .read(true)
> +                    .open(&tmp_path)?;
> +
> +                // Note: be silent if there is no log - only log successful download
> +                if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> +                    if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> +                        bail!("Atomic rename file {:?} failed - {}", to_path, err);
> +                    }
> +                    task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> +                }
> +
> +                Ok(())

this should probably return the tmpfile, so that we only open it once..

> +            }
> +            PullSource::Local(_) => Ok(()),

local sync should also copy the log? also, similar to other parts - this is
mostly the old code refactored, but also slightly changed. it would have been
nice to have the "refactor" part first (with only a single match arm), and the
"add local" part second.

> +        }
> +    }
> +
> +    fn get_chunk_reader(
> +        &self,
> +        snapshot: &pbs_api_types::BackupDir,
> +        crypt_mode: CryptMode,
> +    ) -> Result<Arc<dyn AsyncReadChunk>, Error> {
> +        Ok(match &self {
> +            PullSource::Remote(source) => {
> +                if let Some(reader) = source.backup_reader.get(snapshot) {
> +                    Arc::new(RemoteChunkReader::new(
> +                        reader.clone(),
> +                        None,
> +                        crypt_mode,
> +                        HashMap::new(),
> +                    ))
> +                } else {
> +                    bail!("No initialized BackupReader!")
> +                }
> +            }
> +            PullSource::Local(source) => Arc::new(LocalChunkReader::new(
> +                source.store.clone(),
> +                None,
> +                crypt_mode,
> +            )),
> +        })
> +    }

shouldn't be needed - the reader should be passed down on its own.. a helper for
the first time the source needs to be converted to a reader might still be a
good idea, but without persisting inside the source..

> +
> +    fn get_ns(&self) -> BackupNamespace {
> +        match &self {
> +            PullSource::Remote(source) => source.ns.clone(),
> +            PullSource::Local(source) => source.ns.clone(),
> +        }
> +    }
> +
> +    fn print_store_and_ns(&self) -> String {
> +        match &self {
> +            PullSource::Remote(source) => print_store_and_ns(source.repo.store(), &source.ns),
> +            PullSource::Local(source) => print_store_and_ns(source.store.name(), &source.ns),
> +        }
> +    }
> +}
> +
> +impl RemoteSource {
> +    fn get_client(&self) -> Result<&HttpClient, Error> {
> +        if let Some(client) = &self.client {
> +            Ok(client)
> +        } else {
> +            bail!("RemoteSource not initialized")
> +        }
> +    }

should instead return a fresh client..

>  }
>  
>  impl PullParameters {
>      /// Creates a new instance of `PullParameters`.
> -    ///
> -    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> -    /// [BackupRepository] with `remote_store`.
> -    #[allow(clippy::too_many_arguments)]
>      pub(crate) fn new(
>          store: &str,
>          ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>          remote_store: &str,
>          remote_ns: BackupNamespace,
>          owner: Authid,
>          remove_vanished: Option<bool>,
>          max_depth: Option<usize>,
>          group_filter: Option<Vec<GroupFilter>>,
> -        limit: RateLimitConfig,

should either move to RemoteSource, or be implemented for both variants and stay here..

>      ) -> Result<Self, Error> {
> -        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
>          if let Some(max_depth) = max_depth {
>              ns.check_max_depth(max_depth)?;
>              remote_ns.check_max_depth(max_depth)?;
> -        }
> +        };
> +        let remove_vanished = remove_vanished.unwrap_or(false);
>  
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> +        let source: PullSource = if let Some(remote) = remote {
> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote: Remote = remote_config.lookup("remote", remote)?;
>  
> -        let remove_vanished = remove_vanished.unwrap_or(false);
> +            let repo = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
>  
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> +            PullSource::Remote(RemoteSource {
> +                remote,
> +                repo,
> +                ns: remote_ns.clone(),
> +                client: None,
> +                backup_reader: HashMap::new(),
> +            })
> +        } else {
> +            PullSource::Local(LocalSource {
> +                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
> +                ns: remote_ns,
> +            })
> +        };
> +        let target = PullTarget {
> +            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
> +            ns,
> +        };
>  
>          Ok(Self {
> -            remote,
> -            remote_ns,
> -            ns,
>              source,
> -            store,
> +            target,
>              owner,
>              remove_vanished,
>              max_depth,
>              group_filter,
> -            limit,
>          })
>      }
>  
> -    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> -    pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +    pub(crate) async fn init_source(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
> +        self.source.init(limit).await
> +    }

see other related comments

> +
> +    pub(crate) fn skip_chunk_sync(&self) -> bool {
> +        match &self.source {
> +            PullSource::Local(source) => source.store.name() == self.target.store.name(),
> +            PullSource::Remote(_) => false,
> +        }
> +    }
> +
> +    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> +        let source_ns = self.source.get_ns();
> +        source_ns.map_prefix(&source_ns, &self.target.ns)

this doesn't do the right thing, see the two call sites..

>      }
>  }
>  
> +async fn list_remote_namespaces(
> +    source: &RemoteSource,
> +    max_depth: &mut Option<usize>,
> +    worker: &WorkerTask,

worker usually comes first if passed as argument.. but it's only passed in for
the two log statements, which are actually possible to handle at the call site
as well (there's a check there for whether max_depth was modified).

> +) -> Result<Vec<BackupNamespace>, Error> {
> +    if source.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> +        vec![source.ns.clone()];

missing 'return' I think? it also changes the check for some reason - although
both versions are semantically the same, I am not sure it's worth the churn in an
already quite crowded patch (series).

> +    }
> +
> +    let path = format!(
> +        "api2/json/admin/datastore/{}/namespace",
> +        source.repo.store()
> +    );
> +    let mut data = json!({});
> +    if let Some(max_depth) = max_depth {
> +        data["max-depth"] = json!(max_depth);
> +    }
> +
> +    if !source.ns.is_root() {
> +        data["parent"] = json!(source.ns);
> +    }
> +
> +    let client = source.get_client()?;
> +    client.login().await?;
> +
> +    let mut result = match client.get(&path, Some(data)).await {
> +        Ok(res) => res,
> +        Err(err) => match err.downcast_ref::<HttpError>() {
> +            Some(HttpError { code, message }) => match code {
> +                &StatusCode::NOT_FOUND => {
> +                    if source.ns.is_root() && max_depth.is_none() {
> +                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");

see above, this could be logged at the call site to avoid passing in worker at
all. or it could remain here, but then please put the worker first in the
argument list ;)

> +                        max_depth.replace(0);
> +                    } else {
> +                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                    }
> +
> +                    return Ok(vec![source.ns.clone()]);
> +                }
> +                _ => {
> +                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                }
> +            },
> +            None => {
> +                bail!("Querying namespaces failed - {err}");
> +            }
> +        },
> +    };
> +
> +    let list: Vec<pbs_api_types::BackupNamespace> =
> +        serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> +            .iter()
> +            .map(|list_item| list_item.ns.clone())
> +            .collect();
> +
> +    Ok(list)
> +}

this is one of the examples that would have really benefited from splitting this
into a "no-change refactor PullParams with RemoteSource" patch and an "add local
pull support" patch.. this is basically the old query_namespaces with parameter
adjustments, but then *also* other changes. now because the location also
completely changed, I have to manually diff the two to see what actually changed
(or if semantic changes are hidden in the noise).

> +
>  async fn pull_index_chunks<I: IndexFile>(
>      worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: Arc<dyn AsyncReadChunk>,
>      target: Arc<DataStore>,
>      index: I,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -211,26 +594,6 @@ async fn pull_index_chunks<I: IndexFile>(
>      Ok(())
>  }
>  
> -async fn download_manifest(
> -    reader: &BackupReader,
> -    filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> -    let mut tmp_manifest_file = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .truncate(true)
> -        .read(true)
> -        .open(filename)?;
> -
> -    reader
> -        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> -        .await?;
> -
> -    tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> -    Ok(tmp_manifest_file)
> -}
> -
>  fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
>      if size != info.size {
>          bail!(
> @@ -251,21 +614,21 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>  /// Pulls a single file referenced by a manifest.
>  ///
>  /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> +/// - Load archive file into tmp file
>  /// - Verify tmp file checksum
>  /// - if archive is an index, pull referenced chunks
>  /// - Rename tmp file into real path
>  async fn pull_single_archive(
>      worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    from_namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      archive_info: &FileInfo,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
>      let archive_name = &archive_info.filename;
> -    let mut path = snapshot.full_path();
> +    let mut path = to_snapshot.full_path();
>      path.push(archive_name);
>  
>      let mut tmp_path = path.clone();
> @@ -273,13 +636,18 @@ async fn pull_single_archive(
>  
>      task_log!(worker, "sync archive {}", archive_name);
>  
> -    let mut tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> +    params
> +        .source
> +        .load_file_into(
> +            from_namespace,
> +            from_snapshot,
> +            archive_name,
> +            &tmp_path,
> +            worker,
> +        )
> +        .await?;

[0] this returns the blob, but it's not used..

>  
> -    reader.download(archive_name, &mut tmpfile).await?;
> +    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

couldn't load_file_into just return the open file?

>  
>      match archive_type(archive_name)? {
>          ArchiveType::DynamicIndex => {
> @@ -289,14 +657,20 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if params.skip_chunk_sync() {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

typo ("datatsore" -> "datastore"). also, when reviewing v1 I asked whether it
would make sense to check that the chunks are actually there instead of skipping
entirely? it's basically "only" a series of 'stat' calls.

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    params
> +                        .source
> +                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
> +                    params.target.store.clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::FixedIndex => {
>              let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -305,14 +679,20 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if params.skip_chunk_sync() {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

same as above for dynamic indices..

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    params
> +                        .source
> +                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
> +                    params.target.store.clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::Blob => {
>              tmpfile.seek(SeekFrom::Start(0))?;

[0] so here we *again* seek to the start, which load_file_into already does for
us *if* we skip the blob loading there..

> @@ -326,33 +706,6 @@ async fn pull_single_archive(
>      Ok(())
>  }
>  
> -// Note: The client.log.blob is uploaded after the backup, so it is
> -// not mentioned in the manifest.
> -async fn try_client_log_download(
> -    worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    path: &std::path::Path,
> -) -> Result<(), Error> {
> -    let mut tmp_path = path.to_owned();
> -    tmp_path.set_extension("tmp");
> -
> -    let tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> -
> -    // Note: be silent if there is no log - only log successful download
> -    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> -        if let Err(err) = std::fs::rename(&tmp_path, path) {
> -            bail!("Atomic rename file {:?} failed - {}", path, err);
> -        }
> -        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> -    }
> -
> -    Ok(())
> -}
> -
>  /// Actual implementation of pulling a snapshot.
>  ///
>  /// Pulling a snapshot consists of the following steps:
> @@ -364,44 +717,37 @@ async fn try_client_log_download(
>  /// - Download log if not already existing
>  async fn pull_snapshot(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> -    let mut manifest_name = snapshot.full_path();
> +    let mut manifest_name = to_snapshot.full_path();
>      manifest_name.push(MANIFEST_BLOB_NAME);
>  
> -    let mut client_log_name = snapshot.full_path();
> +    let mut client_log_name = to_snapshot.full_path();
>      client_log_name.push(CLIENT_LOG_BLOB_NAME);
>  
>      let mut tmp_manifest_name = manifest_name.clone();
>      tmp_manifest_name.set_extension("tmp");
>  
> -    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
> -    let mut tmp_manifest_file = match download_res {
> -        Ok(manifest_file) => manifest_file,
> -        Err(err) => {
> -            match err.downcast_ref::<HttpError>() {
> -                Some(HttpError { code, message }) => match *code {
> -                    StatusCode::NOT_FOUND => {
> -                        task_log!(
> -                            worker,
> -                            "skipping snapshot {} - vanished since start of sync",
> -                            snapshot.dir(),
> -                        );
> -                        return Ok(());
> -                    }
> -                    _ => {
> -                        bail!("HTTP error {code} - {message}");
> -                    }
> -                },
> -                None => {
> -                    return Err(err);
> -                }
> -            };
> -        }
> -    };
> -    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
> +    let tmp_manifest_blob;
> +    if let Some(data) = params
> +        .source
> +        .load_file_into(
> +            namespace,
> +            from_snapshot,
> +            MANIFEST_BLOB_NAME,
> +            &tmp_manifest_name,
> +            worker,
> +        )
> +        .await?
> +    {
> +        tmp_manifest_blob = data;

[0] so this is the only part that actually uses the parsed blob, it might make
sense to only do that parsing here..

> +    } else {
> +        return Ok(());

this hides the wrong error handling in load_file_into even further..

> +    }
>  
>      if manifest_name.exists() {
>          let manifest_blob = proxmox_lang::try_block!({
> @@ -418,8 +764,11 @@ async fn pull_snapshot(
>  
>          if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
>              if !client_log_name.exists() {
> -                try_client_log_download(worker, reader, &client_log_name).await?;
> -            }
> +                params
> +                    .source
> +                    .try_download_client_log(from_snapshot, &client_log_name, worker)
> +                    .await?;
> +            };
>              task_log!(worker, "no data changes");
>              let _ = std::fs::remove_file(&tmp_manifest_name);
>              return Ok(()); // nothing changed
> @@ -429,7 +778,7 @@ async fn pull_snapshot(
>      let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
>  
>      for item in manifest.files() {
> -        let mut path = snapshot.full_path();
> +        let mut path = to_snapshot.full_path();
>          path.push(&item.filename);
>  
>          if path.exists() {
> @@ -467,18 +816,12 @@ async fn pull_snapshot(
>              }
>          }
>  
> -        let mut chunk_reader = RemoteChunkReader::new(
> -            reader.clone(),
> -            None,
> -            item.chunk_crypt_mode(),
> -            HashMap::new(),
> -        );
> -
>          pull_single_archive(
>              worker,
> -            &reader,
> -            &mut chunk_reader,
> -            snapshot,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
>              item,
>              downloaded_chunks.clone(),
>          )
> @@ -490,10 +833,12 @@ async fn pull_snapshot(
>      }
>  
>      if !client_log_name.exists() {
> -        try_client_log_download(worker, reader, &client_log_name).await?;
> -    }
> -
> -    snapshot
> +        params
> +            .source
> +            .try_download_client_log(from_snapshot, &client_log_name, worker)
> +            .await?;
> +    };

nit: stray ';'

> +    to_snapshot
>          .cleanup_unreferenced_files(&manifest)
>          .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
>  
> @@ -501,37 +846,53 @@ async fn pull_snapshot(
>  }
>  
>  /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
> -///
> -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
> -/// pointing to the local datastore and target namespace.

please at least describe what `namespace` is referring to here..

>  async fn pull_snapshot_from(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> -    let (_path, is_new, _snap_lock) = snapshot
> +    let (_path, is_new, _snap_lock) = to_snapshot
>          .datastore()
> -        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
> +        .create_locked_backup_dir(to_snapshot.backup_ns(), to_snapshot.as_ref())?;
>  
>      if is_new {
> -        task_log!(worker, "sync snapshot {}", snapshot.dir());
> +        task_log!(worker, "sync snapshot {}", to_snapshot.dir());
>  
> -        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
> -            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
> -                snapshot.backup_ns(),
> -                snapshot.as_ref(),
> +        if let Err(err) = pull_snapshot(
> +            worker,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
> +            downloaded_chunks,
> +        )
> +        .await
> +        {
> +            if let Err(cleanup_err) = to_snapshot.datastore().remove_backup_dir(
> +                to_snapshot.backup_ns(),
> +                to_snapshot.as_ref(),
>                  true,
>              ) {
>                  task_log!(worker, "cleanup error - {}", cleanup_err);
>              }
>              return Err(err);
>          }
> -        task_log!(worker, "sync snapshot {} done", snapshot.dir());
> +        task_log!(worker, "sync snapshot {} done", to_snapshot.dir());
>      } else {
> -        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
> -        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
> -        task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
> +        task_log!(worker, "re-sync snapshot {}", to_snapshot.dir());
> +        pull_snapshot(
> +            worker,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
> +            downloaded_chunks,
> +        )
> +        .await?;
> +        task_log!(worker, "re-sync snapshot {} done", to_snapshot.dir());
>      }
>  
>      Ok(())
> @@ -587,7 +948,6 @@ impl std::fmt::Display for SkipInfo {
>  /// - Sort by snapshot time
>  /// - Get last snapshot timestamp on local datastore
>  /// - Iterate over list of snapshots
> -/// -- Recreate client/BackupReader

should still be done.

>  /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
>  /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
>  ///
> @@ -600,101 +960,63 @@ impl std::fmt::Display for SkipInfo {
>  /// - local group owner is already checked by pull_store
>  async fn pull_group(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &PullParameters,
> +    params: &mut PullParameters,

should not be done ;)

> +    source_namespace: &BackupNamespace,

in general: the naming is inconsistent with regard to source/remote/local and
from/to. it would be good to be consistent at least internally, even if the
config/api parameters are a bit "weird" for backwards-compat reasons for now..

>      group: &pbs_api_types::BackupGroup,
> -    remote_ns: BackupNamespace,
>      progress: &mut StoreProgress,
>  ) -> Result<(), Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/snapshots",
> -        params.source.store()
> -    );
> -
> -    let mut args = json!({
> -        "backup-type": group.ty,
> -        "backup-id": group.id,
> -    });
> -
> -    if !remote_ns.is_root() {
> -        args["ns"] = serde_json::to_value(&remote_ns)?;
> -    }
> -
> -    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;

this involves three namespaces:
- the remote anchor
- the local anchor
- the current namespace being pulled

> -
> -    let mut result = client.get(&path, Some(args)).await?;
> -    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> -
> -    client.login().await?; // make sure auth is complete
> -
> -    let fingerprint = client.fingerprint();
> -
> -    let last_sync = params.store.last_successful_backup(&target_ns, group)?;
> -
> -    let mut remote_snapshots = std::collections::HashSet::new();
> -
> -    // start with 65536 chunks (up to 256 GiB)
> -    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
> -
> -    progress.group_snapshots = list.len() as u64;
> +    let target_ns = params.get_target_ns()?;

this is wrong, since it only involves two namespaces (the two anchors).

>  
> +    let mut source_snapshots = HashSet::new();
> +    let last_sync = params
> +        .target
> +        .store
> +        .last_successful_backup(&target_ns, group)?;
>      let mut skip_info = SkipInfo {
>          oldest: i64::MAX,
>          newest: i64::MIN,
>          count: 0,
>      };
>  
> -    for (pos, item) in list.into_iter().enumerate() {
> -        let snapshot = item.backup;
> -
> -        // in-progress backups can't be synced
> -        if item.size.is_none() {
> -            task_log!(
> -                worker,
> -                "skipping snapshot {} - in-progress backup",
> -                snapshot
> -            );
> -            continue;
> -        }
> +    let mut list: Vec<BackupDir> = params
> +        .source
> +        .list_backup_dirs(source_namespace, group, worker)
> +        .await?
> +        .into_iter()
> +        .filter(|dir| {
> +            source_snapshots.insert(dir.time);
> +            if let Some(last_sync_time) = last_sync {
> +                if last_sync_time > dir.time {
> +                    skip_info.update(dir.time);
> +                    return false;
> +                }
> +            }
> +            true
> +        })
> +        .collect();
>  
> -        remote_snapshots.insert(snapshot.time);
> +    list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
>  
> -        if let Some(last_sync_time) = last_sync {
> -            if last_sync_time > snapshot.time {
> -                skip_info.update(snapshot.time);
> -                continue;
> -            }
> -        }
> +    // start with 65536 chunks (up to 256 GiB)
> +    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>  
> -        // get updated auth_info (new tickets)
> -        let auth_info = client.login().await?;
> -
> -        let options =
> -            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
> -                .rate_limit(params.limit.clone());
> -
> -        let new_client = HttpClient::new(
> -            params.source.host(),
> -            params.source.port(),
> -            params.source.auth_id(),
> -            options,
> -        )?;
> -
> -        let reader = BackupReader::start(
> -            new_client,
> -            None,
> -            params.source.store(),
> -            &remote_ns,
> -            &snapshot,
> -            true,
> -        )
> -        .await?;
> +    progress.group_snapshots = list.len() as u64;
>  
> -        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
> +    for (pos, from_snapshot) in list.into_iter().enumerate() {
> +        let to_snapshot = params
> +            .target
> +            .store
> +            .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;
>  
> -        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
> +        let result = pull_snapshot_from(
> +            worker,
> +            params,
> +            source_namespace,
> +            &from_snapshot,
> +            &to_snapshot,
> +            downloaded_chunks.clone(),
> +        )
> +        .await;
>  
>          progress.done_snapshots = pos as u64 + 1;
>          task_log!(worker, "percentage done: {}", progress);
> @@ -703,11 +1025,14 @@ async fn pull_group(
>      }
>  
>      if params.remove_vanished {
> -        let group = params.store.backup_group(target_ns.clone(), group.clone());
> +        let group = params
> +            .target
> +            .store
> +            .backup_group(target_ns.clone(), group.clone());
>          let local_list = group.list_backups()?;
>          for info in local_list {
>              let snapshot = info.backup_dir;
> -            if remote_snapshots.contains(&snapshot.backup_time()) {
> +            if source_snapshots.contains(&snapshot.backup_time()) {
>                  continue;
>              }
>              if snapshot.is_protected() {
> @@ -720,6 +1045,7 @@ async fn pull_group(
>              }
>              task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
>              params
> +                .target
>                  .store
>                  .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
>          }
> @@ -732,64 +1058,12 @@ async fn pull_group(
>      Ok(())
>  }
>  
> -// will modify params if switching to backwards mode for lack of NS support on remote end
> -async fn query_namespaces(
> -    worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &mut PullParameters,
> -) -> Result<Vec<BackupNamespace>, Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/namespace",
> -        params.source.store()
> -    );
> -    let mut data = json!({});
> -    if let Some(max_depth) = params.max_depth {
> -        data["max-depth"] = json!(max_depth);
> -    }
> -
> -    if !params.remote_ns.is_root() {
> -        data["parent"] = json!(params.remote_ns);
> -    }
> -
> -    let mut result = match client.get(&path, Some(data)).await {
> -        Ok(res) => res,
> -        Err(err) => match err.downcast_ref::<HttpError>() {
> -            Some(HttpError { code, message }) => match *code {
> -                StatusCode::NOT_FOUND => {
> -                    if params.remote_ns.is_root() && params.max_depth.is_none() {
> -                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> -                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> -                        params.max_depth = Some(0);
> -                    } else {
> -                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> -                    }
> -
> -                    return Ok(vec![params.remote_ns.clone()]);
> -                }
> -                _ => {
> -                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> -                }
> -            },
> -            None => {
> -                bail!("Querying namespaces failed - {err}");
> -            }
> -        },
> -    };
> -
> -    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    // parents first
> -    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
> -
> -    Ok(list.iter().map(|item| item.ns.clone()).collect())
> -}
> -
>  fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
>      let mut created = false;
> -    let store_ns_str = print_store_and_ns(params.store.name(), ns);
> +    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
>  
> -    if !ns.is_root() && !params.store.namespace_path(ns).exists() {
> -        check_ns_modification_privs(params.store.name(), ns, &params.owner)
> +    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
> +        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
>              .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
>  
>          let name = match ns.components().last() {
> @@ -799,14 +1073,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>              }
>          };
>  
> -        if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
> +        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
>              bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
>          }
>          created = true;
>      }
>  
>      check_ns_privs(
> -        params.store.name(),
> +        params.target.store.name(),
>          ns,
>          &params.owner,
>          PRIV_DATASTORE_BACKUP,
> @@ -817,10 +1091,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>  }
>  
>  fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
> -    check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
> +    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
>          .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
>  
> -    params.store.remove_namespace_recursive(local_ns, true)
> +    params
> +        .target
> +        .store
> +        .remove_namespace_recursive(local_ns, true)
>  }
>  
>  fn check_and_remove_vanished_ns(
> @@ -834,14 +1111,15 @@ fn check_and_remove_vanished_ns(
>      // clamp like remote does so that we don't list more than we can ever have synced.
>      let max_depth = params
>          .max_depth
> -        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
> +        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
>  
>      let mut local_ns_list: Vec<BackupNamespace> = params
> +        .target
>          .store
> -        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
> +        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
>          .filter(|ns| {
>              let user_privs =
> -                user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
> +                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
>              user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
>          })
>          .collect();
> @@ -850,7 +1128,7 @@ fn check_and_remove_vanished_ns(
>      local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
>  
>      for local_ns in local_ns_list {
> -        if local_ns == params.ns {
> +        if local_ns == params.target.ns {
>              continue;
>          }
>  
> @@ -897,29 +1175,28 @@ fn check_and_remove_vanished_ns(
>  /// - access to sub-NS checked here
>  pub(crate) async fn pull_store(
>      worker: &WorkerTask,
> -    client: &HttpClient,
>      mut params: PullParameters,
>  ) -> Result<(), Error> {
>      // explicit create shared lock to prevent GC on newly created chunks
> -    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
> +    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
>      let mut errors = false;
>  
>      let old_max_depth = params.max_depth;
> -    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
> -        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
> -    } else {
> -        query_namespaces(worker, client, &mut params).await?
> -    };
> +    let mut namespaces = params
> +        .source
> +        .list_namespaces(&mut params.max_depth, worker)
> +        .await?;
>      errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
> +    namespaces.sort_unstable_by(|a, b| a.name_len().cmp(&b.name_len()));
>  
>      let (mut groups, mut snapshots) = (0, 0);
>      let mut synced_ns = HashSet::with_capacity(namespaces.len());
>  
>      for namespace in namespaces {
> -        let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
> +        let source_store_ns_str = params.source.print_store_and_ns();
>  
> -        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
> -        let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
> +        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
> +        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
>  
>          task_log!(worker, "----");
>          task_log!(
> @@ -947,7 +1224,7 @@ pub(crate) async fn pull_store(
>              }
>          }
>  
> -        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
> +        match pull_ns(worker, &namespace, &mut params).await {
>              Ok((ns_progress, ns_errors)) => {
>                  errors |= ns_errors;
>  
> @@ -968,7 +1245,7 @@ pub(crate) async fn pull_store(
>                  task_log!(
>                      worker,
>                      "Encountered errors while syncing namespace {} - {}",
> -                    namespace,
> +                    &namespace,
>                      err,
>                  );
>              }
> @@ -1000,33 +1277,17 @@ pub(crate) async fn pull_store(
>  /// - owner check for vanished groups done here
>  pub(crate) async fn pull_ns(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &PullParameters,
> -    source_ns: BackupNamespace,
> -    target_ns: BackupNamespace,
> +    namespace: &BackupNamespace,
> +    params: &mut PullParameters,
>  ) -> Result<(StoreProgress, bool), Error> {
> -    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
> -
> -    let args = if !source_ns.is_root() {
> -        Some(json!({
> -            "ns": source_ns,
> -        }))
> -    } else {
> -        None
> -    };
> -
> -    let mut result = client
> -        .get(&path, args)
> -        .await
> -        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
> -
> -    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> +    let mut list: Vec<pbs_api_types::BackupGroup> =
> +        params.source.list_groups(namespace, &params.owner).await?;
>  
>      let total_count = list.len();
>      list.sort_unstable_by(|a, b| {
> -        let type_order = a.backup.ty.cmp(&b.backup.ty);
> +        let type_order = a.ty.cmp(&b.ty);
>          if type_order == std::cmp::Ordering::Equal {
> -            a.backup.id.cmp(&b.backup.id)
> +            a.id.cmp(&b.id)
>          } else {
>              type_order
>          }
> @@ -1036,9 +1297,6 @@ pub(crate) async fn pull_ns(
>          filters.iter().any(|filter| group.matches(filter))
>      };
>  
> -    // Get groups with target NS set
> -    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
> -
>      let list = if let Some(ref group_filter) = &params.group_filter {
>          let unfiltered_count = list.len();
>          let list: Vec<pbs_api_types::BackupGroup> = list
> @@ -1066,6 +1324,7 @@ pub(crate) async fn pull_ns(
>  
>      let mut progress = StoreProgress::new(list.len() as u64);
>  
> +    let target_ns = params.get_target_ns()?;

this is the wrong namespace.. we need to map the source namespace onto the target
namespace anchor (e.g., if we are recursively pulling from /a/b to /a/z and the
namespace currently being pulled is /a/b/c, then the target namespace should be
/a/z/c, not /a/z).

>      for (done, group) in list.into_iter().enumerate() {
>          progress.done_groups = done as u64;
>          progress.done_snapshots = 0;
> @@ -1073,6 +1332,7 @@ pub(crate) async fn pull_ns(
>  
>          let (owner, _lock_guard) =
>              match params
> +                .target
>                  .store
>                  .create_locked_backup_group(&target_ns, &group, &params.owner)
>              {
> @@ -1085,6 +1345,7 @@ pub(crate) async fn pull_ns(
>                          err
>                      );
>                      errors = true; // do not stop here, instead continue
> +                    task_log!(worker, "create_locked_backup_group failed");

Any reason this is here? The error is already logged two lines above ;)

>                      continue;
>                  }
>              };
> @@ -1100,15 +1361,7 @@ pub(crate) async fn pull_ns(
>                  owner
>              );
>              errors = true; // do not stop here, instead continue
> -        } else if let Err(err) = pull_group(
> -            worker,
> -            client,
> -            params,
> -            &group,
> -            source_ns.clone(),
> -            &mut progress,
> -        )
> -        .await
> +        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
>          {
>              task_log!(worker, "sync group {} failed - {}", &group, err,);
>              errors = true; // do not stop here, instead continue
> @@ -1117,13 +1370,13 @@ pub(crate) async fn pull_ns(
>  
>      if params.remove_vanished {
>          let result: Result<(), Error> = proxmox_lang::try_block!({
> -            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
> +            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
>                  let local_group = local_group?;
>                  let local_group = local_group.group();
>                  if new_groups.contains(local_group) {
>                      continue;
>                  }
> -                let owner = params.store.get_owner(&target_ns, local_group)?;
> +                let owner = params.target.store.get_owner(&target_ns, local_group)?;
>                  if check_backup_owner(&owner, &params.owner).is_err() {
>                      continue;
>                  }
> @@ -1133,7 +1386,11 @@ pub(crate) async fn pull_ns(
>                      }
>                  }
>                  task_log!(worker, "delete vanished group '{local_group}'",);
> -                match params.store.remove_backup_group(&target_ns, local_group) {
> +                match params
> +                    .target
> +                    .store
> +                    .remove_backup_group(&target_ns, local_group)
> +                {
>                      Ok(true) => {}
>                      Ok(false) => {
>                          task_log!(
> -- 
> 2.30.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 





More information about the pbs-devel mailing list