[pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore

Lukas Wagner l.wagner at proxmox.com
Thu Sep 21 13:10:34 CEST 2023


Some of the changed lines seem to be overly long (>100 chars), I've 
noted some of the places, but probably did not catch everything.

On 8/8/23 14:13, Hannes Laimer wrote:
> ... making the pull logic independent from the actual source
> using two traits.
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
>   Cargo.toml                      |   2 +
>   pbs-datastore/src/read_chunk.rs |   2 +-
>   src/api2/config/remote.rs       |  14 +-
>   src/api2/pull.rs                |  31 +-
>   src/server/pull.rs              | 943 +++++++++++++++++++-------------
>   5 files changed, 570 insertions(+), 422 deletions(-)
> 
> diff --git a/Cargo.toml b/Cargo.toml
> index 4d34f8a1..74cb68e0 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" }
>   
>   # regular crates
>   anyhow = "1.0"
> +async-trait = "0.1.56"
>   apt-pkg-native = "0.3.2"
>   base64 = "0.13"
>   bitflags = "1.2.1"
> @@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] }
>   
>   [dependencies]
>   anyhow.workspace = true
> +async-trait.workspace = true
>   apt-pkg-native.workspace = true
>   base64.workspace = true
>   bitflags.workspace = true
> diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
> index c04a7431..29ee2d4c 100644
> --- a/pbs-datastore/src/read_chunk.rs
> +++ b/pbs-datastore/src/read_chunk.rs
> @@ -14,7 +14,7 @@ pub trait ReadChunk {
>       fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
>   }
>   
> -pub trait AsyncReadChunk: Send {
> +pub trait AsyncReadChunk: Send + Sync {
>       /// Returns the encoded chunk data
>       fn read_raw_chunk<'a>(
>           &'a self,
> diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
> index 307cf3cd..2511c5d5 100644
> --- a/src/api2/config/remote.rs
> +++ b/src/api2/config/remote.rs
> @@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
>       Ok(())
>   }
>   
> -/// Helper to get client for remote.cfg entry
> -pub async fn remote_client(
> +/// Helper to get client for remote.cfg entry without login, just config
> +pub fn remote_client_config(
>       remote: &Remote,
>       limit: Option<RateLimitConfig>,
>   ) -> Result<HttpClient, Error> {
> @@ -320,6 +320,16 @@ pub async fn remote_client(
>           &remote.config.auth_id,
>           options,
>       )?;
> +
> +    Ok(client)
> +}
> +
> +/// Helper to get client for remote.cfg entry
> +pub async fn remote_client(
> +    remote: &Remote,
> +    limit: Option<RateLimitConfig>,
> +) -> Result<HttpClient, Error> {
> +    let client = remote_client_config(remote, limit)?;
>       let _auth_info = client
>           .login() // make sure we can auth
>           .await
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index 664ecce5..e36a5b14 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -8,7 +8,7 @@ use proxmox_sys::task_log;
>   
>   use pbs_api_types::{
>       Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
> -    GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
> +    GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
>       PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
>       TRANSFER_LAST_SCHEMA,
>   };
> @@ -75,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
>           PullParameters::new(
>               &sync_job.store,
>               sync_job.ns.clone().unwrap_or_default(),
> -            sync_job.remote.as_deref().unwrap_or("local"),
> +            sync_job.remote.as_deref(),
>               &sync_job.remote_store,
>               sync_job.remote_ns.clone().unwrap_or_default(),
>               sync_job
> @@ -124,7 +124,6 @@ pub fn do_sync_job(
>   
>               let worker_future = async move {
>                   let pull_params = PullParameters::try_from(&sync_job)?;
> -                let client = pull_params.client().await?;
>   
>                   task_log!(worker, "Starting datastore sync job '{}'", job_id);
>                   if let Some(event_str) = schedule {
> @@ -138,24 +137,7 @@ pub fn do_sync_job(
>                       sync_job.remote_store,
>                   );
>   
> -                if sync_job.remote.is_some() {
> -                    pull_store(&worker, &client, pull_params).await?;
> -                } else {
> -                    if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
> -                        if target_ns.path().starts_with(source_ns.path())
> -                            && sync_job.store == sync_job.remote_store
> -                            && sync_job.max_depth.map_or(true, |sync_depth| {
> -                            target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
> -                        }) {
> -                            task_log!(
> -                                worker,
> -                                "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
> -                            );
> -                        }
> -                    } else {
> -                        pull_store(&worker, &client, pull_params).await?;
> -                    }
> -                }
> +                pull_store(&worker, pull_params).await?;
>   
>                   task_log!(worker, "sync job '{}' end", &job_id);
>   
> @@ -284,7 +266,7 @@ async fn pull(
>       let pull_params = PullParameters::new(
>           &store,
>           ns,
> -        remote.as_deref().unwrap_or("local"),
> +        remote.as_deref(),
>           &remote_store,
>           remote_ns.unwrap_or_default(),
>           auth_id.clone(),
> @@ -294,7 +276,6 @@ async fn pull(
>           limit,
>           transfer_last,
>       )?;
> -    let client = pull_params.client().await?;
>   
>       // fixme: set to_stdout to false?
>       // FIXME: add namespace to worker id?
> @@ -312,7 +293,7 @@ async fn pull(
>                   remote_store,
>               );
>   
> -            let pull_future = pull_store(&worker, &client, pull_params);
> +            let pull_future = pull_store(&worker, pull_params);
>               (select! {
>                   success = pull_future.fuse() => success,
>                   abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
> @@ -327,4 +308,4 @@ async fn pull(
>       Ok(upid_str)
>   }
>   
> -pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> \ No newline at end of file
> +pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index e55452d1..e1a27a8c 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
>   //! Sync datastore from remote server
>   
>   use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::Seek;
> +use std::path::Path;
>   use std::sync::atomic::{AtomicUsize, Ordering};
>   use std::sync::{Arc, Mutex};
>   use std::time::SystemTime;
>   
>   use anyhow::{bail, format_err, Error};
>   use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
>   use proxmox_router::HttpError;
> -use proxmox_sys::task_log;
> +use proxmox_sys::{task_log, task_warn};
> +use serde_json::json;
>   
>   use pbs_api_types::{
> -    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> -    Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
> +    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
> +    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
>       PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
>   };
> -
> -use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
>   use pbs_datastore::data_blob::DataBlob;
>   use pbs_datastore::dynamic_index::DynamicIndexReader;
>   use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
>   use pbs_datastore::manifest::{
>       archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>   };
> +use pbs_datastore::read_chunk::AsyncReadChunk;
>   use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
>   use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>   
>   use crate::backup::{check_ns_modification_privs, check_ns_privs};
>   use crate::tools::parallel_handler::ParallelHandler;
>   
> -/// Parameters for a pull operation.
> -pub(crate) struct PullParameters {
> -    /// Remote that is pulled from
> -    remote: Remote,
> -    /// Full specification of remote datastore
> -    source: BackupRepository,
> -    /// Local store that is pulled into
> +struct RemoteReader {
> +    backup_reader: Arc<BackupReader>,
> +    dir: BackupDir,
> +}
> +
> +pub(crate) struct PullTarget {
>       store: Arc<DataStore>,
> -    /// Remote namespace
> -    remote_ns: BackupNamespace,
> -    /// Local namespace (anchor)
>       ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> +    repo: BackupRepository,
> +    ns: BackupNamespace,
> +    client: HttpClient,
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
> +/// The trait includes methods for listing namespaces, groups, and backup directories,
> +/// as well as retrieving a reader for reading data from the source
> +trait PullSource: Send + Sync {
> +    /// Lists namespaces from the source.
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error>;
> +
> +    /// Lists groups within a specific namespace from the source.
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        owner: &Authid,
> +    ) -> Result<Vec<BackupGroup>, Error>;
> +
> +    /// Lists backup directories for a specific group within a specific namespace from the source.
> +    async fn list_backup_dirs(
> +        &self,
> +        namespace: &BackupNamespace,
> +        group: &BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupDir>, Error>;
> +    fn get_ns(&self) -> BackupNamespace;
> +    fn print_store_and_ns(&self) -> String;
> +
> +    /// Returns a reader for reading data from a specific backup directory.
> +    async fn reader(
> +        &self,
> +        ns: &BackupNamespace,
> +        dir: &BackupDir,
> +    ) -> Result<Arc<dyn PullReader>, Error>;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullSource for RemoteSource {
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error> {
> +        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> +            vec![self.ns.clone()];
> +        }
> +
> +        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
> +        let mut data = json!({});
> +        if let Some(max_depth) = max_depth {
> +            data["max-depth"] = json!(max_depth);
> +        }
> +
> +        if !self.ns.is_root() {
> +            data["parent"] = json!(self.ns);
> +        }
> +        self.client.login().await?;
> +
> +        let mut result = match self.client.get(&path, Some(data)).await {
> +            Ok(res) => res,
> +            Err(err) => match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match code {
> +                    &StatusCode::NOT_FOUND => {
> +                        if self.ns.is_root() && max_depth.is_none() {
> +                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");

These lines exceed our 100 character limit.

> +                            max_depth.replace(0);
> +                        } else {
> +                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                        }
> +
> +                        return Ok(vec![self.ns.clone()]);
> +                    }
> +                    _ => {
> +                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    bail!("Querying namespaces failed - {err}");
> +                }
> +            },
> +        };
> +
> +        let list: Vec<BackupNamespace> =
> +            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> +                .iter()
> +                .map(|list_item| list_item.ns.clone())
> +                .collect();
> +
> +        Ok(list)
> +    }
> +
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        _owner: &Authid,
> +    ) -> Result<Vec<BackupGroup>, Error> {
> +        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
> +
> +        let args = if !namespace.is_root() {
> +            Some(json!({ "ns": namespace.clone() }))
> +        } else {
> +            None
> +        };
> +
> +        self.client.login().await?;
> +        let mut result =
> +            self.client.get(&path, args).await.map_err(|err| {
> +                format_err!("Failed to retrieve backup groups from remote - {}", err)
> +            })?;
> +
> +        Ok(
> +            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> +                .map_err(Error::from)?
> +                .into_iter()
> +                .map(|item| item.backup)
> +                .collect::<Vec<BackupGroup>>(),
> +        )
> +    }
> +
> +    async fn list_backup_dirs(
> +        &self,
> +        _namespace: &BackupNamespace,
> +        group: &BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupDir>, Error> {
> +        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
> +
> +        let mut args = json!({
> +            "backup-type": group.ty,
> +            "backup-id": group.id,
> +        });
> +
> +        if !self.ns.is_root() {
> +            args["ns"] = serde_json::to_value(&self.ns)?;
> +        }
> +
> +        self.client.login().await?;
> +
> +        let mut result = self.client.get(&path, Some(args)).await?;
> +        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> +        Ok(snapshot_list
> +            .into_iter()
> +            .filter_map(|item: SnapshotListItem| {
> +                let snapshot = item.backup;
> +                // in-progress backups can't be synced
> +                if item.size.is_none() {
> +                    task_log!(
> +                        worker,
> +                        "skipping snapshot {} - in-progress backup",
> +                        snapshot
> +                    );
> +                    return None;
> +                }
> +
> +                Some(snapshot)
> +            })
> +            .collect::<Vec<BackupDir>>())
> +    }
> +
> +    fn get_ns(&self) -> BackupNamespace {
> +        self.ns.clone()
> +    }
> +
> +    fn print_store_and_ns(&self) -> String {
> +        print_store_and_ns(self.repo.store(), &self.ns)
> +    }
> +
> +    async fn reader(
> +        &self,
> +        ns: &BackupNamespace,
> +        dir: &BackupDir,
> +    ) -> Result<Arc<dyn PullReader>, Error> {
> +        let backup_reader =
> +            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
> +        Ok(Arc::new(RemoteReader {
> +            backup_reader,
> +            dir: dir.clone(),
> +        }))
> +    }
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullReader` is a trait that provides an interface for reading data from a source.
> +/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.

Long line again

> +trait PullReader: Send + Sync {
> +    /// Returns a chunk reader with the specified encryption mode.
> +    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
> +
> +    /// Asynchronously loads a file from the source into a local file.
> +    /// `filename` is the name of the file to load from the source.
> +    /// `into` is the path of the local file to load the source file into.
> +    async fn load_file_into(
> +        &self,
> +        filename: &str,
> +        into: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error>;
> +
> +    /// Tries to download the client log from the source and save it into a local file.
> +    async fn try_download_client_log(
> +        &self,
> +        to_path: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error>;
> +
> +    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullReader for RemoteReader {
> +    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
> +        Arc::new(RemoteChunkReader::new(
> +            self.backup_reader.clone(),
> +            None,
> +            crypt_mode,
> +            HashMap::new(),
> +        ))
> +    }
> +
> +    async fn load_file_into(
> +        &self,
> +        filename: &str,
> +        into: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error> {
> +        let mut tmp_file = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .truncate(true)
> +            .read(true)
> +            .open(into)?;
> +        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
> +        if let Err(err) = download_result {
> +            match err.downcast_ref::<HttpError>() {
> +                Some(HttpError { code, message }) => match *code {
> +                    StatusCode::NOT_FOUND => {
> +                        task_log!(
> +                            worker,
> +                            "skipping snapshot {} - vanished since start of sync",
> +                            &self.dir,
> +                        );
> +                        return Ok(None);
> +                    }
> +                    _ => {
> +                        bail!("HTTP error {code} - {message}");
> +                    }
> +                },
> +                None => {
> +                    return Err(err);
> +                }
> +            };
> +        };
> +        tmp_file.rewind()?;
> +        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
> +    }
> +
> +    async fn try_download_client_log(
> +        &self,
> +        to_path: &Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error> {
> +        let mut tmp_path = to_path.to_owned();
> +        tmp_path.set_extension("tmp");
> +
> +        let tmpfile = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .read(true)
> +            .open(&tmp_path)?;
> +
> +        // Note: be silent if there is no log - only log successful download
> +        if let Ok(()) = self
> +            .backup_reader
> +            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
> +            .await
> +        {
> +            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> +                bail!("Atomic rename file {:?} failed - {}", to_path, err);
> +            }
> +            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
> +        false
> +    }
> +}
> +
> +/// Parameters for a pull operation.
> +pub(crate) struct PullParameters {
> +    /// Where data is pulled from
> +    source: Arc<dyn PullSource>,
> +    /// Where data should be pulled into
> +    target: PullTarget,
>       /// Owner of synced groups (needs to match local owner of pre-existing groups)
>       owner: Authid,
>       /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
>       max_depth: Option<usize>,
>       /// Filters for reducing the pull scope
>       group_filter: Option<Vec<GroupFilter>>,
> -    /// Rate limits for all transfers from `remote`
> -    limit: RateLimitConfig,
>       /// How many snapshots should be transferred at most (taking the newest N snapshots)
>       transfer_last: Option<usize>,
>   }
>   
>   impl PullParameters {
>       /// Creates a new instance of `PullParameters`.
> -    ///
> -    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> -    /// [BackupRepository] with `remote_store`.
> -    #[allow(clippy::too_many_arguments)]
>       pub(crate) fn new(
>           store: &str,
>           ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>           remote_store: &str,
>           remote_ns: BackupNamespace,
>           owner: Authid,
> @@ -82,49 +376,56 @@ impl PullParameters {
>           limit: RateLimitConfig,
>           transfer_last: Option<usize>,
>       ) -> Result<Self, Error> {
> -        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
>           if let Some(max_depth) = max_depth {
>               ns.check_max_depth(max_depth)?;
>               remote_ns.check_max_depth(max_depth)?;
> -        }
> -
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> -
> +        };
>           let remove_vanished = remove_vanished.unwrap_or(false);
>   
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> +        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote: Remote = remote_config.lookup("remote", remote)?;
>   
> -        Ok(Self {
> -            remote,
> -            remote_ns,
> +            let repo = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
> +            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
> +            Arc::new(RemoteSource {
> +                repo,
> +                ns: remote_ns,
> +                client,
> +            })
> +        } else {
> +            bail!("local sync not implemented yet")
> +        };
> +        let target = PullTarget {
> +            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
>               ns,
> +        };
> +
> +        Ok(Self {
>               source,
> -            store,
> +            target,
>               owner,
>               remove_vanished,
>               max_depth,
>               group_filter,
> -            limit,
>               transfer_last,
>           })
>       }
>   
> -    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> -    pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> +        let source_ns = self.source.get_ns();
> +        source_ns.map_prefix(&source_ns, &self.target.ns)
>       }
>   }
>   
>   async fn pull_index_chunks<I: IndexFile>(
>       worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: Arc<dyn AsyncReadChunk>,
>       target: Arc<DataStore>,
>       index: I,
>       downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -215,26 +516,6 @@ async fn pull_index_chunks<I: IndexFile>(
>       Ok(())
>   }
>   
> -async fn download_manifest(
> -    reader: &BackupReader,
> -    filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> -    let mut tmp_manifest_file = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .truncate(true)
> -        .read(true)
> -        .open(filename)?;
> -
> -    reader
> -        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> -        .await?;
> -
> -    tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> -    Ok(tmp_manifest_file)
> -}
> -
>   fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
>       if size != info.size {
>           bail!(
> @@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>   /// Pulls a single file referenced by a manifest.
>   ///
>   /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> -/// - Verify tmp file checksum
> +/// - Load archive file into tmp file
> +/// -- Load file into tmp file
> +/// -- Verify tmp file checksum
>   /// - if archive is an index, pull referenced chunks
>   /// - Rename tmp file into real path
> -async fn pull_single_archive(
> -    worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> -    snapshot: &pbs_datastore::BackupDir,
> -    archive_info: &FileInfo,
> +async fn pull_single_archive<'a>(
> +    worker: &'a WorkerTask,
> +    reader: Arc<dyn PullReader + 'a>,
> +    snapshot: &'a pbs_datastore::BackupDir,
> +    archive_info: &'a FileInfo,
>       downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>   ) -> Result<(), Error> {
>       let archive_name = &archive_info.filename;
> @@ -277,13 +557,11 @@ async fn pull_single_archive(
>   
>       task_log!(worker, "sync archive {}", archive_name);
>   
> -    let mut tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> +    reader
> +        .load_file_into(archive_name, &tmp_path, worker)
> +        .await?;
>   
> -    reader.download(archive_name, &mut tmpfile).await?;
> +    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>   
>       match archive_type(archive_name)? {
>           ArchiveType::DynamicIndex => {
> @@ -293,14 +571,18 @@ async fn pull_single_archive(
>               let (csum, size) = index.compute_csum();
>               verify_archive(archive_info, &csum, size)?;
>   
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if reader.skip_chunk_sync(snapshot.datastore().name()) {
> +                task_log!(worker, "skipping chunk sync for same datatsore");
> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    reader.chunk_reader(archive_info.crypt_mode),
> +                    snapshot.datastore().clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>           }
>           ArchiveType::FixedIndex => {
>               let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -309,17 +591,21 @@ async fn pull_single_archive(
>               let (csum, size) = index.compute_csum();
>               verify_archive(archive_info, &csum, size)?;
>   
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if reader.skip_chunk_sync(snapshot.datastore().name()) {
> +                task_log!(worker, "skipping chunk sync for same datatsore");
> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    reader.chunk_reader(archive_info.crypt_mode),
> +                    snapshot.datastore().clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>           }
>           ArchiveType::Blob => {
> -            tmpfile.seek(SeekFrom::Start(0))?;
> +            tmpfile.rewind()?;
>               let (csum, size) = sha256(&mut tmpfile)?;
>               verify_archive(archive_info, &csum, size)?;
>           }
> @@ -330,33 +616,6 @@ async fn pull_single_archive(
>       Ok(())
>   }
>   
> -// Note: The client.log.blob is uploaded after the backup, so it is
> -// not mentioned in the manifest.
> -async fn try_client_log_download(
> -    worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    path: &std::path::Path,
> -) -> Result<(), Error> {
> -    let mut tmp_path = path.to_owned();
> -    tmp_path.set_extension("tmp");
> -
> -    let tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> -
> -    // Note: be silent if there is no log - only log successful download
> -    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> -        if let Err(err) = std::fs::rename(&tmp_path, path) {
> -            bail!("Atomic rename file {:?} failed - {}", path, err);
> -        }
> -        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> -    }
> -
> -    Ok(())
> -}
> -
>   /// Actual implementation of pulling a snapshot.
>   ///
>   /// Pulling a snapshot consists of the following steps:
> @@ -366,10 +625,10 @@ async fn try_client_log_download(
>   /// -- if file already exists, verify contents
>   /// -- if not, pull it from the remote
>   /// - Download log if not already existing
> -async fn pull_snapshot(
> -    worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +async fn pull_snapshot<'a>(
> +    worker: &'a WorkerTask,
> +    reader: Arc<dyn PullReader + 'a>,
> +    snapshot: &'a pbs_datastore::BackupDir,
>       downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>   ) -> Result<(), Error> {
>       let mut manifest_name = snapshot.full_path();
> @@ -380,32 +639,15 @@ async fn pull_snapshot(
>   
>       let mut tmp_manifest_name = manifest_name.clone();
>       tmp_manifest_name.set_extension("tmp");
> -
> -    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
> -    let mut tmp_manifest_file = match download_res {
> -        Ok(manifest_file) => manifest_file,
> -        Err(err) => {
> -            match err.downcast_ref::<HttpError>() {
> -                Some(HttpError { code, message }) => match *code {
> -                    StatusCode::NOT_FOUND => {
> -                        task_log!(
> -                            worker,
> -                            "skipping snapshot {} - vanished since start of sync",
> -                            snapshot.dir(),
> -                        );
> -                        return Ok(());
> -                    }
> -                    _ => {
> -                        bail!("HTTP error {code} - {message}");
> -                    }
> -                },
> -                None => {
> -                    return Err(err);
> -                }
> -            };
> -        }
> -    };
> -    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
> +    let tmp_manifest_blob;
> +    if let Some(data) = reader
> +        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
> +        .await?
> +    {
> +        tmp_manifest_blob = data;
> +    } else {
> +        return Ok(());
> +    }
>   
>       if manifest_name.exists() {
>           let manifest_blob = proxmox_lang::try_block!({
> @@ -422,8 +664,10 @@ async fn pull_snapshot(
>   
>           if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
>               if !client_log_name.exists() {
> -                try_client_log_download(worker, reader, &client_log_name).await?;
> -            }
> +                reader
> +                    .try_download_client_log(&client_log_name, worker)
> +                    .await?;
> +            };
>               task_log!(worker, "no data changes");
>               let _ = std::fs::remove_file(&tmp_manifest_name);
>               return Ok(()); // nothing changed
> @@ -471,17 +715,9 @@ async fn pull_snapshot(
>               }
>           }
>   
> -        let mut chunk_reader = RemoteChunkReader::new(
> -            reader.clone(),
> -            None,
> -            item.chunk_crypt_mode(),
> -            HashMap::new(),
> -        );
> -
>           pull_single_archive(
>               worker,
> -            &reader,
> -            &mut chunk_reader,
> +            reader.clone(),
>               snapshot,
>               item,
>               downloaded_chunks.clone(),
> @@ -494,9 +730,10 @@ async fn pull_snapshot(
>       }
>   
>       if !client_log_name.exists() {
> -        try_client_log_download(worker, reader, &client_log_name).await?;
> -    }
> -
> +        reader
> +            .try_download_client_log(&client_log_name, worker)
> +            .await?;
> +    };
>       snapshot
>           .cleanup_unreferenced_files(&manifest)
>           .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
> @@ -506,12 +743,12 @@ async fn pull_snapshot(
>   
>   /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
>   ///
> -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
> -/// pointing to the local datastore and target namespace.
> -async fn pull_snapshot_from(
> -    worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +/// The `reader` is configured to read from the source backup directory, while the
> +/// `snapshot` is pointing to the local datastore and target namespace.
> +async fn pull_snapshot_from<'a>(
> +    worker: &'a WorkerTask,
> +    reader: Arc<dyn PullReader + 'a>,
> +    snapshot: &'a pbs_datastore::BackupDir,
>       downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>   ) -> Result<(), Error> {
>       let (_path, is_new, _snap_lock) = snapshot
> @@ -626,11 +863,10 @@ impl std::fmt::Display for SkipInfo {
>   /// - Sort by snapshot time
>   /// - Get last snapshot timestamp on local datastore
>   /// - Iterate over list of snapshots
> -/// -- Recreate client/BackupReader
>   /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
>   /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
>   ///
> -/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
> +/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
>   /// remote when querying snapshots. This allows us to interact with old remotes that don't have
>   /// namespace support yet.
>   ///
> @@ -639,117 +875,79 @@ impl std::fmt::Display for SkipInfo {
>   /// - local group owner is already checked by pull_store
>   async fn pull_group(
>       worker: &WorkerTask,
> -    client: &HttpClient,
>       params: &PullParameters,
> -    group: &pbs_api_types::BackupGroup,
> -    remote_ns: BackupNamespace,
> +    source_namespace: &BackupNamespace,
> +    group: &BackupGroup,
>       progress: &mut StoreProgress,
>   ) -> Result<(), Error> {
> -    task_log!(worker, "sync group {}", group);
> -
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/snapshots",
> -        params.source.store()
> -    );
> -
> -    let mut args = json!({
> -        "backup-type": group.ty,
> -        "backup-id": group.id,
> -    });
> -
> -    if !remote_ns.is_root() {
> -        args["ns"] = serde_json::to_value(&remote_ns)?;
> -    }
> -
> -    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
> -
> -    let mut result = client.get(&path, Some(args)).await?;
> -    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> -
> -    client.login().await?; // make sure auth is complete
> -
> -    let fingerprint = client.fingerprint();
> -
> -    let last_sync = params.store.last_successful_backup(&target_ns, group)?;
> -    let last_sync_time = last_sync.unwrap_or(i64::MIN);
> -
> -    let mut remote_snapshots = std::collections::HashSet::new();
> -
> -    // start with 65536 chunks (up to 256 GiB)
> -    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
> -
> -    progress.group_snapshots = list.len() as u64;
> -
>       let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
>       let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
>   
> -    let total_amount = list.len();
> +    let mut raw_list: Vec<BackupDir> = params
> +        .source
> +        .list_backup_dirs(source_namespace, group, worker)
> +        .await?;
> +    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
> +
> +    let total_amount = raw_list.len();
>   
>       let cutoff = params
>           .transfer_last
>           .map(|count| total_amount.saturating_sub(count))
>           .unwrap_or_default();
>   
> -    for (pos, item) in list.into_iter().enumerate() {
> -        let snapshot = item.backup;
> -
> -        // in-progress backups can't be synced
> -        if item.size.is_none() {
> -            task_log!(
> -                worker,
> -                "skipping snapshot {} - in-progress backup",
> -                snapshot
> -            );
> -            continue;
> -        }
> -
> -        remote_snapshots.insert(snapshot.time);
> +    let target_ns = params.get_target_ns()?;
>   
> -        if last_sync_time > snapshot.time {
> -            already_synced_skip_info.update(snapshot.time);
> -            continue;
> -        } else if already_synced_skip_info.count > 0 {
> -            task_log!(worker, "{}", already_synced_skip_info);
> -            already_synced_skip_info.reset();
> -        }
> -
> -        if pos < cutoff && last_sync_time != snapshot.time {
> -            transfer_last_skip_info.update(snapshot.time);
> -            continue;
> -        } else if transfer_last_skip_info.count > 0 {
> -            task_log!(worker, "{}", transfer_last_skip_info);
> -            transfer_last_skip_info.reset();
> -        }
> -
> -        // get updated auth_info (new tickets)
> -        let auth_info = client.login().await?;
> +    let mut source_snapshots = HashSet::new();
> +    let last_sync_time = params
> +        .target
> +        .store
> +        .last_successful_backup(&target_ns, group)?
> +        .unwrap_or(i64::MIN);
> +
> +    let list: Vec<BackupDir> = raw_list
> +        .into_iter()
> +        .enumerate()
> +        .filter(|&(pos, ref dir)| {
> +            source_snapshots.insert(dir.time);
> +            if last_sync_time > dir.time {
> +                already_synced_skip_info.update(dir.time);
> +                return false;
> +            } else if already_synced_skip_info.count > 0 {
> +                task_log!(worker, "{}", already_synced_skip_info);
> +                already_synced_skip_info.reset();
> +                return true;
> +            }
>   
> -        let options =
> -            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
> -                .rate_limit(params.limit.clone());
> +            if pos < cutoff && last_sync_time != dir.time {
> +                transfer_last_skip_info.update(dir.time);
> +                return false;
> +            } else if transfer_last_skip_info.count > 0 {
> +                task_log!(worker, "{}", transfer_last_skip_info);
> +                transfer_last_skip_info.reset();
> +            }
> +            true
> +        })
> +        .map(|(_, dir)| dir)
> +        .collect();
>   
> -        let new_client = HttpClient::new(
> -            params.source.host(),
> -            params.source.port(),
> -            params.source.auth_id(),
> -            options,
> -        )?;
> +    // start with 65536 chunks (up to 256 GiB)
> +    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>   
> -        let reader = BackupReader::start(
> -            &new_client,
> -            None,
> -            params.source.store(),
> -            &remote_ns,
> -            &snapshot,
> -            true,
> -        )
> -        .await?;
> +    progress.group_snapshots = list.len() as u64;
>   
> -        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
> +    for (pos, from_snapshot) in list.into_iter().enumerate() {
> +        let to_snapshot = params
> +            .target
> +            .store
> +            .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;
>   
> -        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
> +        let reader = params
> +            .source
> +            .reader(source_namespace, &from_snapshot)
> +            .await?;
> +        let result =
> +            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
>   
>           progress.done_snapshots = pos as u64 + 1;
>           task_log!(worker, "percentage done: {}", progress);
> @@ -758,11 +956,14 @@ async fn pull_group(
>       }
>   
>       if params.remove_vanished {
> -        let group = params.store.backup_group(target_ns.clone(), group.clone());
> +        let group = params
> +            .target
> +            .store
> +            .backup_group(params.get_target_ns()?, group.clone());
>           let local_list = group.list_backups()?;
>           for info in local_list {
>               let snapshot = info.backup_dir;
> -            if remote_snapshots.contains(&snapshot.backup_time()) {
> +            if source_snapshots.contains(&snapshot.backup_time()) {
>                   continue;
>               }
>               if snapshot.is_protected() {
> @@ -774,73 +975,23 @@ async fn pull_group(
>                   continue;
>               }
>               task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
> -            params
> -                .store
> -                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
> +            params.target.store.remove_backup_dir(
> +                &params.get_target_ns()?,
> +                snapshot.as_ref(),
> +                false,
> +            )?;
>           }
>       }
>   
>       Ok(())
>   }
>   
> -// will modify params if switching to backwards mode for lack of NS support on remote end
> -async fn query_namespaces(
> -    worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &mut PullParameters,
> -) -> Result<Vec<BackupNamespace>, Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/namespace",
> -        params.source.store()
> -    );
> -    let mut data = json!({});
> -    if let Some(max_depth) = params.max_depth {
> -        data["max-depth"] = json!(max_depth);
> -    }
> -
> -    if !params.remote_ns.is_root() {
> -        data["parent"] = json!(params.remote_ns);
> -    }
> -
> -    let mut result = match client.get(&path, Some(data)).await {
> -        Ok(res) => res,
> -        Err(err) => match err.downcast_ref::<HttpError>() {
> -            Some(HttpError { code, message }) => match *code {
> -                StatusCode::NOT_FOUND => {
> -                    if params.remote_ns.is_root() && params.max_depth.is_none() {
> -                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> -                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> -                        params.max_depth = Some(0);
> -                    } else {
> -                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> -                    }
> -
> -                    return Ok(vec![params.remote_ns.clone()]);
> -                }
> -                _ => {
> -                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> -                }
> -            },
> -            None => {
> -                bail!("Querying namespaces failed - {err}");
> -            }
> -        },
> -    };
> -
> -    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    // parents first
> -    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
> -
> -    Ok(list.iter().map(|item| item.ns.clone()).collect())
> -}
> -
>   fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
>       let mut created = false;
> -    let store_ns_str = print_store_and_ns(params.store.name(), ns);
> +    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
>   
> -    if !ns.is_root() && !params.store.namespace_path(ns).exists() {
> -        check_ns_modification_privs(params.store.name(), ns, &params.owner)
> +    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
> +        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
>               .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
>   
>           let name = match ns.components().last() {
> @@ -850,14 +1001,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>               }
>           };
>   
> -        if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
> +        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
>               bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
>           }
>           created = true;
>       }
>   
>       check_ns_privs(
> -        params.store.name(),
> +        params.target.store.name(),
>           ns,
>           &params.owner,
>           PRIV_DATASTORE_BACKUP,
> @@ -868,10 +1019,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>   }
>   
>   fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
> -    check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
> +    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
>           .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
>   
> -    params.store.remove_namespace_recursive(local_ns, true)
> +    params
> +        .target
> +        .store
> +        .remove_namespace_recursive(local_ns, true)
>   }
>   
>   fn check_and_remove_vanished_ns(
> @@ -885,14 +1039,15 @@ fn check_and_remove_vanished_ns(
>       // clamp like remote does so that we don't list more than we can ever have synced.
>       let max_depth = params
>           .max_depth
> -        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
> +        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
>   
>       let mut local_ns_list: Vec<BackupNamespace> = params
> +        .target
>           .store
> -        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
> +        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
>           .filter(|ns| {
>               let user_privs =
> -                user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
> +                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
>               user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
>           })
>           .collect();
> @@ -901,7 +1056,7 @@ fn check_and_remove_vanished_ns(
>       local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
>   
>       for local_ns in local_ns_list {
> -        if local_ns == params.ns {
> +        if local_ns == params.target.ns {
>               continue;
>           }
>   
> @@ -948,29 +1103,49 @@ fn check_and_remove_vanished_ns(
>   /// - access to sub-NS checked here
>   pub(crate) async fn pull_store(
>       worker: &WorkerTask,
> -    client: &HttpClient,
>       mut params: PullParameters,
>   ) -> Result<(), Error> {
>       // explicit create shared lock to prevent GC on newly created chunks
> -    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
> +    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
>       let mut errors = false;
>   
>       let old_max_depth = params.max_depth;
> -    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
> -        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
> +    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
> +        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
>       } else {
> -        query_namespaces(worker, client, &mut params).await?
> +        params
> +            .source
> +            .list_namespaces(&mut params.max_depth, worker)
> +            .await?
>       };
> +
> +    let ns_layers_to_be_pulled = namespaces
> +        .iter()
> +        .map(BackupNamespace::depth)
> +        .max()
> +        .map_or(0, |v| v - params.source.get_ns().depth());
> +    let target_depth = params.target.ns.depth();
> +
> +    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
> +        bail!(
> +            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
> +            ns_layers_to_be_pulled,
> +            target_depth,
> +            MAX_NAMESPACE_DEPTH
> +        );
> +    }
> +
>       errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
> +    namespaces.sort_unstable_by_key(|a| a.name_len());
>   
>       let (mut groups, mut snapshots) = (0, 0);
>       let mut synced_ns = HashSet::with_capacity(namespaces.len());
>   
>       for namespace in namespaces {
> -        let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
> +        let source_store_ns_str = params.source.print_store_and_ns();
>   
> -        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
> -        let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
> +        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
> +        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
>   
>           task_log!(worker, "----");
>           task_log!(
> @@ -998,7 +1173,7 @@ pub(crate) async fn pull_store(
>               }
>           }
>   
> -        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
> +        match pull_ns(worker, &namespace, &mut params).await {
>               Ok((ns_progress, ns_errors)) => {
>                   errors |= ns_errors;
>   
> @@ -1019,7 +1194,7 @@ pub(crate) async fn pull_store(
>                   task_log!(
>                       worker,
>                       "Encountered errors while syncing namespace {} - {}",
> -                    namespace,
> +                    &namespace,
>                       err,
>                   );
>               }
> @@ -1051,48 +1226,28 @@ pub(crate) async fn pull_store(
>   /// - owner check for vanished groups done here
>   pub(crate) async fn pull_ns(
>       worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &PullParameters,
> -    source_ns: BackupNamespace,
> -    target_ns: BackupNamespace,
> +    namespace: &BackupNamespace,
> +    params: &mut PullParameters,
>   ) -> Result<(StoreProgress, bool), Error> {
> -    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
> -
> -    let args = if !source_ns.is_root() {
> -        Some(json!({
> -            "ns": source_ns,
> -        }))
> -    } else {
> -        None
> -    };
> -
> -    let mut result = client
> -        .get(&path, args)
> -        .await
> -        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
> -
> -    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> +    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
>   
>       let total_count = list.len();
>       list.sort_unstable_by(|a, b| {
> -        let type_order = a.backup.ty.cmp(&b.backup.ty);
> +        let type_order = a.ty.cmp(&b.ty);
>           if type_order == std::cmp::Ordering::Equal {
> -            a.backup.id.cmp(&b.backup.id)
> +            a.id.cmp(&b.id)
>           } else {
>               type_order
>           }
>       });
>   
> -    let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
> +    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
>           filters.iter().any(|filter| group.matches(filter))
>       };
>   
> -    // Get groups with target NS set
> -    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
> -
>       let list = if let Some(ref group_filter) = &params.group_filter {
>           let unfiltered_count = list.len();
> -        let list: Vec<pbs_api_types::BackupGroup> = list
> +        let list: Vec<BackupGroup> = list
>               .into_iter()
>               .filter(|group| apply_filters(group, group_filter))
>               .collect();
> @@ -1110,13 +1265,14 @@ pub(crate) async fn pull_ns(
>   
>       let mut errors = false;
>   
> -    let mut new_groups = std::collections::HashSet::new();
> +    let mut new_groups = HashSet::new();
>       for group in list.iter() {
>           new_groups.insert(group.clone());
>       }
>   
>       let mut progress = StoreProgress::new(list.len() as u64);
>   
> +    let target_ns = params.get_target_ns()?;
>       for (done, group) in list.into_iter().enumerate() {
>           progress.done_groups = done as u64;
>           progress.done_snapshots = 0;
> @@ -1124,6 +1280,7 @@ pub(crate) async fn pull_ns(
>   
>           let (owner, _lock_guard) =
>               match params
> +                .target
>                   .store
>                   .create_locked_backup_group(&target_ns, &group, &params.owner)
>               {
> @@ -1135,7 +1292,9 @@ pub(crate) async fn pull_ns(
>                           &group,
>                           err
>                       );
> -                    errors = true; // do not stop here, instead continue
> +                    errors = true;
> +                    // do not stop here, instead continue
> +                    task_log!(worker, "create_locked_backup_group failed");
>                       continue;
>                   }
>               };
> @@ -1151,15 +1310,7 @@ pub(crate) async fn pull_ns(
>                   owner
>               );
>               errors = true; // do not stop here, instead continue
> -        } else if let Err(err) = pull_group(
> -            worker,
> -            client,
> -            params,
> -            &group,
> -            source_ns.clone(),
> -            &mut progress,
> -        )
> -        .await
> +        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
>           {
>               task_log!(worker, "sync group {} failed - {}", &group, err,);
>               errors = true; // do not stop here, instead continue
> @@ -1168,13 +1319,13 @@ pub(crate) async fn pull_ns(
>   
>       if params.remove_vanished {
>           let result: Result<(), Error> = proxmox_lang::try_block!({
> -            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
> +            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
>                   let local_group = local_group?;
>                   let local_group = local_group.group();
>                   if new_groups.contains(local_group) {
>                       continue;
>                   }
> -                let owner = params.store.get_owner(&target_ns, local_group)?;
> +                let owner = params.target.store.get_owner(&target_ns, local_group)?;
>                   if check_backup_owner(&owner, &params.owner).is_err() {
>                       continue;
>                   }
> @@ -1184,7 +1335,11 @@ pub(crate) async fn pull_ns(
>                       }
>                   }
>                   task_log!(worker, "delete vanished group '{local_group}'",);
> -                match params.store.remove_backup_group(&target_ns, local_group) {
> +                match params
> +                    .target
> +                    .store
> +                    .remove_backup_group(&target_ns, local_group)
> +                {
>                       Ok(true) => {}
>                       Ok(false) => {
>                           task_log!(

-- 
- Lukas





More information about the pbs-devel mailing list