[pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling

Hannes Laimer h.laimer at proxmox.com
Thu Feb 23 13:55:40 CET 2023


... and rewrite pull logic.
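
The pull logic no longer assumes an HttpClient-backed remote: PullParameters
now wraps a PullSource and a PullTarget, and listing namespaces, groups and
snapshots as well as loading files and chunks is dispatched on the source
variant. Condensed from the diff below, the new shape is roughly:

    pub(crate) enum PullSource {
        Remote(RemoteSource), // HTTP client + per-snapshot BackupReader
        Local(LocalSource),   // direct access to a local DataStore
    }

    pub(crate) struct PullTarget {
        store: Arc<DataStore>,
        ns: BackupNamespace,
    }

When source and target refer to the same local datastore, skip_chunk_sync()
lets pull_single_archive() skip the chunk transfer entirely.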

Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
 pbs-api-types/src/datastore.rs  |    2 +-
 pbs-datastore/src/read_chunk.rs |    2 +-
 src/api2/pull.rs                |   13 +-
 src/server/pull.rs              | 1023 +++++++++++++++++++------------
 4 files changed, 648 insertions(+), 392 deletions(-)

diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs
index 72e8d1ee..9a692b08 100644
--- a/pbs-api-types/src/datastore.rs
+++ b/pbs-api-types/src/datastore.rs
@@ -931,7 +931,7 @@ impl std::str::FromStr for BackupGroup {
 /// Uniquely identify a Backup (relative to data store)
 ///
 /// We also call this a backup snaphost.
-#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
+#[derive(Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
 #[serde(rename_all = "kebab-case")]
 pub struct BackupDir {
     /// Backup group.
diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
index c04a7431..29ee2d4c 100644
--- a/pbs-datastore/src/read_chunk.rs
+++ b/pbs-datastore/src/read_chunk.rs
@@ -14,7 +14,7 @@ pub trait ReadChunk {
     fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
 }
 
-pub trait AsyncReadChunk: Send {
+pub trait AsyncReadChunk: Send + Sync {
     /// Returns the encoded chunk data
     fn read_raw_chunk<'a>(
         &'a self,
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index bb8f6fe1..2966190c 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -121,8 +121,8 @@ pub fn do_sync_job(
             let sync_job2 = sync_job.clone();
 
             let worker_future = async move {
-                let pull_params = PullParameters::try_from(&sync_job)?;
-                let client = pull_params.client().await?;
+                let mut pull_params = PullParameters::try_from(&sync_job)?;
+                pull_params.init_source(sync_job.limit).await?;
 
                 task_log!(worker, "Starting datastore sync job '{}'", job_id);
                 if let Some(event_str) = schedule {
@@ -137,7 +137,7 @@ pub fn do_sync_job(
                 );
 
                 if sync_job.remote.is_some() {
-                    pull_store(&worker, &client, pull_params).await?;
+                    pull_store(&worker, pull_params).await?;
                 } else {
                     match (sync_job.ns, sync_job.remote_ns) {
                         (Some(target_ns), Some(source_ns))
@@ -280,7 +280,7 @@ async fn pull(
         delete,
     )?;
 
-    let pull_params = PullParameters::new(
+    let mut pull_params = PullParameters::new(
         &store,
         ns,
         remote.as_deref(),
@@ -290,9 +290,8 @@ async fn pull(
         remove_vanished,
         max_depth,
         group_filter,
-        limit,
     )?;
-    let client = pull_params.client().await?;
+    pull_params.init_source(limit).await?;
 
     // fixme: set to_stdout to false?
     // FIXME: add namespace to worker id?
@@ -310,7 +309,7 @@ async fn pull(
                 remote_store,
             );
 
-            let pull_future = pull_store(&worker, &client, pull_params);
+            let pull_future = pull_store(&worker, pull_params);
             (select! {
                 success = pull_future.fuse() => success,
                 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 65eedf2c..d3be39da 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,28 +1,26 @@
 //! Sync datastore from remote server
 
 use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::{Seek, SeekFrom, Write};
+use std::path::PathBuf;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
-use pbs_config::CachedUserInfo;
-use serde_json::json;
-
+use proxmox_rest_server::WorkerTask;
 use proxmox_router::HttpError;
 use proxmox_sys::task_log;
+use serde_json::json;
 
 use pbs_api_types::{
-    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
+    print_store_and_ns, Authid, BackupDir, BackupNamespace, CryptMode, GroupFilter, GroupListItem,
     Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
     PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
 };
-
-use pbs_client::{
-    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
-};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_config::CachedUserInfo;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -30,25 +28,21 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::read_chunk::AsyncReadChunk;
+use pbs_datastore::{
+    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
+};
 use pbs_tools::sha::sha256;
-use proxmox_rest_server::WorkerTask;
 
-use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
 use crate::tools::parallel_handler::ParallelHandler;
 
 /// Parameters for a pull operation.
 pub(crate) struct PullParameters {
-    /// Remote that is pulled from
-    remote: Remote,
-    /// Full specification of remote datastore
-    source: BackupRepository,
-    /// Local store that is pulled into
-    store: Arc<DataStore>,
-    /// Remote namespace
-    remote_ns: BackupNamespace,
-    /// Local namespace (anchor)
-    ns: BackupNamespace,
+    /// Where data is pulled from
+    source: PullSource,
+    /// Where data should be pulled into
+    target: PullTarget,
     /// Owner of synced groups (needs to match local owner of pre-existing groups)
     owner: Authid,
     /// Whether to remove groups which exist locally, but not on the remote end
@@ -57,70 +51,459 @@ pub(crate) struct PullParameters {
     max_depth: Option<usize>,
     /// Filters for reducing the pull scope
     group_filter: Option<Vec<GroupFilter>>,
-    /// Rate limits for all transfers from `remote`
-    limit: RateLimitConfig,
+}
+
+pub(crate) enum PullSource {
+    Remote(RemoteSource),
+    Local(LocalSource),
+}
+
+pub(crate) struct PullTarget {
+    store: Arc<DataStore>,
+    ns: BackupNamespace,
+}
+
+pub(crate) struct LocalSource {
+    store: Arc<DataStore>,
+    ns: BackupNamespace,
+}
+
+pub(crate) struct RemoteSource {
+    remote: Remote,
+    repo: BackupRepository,
+    ns: BackupNamespace,
+    client: Option<HttpClient>,
+    backup_reader: HashMap<pbs_api_types::BackupDir, Arc<BackupReader>>,
+}
+
+impl PullSource {
+    pub(crate) async fn init(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
+        match self {
+            PullSource::Remote(source) => {
+                source.client.replace(
+                    crate::api2::config::remote::remote_client(&source.remote, Some(limit)).await?,
+                );
+            }
+            PullSource::Local(_) => {}
+        };
+        Ok(())
+    }
+
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        match &self {
+            PullSource::Remote(source) => list_remote_namespaces(source, max_depth, worker).await,
+            PullSource::Local(source) => ListNamespacesRecursive::new_max_depth(
+                source.store.clone(),
+                source.ns.clone(),
+                max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+            )?
+            .collect(),
+        }
+    }
+
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<pbs_api_types::BackupGroup>, Error> {
+        match &self {
+            PullSource::Remote(source) => {
+                let path = format!("api2/json/admin/datastore/{}/groups", source.repo.store());
+
+                let args = if !namespace.is_root() {
+                    Some(json!({ "ns": namespace.clone() }))
+                } else {
+                    None
+                };
+
+                let client = source.get_client()?;
+                client.login().await?;
+                let mut result = client.get(&path, args).await.map_err(|err| {
+                    format_err!("Failed to retrieve backup groups from remote - {}", err)
+                })?;
+
+                Ok(
+                    serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
+                        .map_err(Error::from)?
+                        .into_iter()
+                        .map(|item| item.backup)
+                        .collect::<Vec<pbs_api_types::BackupGroup>>(),
+                )
+            }
+            PullSource::Local(source) => Ok(ListAccessibleBackupGroups::new_with_privs(
+                &source.store,
+                namespace.clone(),
+                MAX_NAMESPACE_DEPTH,
+                None,
+                None,
+                Some(owner),
+            )?
+            .filter_map(Result::ok)
+            .map(|backup_group| backup_group.group().clone())
+            .collect::<Vec<pbs_api_types::BackupGroup>>()),
+        }
+    }
+
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &pbs_api_types::BackupGroup,
+        worker: &WorkerTask,
+    ) -> Result<Vec<pbs_api_types::BackupDir>, Error> {
+        match &self {
+            PullSource::Remote(source) => {
+                let path = format!(
+                    "api2/json/admin/datastore/{}/snapshots",
+                    source.repo.store()
+                );
+
+                let mut args = json!({
+                    "backup-type": group.ty,
+                    "backup-id": group.id,
+                });
+
+                if !namespace.is_root() {
+                    args["ns"] = serde_json::to_value(namespace)?;
+                }
+
+                let client = source.get_client()?;
+                client.login().await?;
+
+                let mut result = client.get(&path, Some(args)).await?;
+                let snapshot_list: Vec<SnapshotListItem> =
+                    serde_json::from_value(result["data"].take())?;
+                Ok(snapshot_list
+                    .into_iter()
+                    .filter_map(|item: SnapshotListItem| {
+                        let snapshot = item.backup;
+                        // in-progress backups can't be synced
+                        if item.size.is_none() {
+                            task_log!(
+                                worker,
+                                "skipping snapshot {} - in-progress backup",
+                                snapshot
+                            );
+                            return None;
+                        }
+
+                        Some(snapshot)
+                    })
+                    .collect::<Vec<BackupDir>>())
+            }
+            PullSource::Local(source) => Ok(source
+                .store
+                .backup_group(namespace.clone(), group.clone())
+                .iter_snapshots()?
+                .filter_map(Result::ok)
+                .map(|snapshot| snapshot.dir().to_owned())
+                .collect::<Vec<BackupDir>>()),
+        }
+    }
+
+    /// Load a file from the source namespace and BackupDir into the given local file
+    async fn load_file_into(
+        &mut self,
+        namespace: &BackupNamespace,
+        snapshot: &pbs_api_types::BackupDir,
+        filename: &str,
+        into: &PathBuf,
+        worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        match self {
+            PullSource::Remote(ref mut source) => {
+                let client = source.get_client()?;
+                client.login().await?;
+
+                let reader = if let Some(reader) = source.backup_reader.get(snapshot) {
+                    reader.clone()
+                } else {
+                    let backup_reader = BackupReader::start(
+                        client,
+                        None,
+                        source.repo.store(),
+                        namespace,
+                        snapshot,
+                        true,
+                    )
+                    .await?;
+                    source
+                        .backup_reader
+                        .insert(snapshot.clone(), backup_reader.clone());
+                    backup_reader
+                };
+
+                let download_result = reader.download(filename, &mut tmp_file).await;
+
+                if let Err(err) = download_result {
+                    match err.downcast_ref::<HttpError>() {
+                        Some(HttpError { code, message }) => match *code {
+                            StatusCode::NOT_FOUND => {
+                                task_log!(
+                                    worker,
+                                    "skipping snapshot {} - vanished since start of sync",
+                                    snapshot,
+                                );
+                                return Ok(None);
+                            }
+                            _ => {
+                                bail!("HTTP error {code} - {message}");
+                            }
+                        },
+                        None => {
+                            return Err(err);
+                        }
+                    };
+                };
+            }
+            PullSource::Local(source) => {
+                let dir = source
+                    .store
+                    .backup_dir(namespace.clone(), snapshot.clone())?;
+                let mut from_path = dir.full_path();
+                from_path.push(filename);
+                tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+            }
+        }
+
+        tmp_file.seek(SeekFrom::Start(0))?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    // Note: The client.log.blob is uploaded after the backup, so it is
+    // not mentioned in the manifest.
+    async fn try_download_client_log(
+        &self,
+        from_snapshot: &pbs_api_types::BackupDir,
+        to_path: &std::path::Path,
+        worker: &WorkerTask,
+    ) -> Result<(), Error> {
+        match &self {
+            PullSource::Remote(source) => {
+                let reader = source
+                    .backup_reader
+                    .get(from_snapshot)
+                    .ok_or(format_err!("Can't download client log without a BackupReader"))?;
+                let mut tmp_path = to_path.to_owned();
+                tmp_path.set_extension("tmp");
+
+                let tmpfile = std::fs::OpenOptions::new()
+                    .write(true)
+                    .create(true)
+                    .read(true)
+                    .open(&tmp_path)?;
+
+                // Note: be silent if there is no log - only log successful download
+                if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
+                    if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+                        bail!("Atomic rename file {:?} failed - {}", to_path, err);
+                    }
+                    task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
+                }
+
+                Ok(())
+            }
+            PullSource::Local(_) => Ok(()),
+        }
+    }
+
+    fn get_chunk_reader(
+        &self,
+        snapshot: &pbs_api_types::BackupDir,
+        crypt_mode: CryptMode,
+    ) -> Result<Arc<dyn AsyncReadChunk>, Error> {
+        Ok(match &self {
+            PullSource::Remote(source) => {
+                if let Some(reader) = source.backup_reader.get(snapshot) {
+                    Arc::new(RemoteChunkReader::new(
+                        reader.clone(),
+                        None,
+                        crypt_mode,
+                        HashMap::new(),
+                    ))
+                } else {
+                    bail!("No initialized BackupReader!")
+                }
+            }
+            PullSource::Local(source) => Arc::new(LocalChunkReader::new(
+                source.store.clone(),
+                None,
+                crypt_mode,
+            )),
+        })
+    }
+
+    fn get_ns(&self) -> BackupNamespace {
+        match &self {
+            PullSource::Remote(source) => source.ns.clone(),
+            PullSource::Local(source) => source.ns.clone(),
+        }
+    }
+
+    fn print_store_and_ns(&self) -> String {
+        match &self {
+            PullSource::Remote(source) => print_store_and_ns(source.repo.store(), &source.ns),
+            PullSource::Local(source) => print_store_and_ns(source.store.name(), &source.ns),
+        }
+    }
+}
+
+impl RemoteSource {
+    fn get_client(&self) -> Result<&HttpClient, Error> {
+        if let Some(client) = &self.client {
+            Ok(client)
+        } else {
+            bail!("RemoteSource not initialized")
+        }
+    }
 }
 
 impl PullParameters {
     /// Creates a new instance of `PullParameters`.
-    ///
-    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
-    /// [BackupRepository] with `remote_store`.
-    #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         store: &str,
         ns: BackupNamespace,
-        remote: &str,
+        remote: Option<&str>,
         remote_store: &str,
         remote_ns: BackupNamespace,
         owner: Authid,
         remove_vanished: Option<bool>,
         max_depth: Option<usize>,
         group_filter: Option<Vec<GroupFilter>>,
-        limit: RateLimitConfig,
     ) -> Result<Self, Error> {
-        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
-
         if let Some(max_depth) = max_depth {
             ns.check_max_depth(max_depth)?;
             remote_ns.check_max_depth(max_depth)?;
-        }
+        };
+        let remove_vanished = remove_vanished.unwrap_or(false);
 
-        let (remote_config, _digest) = pbs_config::remote::config()?;
-        let remote: Remote = remote_config.lookup("remote", remote)?;
+        let source: PullSource = if let Some(remote) = remote {
+            let (remote_config, _digest) = pbs_config::remote::config()?;
+            let remote: Remote = remote_config.lookup("remote", remote)?;
 
-        let remove_vanished = remove_vanished.unwrap_or(false);
+            let repo = BackupRepository::new(
+                Some(remote.config.auth_id.clone()),
+                Some(remote.config.host.clone()),
+                remote.config.port,
+                remote_store.to_string(),
+            );
 
-        let source = BackupRepository::new(
-            Some(remote.config.auth_id.clone()),
-            Some(remote.config.host.clone()),
-            remote.config.port,
-            remote_store.to_string(),
-        );
+            PullSource::Remote(RemoteSource {
+                remote,
+                repo,
+                ns: remote_ns.clone(),
+                client: None,
+                backup_reader: HashMap::new(),
+            })
+        } else {
+            PullSource::Local(LocalSource {
+                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
+                ns: remote_ns,
+            })
+        };
+        let target = PullTarget {
+            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
+            ns,
+        };
 
         Ok(Self {
-            remote,
-            remote_ns,
-            ns,
             source,
-            store,
+            target,
             owner,
             remove_vanished,
             max_depth,
             group_filter,
-            limit,
         })
     }
 
-    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
-    pub async fn client(&self) -> Result<HttpClient, Error> {
-        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
+    pub(crate) async fn init_source(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
+        self.source.init(limit).await
+    }
+
+    pub(crate) fn skip_chunk_sync(&self) -> bool {
+        match &self.source {
+            PullSource::Local(source) => source.store.name() == self.target.store.name(),
+            PullSource::Remote(_) => false,
+        }
+    }
+
+    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
+        let source_ns = self.source.get_ns();
+        source_ns.map_prefix(&source_ns, &self.target.ns)
     }
 }
 
+async fn list_remote_namespaces(
+    source: &RemoteSource,
+    max_depth: &mut Option<usize>,
+    worker: &WorkerTask,
+) -> Result<Vec<BackupNamespace>, Error> {
+    if source.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
+        return Ok(vec![source.ns.clone()]);
+    }
+
+    let path = format!(
+        "api2/json/admin/datastore/{}/namespace",
+        source.repo.store()
+    );
+    let mut data = json!({});
+    if let Some(max_depth) = max_depth {
+        data["max-depth"] = json!(max_depth);
+    }
+
+    if !source.ns.is_root() {
+        data["parent"] = json!(source.ns);
+    }
+
+    let client = source.get_client()?;
+    client.login().await?;
+
+    let mut result = match client.get(&path, Some(data)).await {
+        Ok(res) => res,
+        Err(err) => match err.downcast_ref::<HttpError>() {
+            Some(HttpError { code, message }) => match code {
+                &StatusCode::NOT_FOUND => {
+                    if source.ns.is_root() && max_depth.is_none() {
+                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+                        max_depth.replace(0);
+                    } else {
+                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+                    }
+
+                    return Ok(vec![source.ns.clone()]);
+                }
+                _ => {
+                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
+                }
+            },
+            None => {
+                bail!("Querying namespaces failed - {err}");
+            }
+        },
+    };
+
+    let list: Vec<pbs_api_types::BackupNamespace> =
+        serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+            .iter()
+            .map(|list_item| list_item.ns.clone())
+            .collect();
+
+    Ok(list)
+}
+
 async fn pull_index_chunks<I: IndexFile>(
     worker: &WorkerTask,
-    chunk_reader: RemoteChunkReader,
+    chunk_reader: Arc<dyn AsyncReadChunk>,
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -211,26 +594,6 @@ async fn pull_index_chunks<I: IndexFile>(
     Ok(())
 }
 
-async fn download_manifest(
-    reader: &BackupReader,
-    filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
-    let mut tmp_manifest_file = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .truncate(true)
-        .read(true)
-        .open(filename)?;
-
-    reader
-        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
-        .await?;
-
-    tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
-    Ok(tmp_manifest_file)
-}
-
 fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
     if size != info.size {
         bail!(
@@ -251,21 +614,21 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
 /// Pulls a single file referenced by a manifest.
 ///
 /// Pulling an archive consists of the following steps:
-/// - Create tmp file for archive
-/// - Download archive file into tmp file
+/// - Load archive file into tmp file
 /// - Verify tmp file checksum
 /// - if archive is an index, pull referenced chunks
 /// - Rename tmp file into real path
 async fn pull_single_archive(
     worker: &WorkerTask,
-    reader: &BackupReader,
-    chunk_reader: &mut RemoteChunkReader,
-    snapshot: &pbs_datastore::BackupDir,
+    params: &mut PullParameters,
+    from_namespace: &BackupNamespace,
+    from_snapshot: &pbs_api_types::BackupDir,
+    to_snapshot: &pbs_datastore::BackupDir,
     archive_info: &FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let archive_name = &archive_info.filename;
-    let mut path = snapshot.full_path();
+    let mut path = to_snapshot.full_path();
     path.push(archive_name);
 
     let mut tmp_path = path.clone();
@@ -273,13 +636,18 @@ async fn pull_single_archive(
 
     task_log!(worker, "sync archive {}", archive_name);
 
-    let mut tmpfile = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&tmp_path)?;
+    params
+        .source
+        .load_file_into(
+            from_namespace,
+            from_snapshot,
+            archive_name,
+            &tmp_path,
+            worker,
+        )
+        .await?;
 
-    reader.download(archive_name, &mut tmpfile).await?;
+    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
 
     match archive_type(archive_name)? {
         ArchiveType::DynamicIndex => {
@@ -289,14 +657,20 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            if params.skip_chunk_sync() {
+                task_log!(worker, "skipping chunk sync for same datatsore");
+            } else {
+                pull_index_chunks(
+                    worker,
+                    params
+                        .source
+                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
+                    params.target.store.clone(),
+                    index,
+                    downloaded_chunks,
+                )
+                .await?;
+            }
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -305,14 +679,20 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            if params.skip_chunk_sync() {
+                task_log!(worker, "skipping chunk sync for same datatsore");
+            } else {
+                pull_index_chunks(
+                    worker,
+                    params
+                        .source
+                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
+                    params.target.store.clone(),
+                    index,
+                    downloaded_chunks,
+                )
+                .await?;
+            }
         }
         ArchiveType::Blob => {
             tmpfile.seek(SeekFrom::Start(0))?;
@@ -326,33 +706,6 @@ async fn pull_single_archive(
     Ok(())
 }
 
-// Note: The client.log.blob is uploaded after the backup, so it is
-// not mentioned in the manifest.
-async fn try_client_log_download(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    path: &std::path::Path,
-) -> Result<(), Error> {
-    let mut tmp_path = path.to_owned();
-    tmp_path.set_extension("tmp");
-
-    let tmpfile = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&tmp_path)?;
-
-    // Note: be silent if there is no log - only log successful download
-    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
-        if let Err(err) = std::fs::rename(&tmp_path, path) {
-            bail!("Atomic rename file {:?} failed - {}", path, err);
-        }
-        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
-    }
-
-    Ok(())
-}
-
 /// Actual implementation of pulling a snapshot.
 ///
 /// Pulling a snapshot consists of the following steps:
@@ -364,44 +717,37 @@ async fn try_client_log_download(
 /// - Download log if not already existing
 async fn pull_snapshot(
     worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    snapshot: &pbs_datastore::BackupDir,
+    params: &mut PullParameters,
+    namespace: &BackupNamespace,
+    from_snapshot: &pbs_api_types::BackupDir,
+    to_snapshot: &pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-    let mut manifest_name = snapshot.full_path();
+    let mut manifest_name = to_snapshot.full_path();
     manifest_name.push(MANIFEST_BLOB_NAME);
 
-    let mut client_log_name = snapshot.full_path();
+    let mut client_log_name = to_snapshot.full_path();
     client_log_name.push(CLIENT_LOG_BLOB_NAME);
 
     let mut tmp_manifest_name = manifest_name.clone();
     tmp_manifest_name.set_extension("tmp");
 
-    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
-    let mut tmp_manifest_file = match download_res {
-        Ok(manifest_file) => manifest_file,
-        Err(err) => {
-            match err.downcast_ref::<HttpError>() {
-                Some(HttpError { code, message }) => match *code {
-                    StatusCode::NOT_FOUND => {
-                        task_log!(
-                            worker,
-                            "skipping snapshot {} - vanished since start of sync",
-                            snapshot.dir(),
-                        );
-                        return Ok(());
-                    }
-                    _ => {
-                        bail!("HTTP error {code} - {message}");
-                    }
-                },
-                None => {
-                    return Err(err);
-                }
-            };
-        }
-    };
-    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
+    let tmp_manifest_blob;
+    if let Some(data) = params
+        .source
+        .load_file_into(
+            namespace,
+            from_snapshot,
+            MANIFEST_BLOB_NAME,
+            &tmp_manifest_name,
+            worker,
+        )
+        .await?
+    {
+        tmp_manifest_blob = data;
+    } else {
+        return Ok(());
+    }
 
     if manifest_name.exists() {
         let manifest_blob = proxmox_lang::try_block!({
@@ -418,8 +764,11 @@ async fn pull_snapshot(
 
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
             if !client_log_name.exists() {
-                try_client_log_download(worker, reader, &client_log_name).await?;
-            }
+                params
+                    .source
+                    .try_download_client_log(from_snapshot, &client_log_name, worker)
+                    .await?;
+            };
             task_log!(worker, "no data changes");
             let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(()); // nothing changed
@@ -429,7 +778,7 @@ async fn pull_snapshot(
     let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
 
     for item in manifest.files() {
-        let mut path = snapshot.full_path();
+        let mut path = to_snapshot.full_path();
         path.push(&item.filename);
 
         if path.exists() {
@@ -467,18 +816,12 @@ async fn pull_snapshot(
             }
         }
 
-        let mut chunk_reader = RemoteChunkReader::new(
-            reader.clone(),
-            None,
-            item.chunk_crypt_mode(),
-            HashMap::new(),
-        );
-
         pull_single_archive(
             worker,
-            &reader,
-            &mut chunk_reader,
-            snapshot,
+            params,
+            namespace,
+            from_snapshot,
+            to_snapshot,
             item,
             downloaded_chunks.clone(),
         )
@@ -490,10 +833,12 @@ async fn pull_snapshot(
     }
 
     if !client_log_name.exists() {
-        try_client_log_download(worker, reader, &client_log_name).await?;
-    }
-
-    snapshot
+        params
+            .source
+            .try_download_client_log(from_snapshot, &client_log_name, worker)
+            .await?;
+    };
+    to_snapshot
         .cleanup_unreferenced_files(&manifest)
         .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
 
@@ -501,37 +846,53 @@ async fn pull_snapshot(
 }
 
 /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
-///
-/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
-/// pointing to the local datastore and target namespace.
 async fn pull_snapshot_from(
     worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    snapshot: &pbs_datastore::BackupDir,
+    params: &mut PullParameters,
+    namespace: &BackupNamespace,
+    from_snapshot: &pbs_api_types::BackupDir,
+    to_snapshot: &pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-    let (_path, is_new, _snap_lock) = snapshot
+    let (_path, is_new, _snap_lock) = to_snapshot
         .datastore()
-        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
+        .create_locked_backup_dir(to_snapshot.backup_ns(), to_snapshot.as_ref())?;
 
     if is_new {
-        task_log!(worker, "sync snapshot {}", snapshot.dir());
+        task_log!(worker, "sync snapshot {}", to_snapshot.dir());
 
-        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
-            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
-                snapshot.backup_ns(),
-                snapshot.as_ref(),
+        if let Err(err) = pull_snapshot(
+            worker,
+            params,
+            namespace,
+            from_snapshot,
+            to_snapshot,
+            downloaded_chunks,
+        )
+        .await
+        {
+            if let Err(cleanup_err) = to_snapshot.datastore().remove_backup_dir(
+                to_snapshot.backup_ns(),
+                to_snapshot.as_ref(),
                 true,
             ) {
                 task_log!(worker, "cleanup error - {}", cleanup_err);
             }
             return Err(err);
         }
-        task_log!(worker, "sync snapshot {} done", snapshot.dir());
+        task_log!(worker, "sync snapshot {} done", to_snapshot.dir());
     } else {
-        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
-        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
-        task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
+        task_log!(worker, "re-sync snapshot {}", to_snapshot.dir());
+        pull_snapshot(
+            worker,
+            params,
+            namespace,
+            from_snapshot,
+            to_snapshot,
+            downloaded_chunks,
+        )
+        .await?;
+        task_log!(worker, "re-sync snapshot {} done", to_snapshot.dir());
     }
 
     Ok(())
@@ -587,7 +948,6 @@ impl std::fmt::Display for SkipInfo {
 /// - Sort by snapshot time
 /// - Get last snapshot timestamp on local datastore
 /// - Iterate over list of snapshots
-/// -- Recreate client/BackupReader
 /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
 /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
 ///
@@ -600,101 +960,63 @@ impl std::fmt::Display for SkipInfo {
 /// - local group owner is already checked by pull_store
 async fn pull_group(
     worker: &WorkerTask,
-    client: &HttpClient,
-    params: &PullParameters,
+    params: &mut PullParameters,
+    source_namespace: &BackupNamespace,
     group: &pbs_api_types::BackupGroup,
-    remote_ns: BackupNamespace,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-    let path = format!(
-        "api2/json/admin/datastore/{}/snapshots",
-        params.source.store()
-    );
-
-    let mut args = json!({
-        "backup-type": group.ty,
-        "backup-id": group.id,
-    });
-
-    if !remote_ns.is_root() {
-        args["ns"] = serde_json::to_value(&remote_ns)?;
-    }
-
-    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
-
-    let mut result = client.get(&path, Some(args)).await?;
-    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
-
-    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
-
-    client.login().await?; // make sure auth is complete
-
-    let fingerprint = client.fingerprint();
-
-    let last_sync = params.store.last_successful_backup(&target_ns, group)?;
-
-    let mut remote_snapshots = std::collections::HashSet::new();
-
-    // start with 65536 chunks (up to 256 GiB)
-    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
-
-    progress.group_snapshots = list.len() as u64;
+    let target_ns = params.get_target_ns()?;
 
+    let mut source_snapshots = HashSet::new();
+    let last_sync = params
+        .target
+        .store
+        .last_successful_backup(&target_ns, group)?;
     let mut skip_info = SkipInfo {
         oldest: i64::MAX,
         newest: i64::MIN,
         count: 0,
     };
 
-    for (pos, item) in list.into_iter().enumerate() {
-        let snapshot = item.backup;
-
-        // in-progress backups can't be synced
-        if item.size.is_none() {
-            task_log!(
-                worker,
-                "skipping snapshot {} - in-progress backup",
-                snapshot
-            );
-            continue;
-        }
+    let mut list: Vec<BackupDir> = params
+        .source
+        .list_backup_dirs(source_namespace, group, worker)
+        .await?
+        .into_iter()
+        .filter(|dir| {
+            source_snapshots.insert(dir.time);
+            if let Some(last_sync_time) = last_sync {
+                if last_sync_time > dir.time {
+                    skip_info.update(dir.time);
+                    return false;
+                }
+            }
+            true
+        })
+        .collect();
 
-        remote_snapshots.insert(snapshot.time);
+    list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
 
-        if let Some(last_sync_time) = last_sync {
-            if last_sync_time > snapshot.time {
-                skip_info.update(snapshot.time);
-                continue;
-            }
-        }
+    // start with 65536 chunks (up to 256 GiB)
+    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-        // get updated auth_info (new tickets)
-        let auth_info = client.login().await?;
-
-        let options =
-            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
-                .rate_limit(params.limit.clone());
-
-        let new_client = HttpClient::new(
-            params.source.host(),
-            params.source.port(),
-            params.source.auth_id(),
-            options,
-        )?;
-
-        let reader = BackupReader::start(
-            new_client,
-            None,
-            params.source.store(),
-            &remote_ns,
-            &snapshot,
-            true,
-        )
-        .await?;
+    progress.group_snapshots = list.len() as u64;
 
-        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
+    for (pos, from_snapshot) in list.into_iter().enumerate() {
+        let to_snapshot = params
+            .target
+            .store
+            .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;
 
-        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+        let result = pull_snapshot_from(
+            worker,
+            params,
+            source_namespace,
+            &from_snapshot,
+            &to_snapshot,
+            downloaded_chunks.clone(),
+        )
+        .await;
 
         progress.done_snapshots = pos as u64 + 1;
         task_log!(worker, "percentage done: {}", progress);
@@ -703,11 +1025,14 @@ async fn pull_group(
     }
 
     if params.remove_vanished {
-        let group = params.store.backup_group(target_ns.clone(), group.clone());
+        let group = params
+            .target
+            .store
+            .backup_group(target_ns.clone(), group.clone());
         let local_list = group.list_backups()?;
         for info in local_list {
             let snapshot = info.backup_dir;
-            if remote_snapshots.contains(&snapshot.backup_time()) {
+            if source_snapshots.contains(&snapshot.backup_time()) {
                 continue;
             }
             if snapshot.is_protected() {
@@ -720,6 +1045,7 @@ async fn pull_group(
             }
             task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
             params
+                .target
                 .store
                 .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
         }
@@ -732,64 +1058,12 @@ async fn pull_group(
     Ok(())
 }
 
-// will modify params if switching to backwards mode for lack of NS support on remote end
-async fn query_namespaces(
-    worker: &WorkerTask,
-    client: &HttpClient,
-    params: &mut PullParameters,
-) -> Result<Vec<BackupNamespace>, Error> {
-    let path = format!(
-        "api2/json/admin/datastore/{}/namespace",
-        params.source.store()
-    );
-    let mut data = json!({});
-    if let Some(max_depth) = params.max_depth {
-        data["max-depth"] = json!(max_depth);
-    }
-
-    if !params.remote_ns.is_root() {
-        data["parent"] = json!(params.remote_ns);
-    }
-
-    let mut result = match client.get(&path, Some(data)).await {
-        Ok(res) => res,
-        Err(err) => match err.downcast_ref::<HttpError>() {
-            Some(HttpError { code, message }) => match *code {
-                StatusCode::NOT_FOUND => {
-                    if params.remote_ns.is_root() && params.max_depth.is_none() {
-                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
-                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
-                        params.max_depth = Some(0);
-                    } else {
-                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
-                    }
-
-                    return Ok(vec![params.remote_ns.clone()]);
-                }
-                _ => {
-                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
-                }
-            },
-            None => {
-                bail!("Querying namespaces failed - {err}");
-            }
-        },
-    };
-
-    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
-
-    // parents first
-    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
-
-    Ok(list.iter().map(|item| item.ns.clone()).collect())
-}
-
 fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
     let mut created = false;
-    let store_ns_str = print_store_and_ns(params.store.name(), ns);
+    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
 
-    if !ns.is_root() && !params.store.namespace_path(ns).exists() {
-        check_ns_modification_privs(params.store.name(), ns, &params.owner)
+    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
+        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
             .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
 
         let name = match ns.components().last() {
@@ -799,14 +1073,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
             }
         };
 
-        if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
+        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
             bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
         }
         created = true;
     }
 
     check_ns_privs(
-        params.store.name(),
+        params.target.store.name(),
         ns,
         &params.owner,
         PRIV_DATASTORE_BACKUP,
@@ -817,10 +1091,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
 }
 
 fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
-    check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
+    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
         .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
 
-    params.store.remove_namespace_recursive(local_ns, true)
+    params
+        .target
+        .store
+        .remove_namespace_recursive(local_ns, true)
 }
 
 fn check_and_remove_vanished_ns(
@@ -834,14 +1111,15 @@ fn check_and_remove_vanished_ns(
     // clamp like remote does so that we don't list more than we can ever have synced.
     let max_depth = params
         .max_depth
-        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
+        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
 
     let mut local_ns_list: Vec<BackupNamespace> = params
+        .target
         .store
-        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
+        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
         .filter(|ns| {
             let user_privs =
-                user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
+                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
             user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
         })
         .collect();
@@ -850,7 +1128,7 @@ fn check_and_remove_vanished_ns(
     local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
 
     for local_ns in local_ns_list {
-        if local_ns == params.ns {
+        if local_ns == params.target.ns {
             continue;
         }
 
@@ -897,29 +1175,28 @@ fn check_and_remove_vanished_ns(
 /// - access to sub-NS checked here
 pub(crate) async fn pull_store(
     worker: &WorkerTask,
-    client: &HttpClient,
     mut params: PullParameters,
 ) -> Result<(), Error> {
     // explicit create shared lock to prevent GC on newly created chunks
-    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
+    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
     let mut errors = false;
 
     let old_max_depth = params.max_depth;
-    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
-        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
-    } else {
-        query_namespaces(worker, client, &mut params).await?
-    };
+    let mut namespaces = params
+        .source
+        .list_namespaces(&mut params.max_depth, worker)
+        .await?;
     errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
+    namespaces.sort_unstable_by(|a, b| a.name_len().cmp(&b.name_len()));
 
     let (mut groups, mut snapshots) = (0, 0);
     let mut synced_ns = HashSet::with_capacity(namespaces.len());
 
     for namespace in namespaces {
-        let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
+        let source_store_ns_str = params.source.print_store_and_ns();
 
-        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
-        let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
+        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
+        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
 
         task_log!(worker, "----");
         task_log!(
@@ -947,7 +1224,7 @@ pub(crate) async fn pull_store(
             }
         }
 
-        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
+        match pull_ns(worker, &namespace, &mut params).await {
             Ok((ns_progress, ns_errors)) => {
                 errors |= ns_errors;
 
@@ -968,7 +1245,7 @@ pub(crate) async fn pull_store(
                 task_log!(
                     worker,
                     "Encountered errors while syncing namespace {} - {}",
-                    namespace,
+                    &namespace,
                     err,
                 );
             }
@@ -1000,33 +1277,17 @@ pub(crate) async fn pull_store(
 /// - owner check for vanished groups done here
 pub(crate) async fn pull_ns(
     worker: &WorkerTask,
-    client: &HttpClient,
-    params: &PullParameters,
-    source_ns: BackupNamespace,
-    target_ns: BackupNamespace,
+    namespace: &BackupNamespace,
+    params: &mut PullParameters,
 ) -> Result<(StoreProgress, bool), Error> {
-    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
-
-    let args = if !source_ns.is_root() {
-        Some(json!({
-            "ns": source_ns,
-        }))
-    } else {
-        None
-    };
-
-    let mut result = client
-        .get(&path, args)
-        .await
-        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
-
-    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+    let mut list: Vec<pbs_api_types::BackupGroup> =
+        params.source.list_groups(namespace, &params.owner).await?;
 
     let total_count = list.len();
     list.sort_unstable_by(|a, b| {
-        let type_order = a.backup.ty.cmp(&b.backup.ty);
+        let type_order = a.ty.cmp(&b.ty);
         if type_order == std::cmp::Ordering::Equal {
-            a.backup.id.cmp(&b.backup.id)
+            a.id.cmp(&b.id)
         } else {
             type_order
         }
@@ -1036,9 +1297,6 @@ pub(crate) async fn pull_ns(
         filters.iter().any(|filter| group.matches(filter))
     };
 
-    // Get groups with target NS set
-    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
-
     let list = if let Some(ref group_filter) = &params.group_filter {
         let unfiltered_count = list.len();
         let list: Vec<pbs_api_types::BackupGroup> = list
@@ -1066,6 +1324,7 @@ pub(crate) async fn pull_ns(
 
     let mut progress = StoreProgress::new(list.len() as u64);
 
+    let target_ns = params.get_target_ns()?;
     for (done, group) in list.into_iter().enumerate() {
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
@@ -1073,6 +1332,7 @@ pub(crate) async fn pull_ns(
 
         let (owner, _lock_guard) =
             match params
+                .target
                 .store
                 .create_locked_backup_group(&target_ns, &group, &params.owner)
             {
@@ -1085,6 +1345,7 @@ pub(crate) async fn pull_ns(
                         err
                     );
                     errors = true; // do not stop here, instead continue
+                    task_log!(worker, "create_locked_backup_group failed");
                     continue;
                 }
             };
@@ -1100,15 +1361,7 @@ pub(crate) async fn pull_ns(
                 owner
             );
             errors = true; // do not stop here, instead continue
-        } else if let Err(err) = pull_group(
-            worker,
-            client,
-            params,
-            &group,
-            source_ns.clone(),
-            &mut progress,
-        )
-        .await
+        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
         {
             task_log!(worker, "sync group {} failed - {}", &group, err,);
             errors = true; // do not stop here, instead continue
@@ -1117,13 +1370,13 @@ pub(crate) async fn pull_ns(
 
     if params.remove_vanished {
         let result: Result<(), Error> = proxmox_lang::try_block!({
-            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
+            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                 let local_group = local_group?;
                 let local_group = local_group.group();
                 if new_groups.contains(local_group) {
                     continue;
                 }
-                let owner = params.store.get_owner(&target_ns, local_group)?;
+                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                 if check_backup_owner(&owner, &params.owner).is_err() {
                     continue;
                 }
@@ -1133,7 +1386,11 @@ pub(crate) async fn pull_ns(
                     }
                 }
                 task_log!(worker, "delete vanished group '{local_group}'",);
-                match params.store.remove_backup_group(&target_ns, local_group) {
+                match params
+                    .target
+                    .store
+                    .remove_backup_group(&target_ns, local_group)
+                {
                     Ok(true) => {}
                     Ok(false) => {
                         task_log!(
-- 
2.30.2





