[pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore

Hannes Laimer h.laimer at proxmox.com
Tue Aug 8 14:13:43 CEST 2023


... making the pull logic independent of the actual source by
abstracting it behind two new traits, `PullSource` and `PullReader`.

Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
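
Notes: to make the split easier to review, here is a rough,
self-contained sketch of the idea. It is deliberately simplified and
synchronous; the actual traits below are async (via the async-trait
crate) and operate on the pbs-* types:

    use std::sync::Arc;

    // counterpart of `PullSource`: enumerates what can be pulled and
    // hands out a reader per snapshot
    trait PullSource {
        fn list_snapshots(&self) -> Vec<String>;
        fn reader(&self, snapshot: &str) -> Arc<dyn PullReader>;
    }

    // counterpart of `PullReader`: fetches the actual data of one snapshot
    trait PullReader {
        fn load_file(&self, filename: &str) -> Option<Vec<u8>>;
    }

    struct DummySource;

    impl PullSource for DummySource {
        fn list_snapshots(&self) -> Vec<String> {
            vec!["vm/100/2023-08-08T12:00:00Z".to_string()]
        }
        fn reader(&self, _snapshot: &str) -> Arc<dyn PullReader> {
            Arc::new(DummyReader)
        }
    }

    struct DummyReader;

    impl PullReader for DummyReader {
        fn load_file(&self, filename: &str) -> Option<Vec<u8>> {
            Some(filename.as_bytes().to_vec())
        }
    }

    // the pull logic only sees trait objects, so a non-remote source
    // can be added later without touching this function
    fn pull_store(source: &dyn PullSource) {
        for snapshot in source.list_snapshots() {
            let reader = source.reader(&snapshot);
            if reader.load_file("index.json.blob").is_some() {
                println!("pulled {snapshot}");
            }
        }
    }

    fn main() {
        pull_store(&DummySource);
    }
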
 Cargo.toml                      |   2 +
 pbs-datastore/src/read_chunk.rs |   2 +-
 src/api2/config/remote.rs       |  14 +-
 src/api2/pull.rs                |  31 +-
 src/server/pull.rs              | 943 +++++++++++++++++++-------------
 5 files changed, 570 insertions(+), 422 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 4d34f8a1..74cb68e0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" }
 
 # regular crates
 anyhow = "1.0"
+async-trait = "0.1.56"
 apt-pkg-native = "0.3.2"
 base64 = "0.13"
 bitflags = "1.2.1"
@@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] }
 
 [dependencies]
 anyhow.workspace = true
+async-trait.workspace = true
 apt-pkg-native.workspace = true
 base64.workspace = true
 bitflags.workspace = true
diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
index c04a7431..29ee2d4c 100644
--- a/pbs-datastore/src/read_chunk.rs
+++ b/pbs-datastore/src/read_chunk.rs
@@ -14,7 +14,7 @@ pub trait ReadChunk {
     fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
 }
 
-pub trait AsyncReadChunk: Send {
+pub trait AsyncReadChunk: Send + Sync {
     /// Returns the encoded chunk data
     fn read_raw_chunk<'a>(
         &'a self,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 307cf3cd..2511c5d5 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
     Ok(())
 }
 
-/// Helper to get client for remote.cfg entry
-pub async fn remote_client(
+/// Helper to get a client for a remote.cfg entry, using only the config (no login attempt)
+pub fn remote_client_config(
     remote: &Remote,
     limit: Option<RateLimitConfig>,
 ) -> Result<HttpClient, Error> {
@@ -320,6 +320,16 @@ pub async fn remote_client(
         &remote.config.auth_id,
         options,
     )?;
+
+    Ok(client)
+}
+
+/// Helper to get client for remote.cfg entry
+pub async fn remote_client(
+    remote: &Remote,
+    limit: Option<RateLimitConfig>,
+) -> Result<HttpClient, Error> {
+    let client = remote_client_config(remote, limit)?;
     let _auth_info = client
         .login() // make sure we can auth
         .await
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 664ecce5..e36a5b14 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -8,7 +8,7 @@ use proxmox_sys::task_log;
 
 use pbs_api_types::{
     Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
-    GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
+    GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
     PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
     TRANSFER_LAST_SCHEMA,
 };
@@ -75,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
         PullParameters::new(
             &sync_job.store,
             sync_job.ns.clone().unwrap_or_default(),
-            sync_job.remote.as_deref().unwrap_or("local"),
+            sync_job.remote.as_deref(),
             &sync_job.remote_store,
             sync_job.remote_ns.clone().unwrap_or_default(),
             sync_job
@@ -124,7 +124,6 @@ pub fn do_sync_job(
 
             let worker_future = async move {
                 let pull_params = PullParameters::try_from(&sync_job)?;
-                let client = pull_params.client().await?;
 
                 task_log!(worker, "Starting datastore sync job '{}'", job_id);
                 if let Some(event_str) = schedule {
@@ -138,24 +137,7 @@ pub fn do_sync_job(
                     sync_job.remote_store,
                 );
 
-                if sync_job.remote.is_some() {
-                    pull_store(&worker, &client, pull_params).await?;
-                } else {
-                    if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
-                        if target_ns.path().starts_with(source_ns.path())
-                            && sync_job.store == sync_job.remote_store
-                            && sync_job.max_depth.map_or(true, |sync_depth| {
-                            target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
-                        }) {
-                            task_log!(
-                                worker,
-                                "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
-                            );
-                        }
-                    } else {
-                        pull_store(&worker, &client, pull_params).await?;
-                    }
-                }
+                pull_store(&worker, pull_params).await?;
 
                 task_log!(worker, "sync job '{}' end", &job_id);
 
@@ -284,7 +266,7 @@ async fn pull(
     let pull_params = PullParameters::new(
         &store,
         ns,
-        remote.as_deref().unwrap_or("local"),
+        remote.as_deref(),
         &remote_store,
         remote_ns.unwrap_or_default(),
         auth_id.clone(),
@@ -294,7 +276,6 @@ async fn pull(
         limit,
         transfer_last,
     )?;
-    let client = pull_params.client().await?;
 
     // fixme: set to_stdout to false?
     // FIXME: add namespace to worker id?
@@ -312,7 +293,7 @@ async fn pull(
                 remote_store,
             );
 
-            let pull_future = pull_store(&worker, &client, pull_params);
+            let pull_future = pull_store(&worker, pull_params);
             (select! {
                 success = pull_future.fuse() => success,
                 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
@@ -327,4 +308,4 @@ async fn pull(
     Ok(upid_str)
 }
 
-pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
\ No newline at end of file
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
diff --git a/src/server/pull.rs b/src/server/pull.rs
index e55452d1..e1a27a8c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,28 +1,26 @@
 //! Sync datastore from remote server
 
 use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::Seek;
+use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
-use pbs_config::CachedUserInfo;
-use serde_json::json;
-
+use proxmox_rest_server::WorkerTask;
 use proxmox_router::HttpError;
-use proxmox_sys::task_log;
+use proxmox_sys::{task_log, task_warn};
+use serde_json::json;
 
 use pbs_api_types::{
-    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
-    Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
+    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
     PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
 };
-
-use pbs_client::{
-    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
-};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_config::CachedUserInfo;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
+use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
 use pbs_tools::sha::sha256;
-use proxmox_rest_server::WorkerTask;
 
 use crate::backup::{check_ns_modification_privs, check_ns_privs};
 use crate::tools::parallel_handler::ParallelHandler;
 
-/// Parameters for a pull operation.
-pub(crate) struct PullParameters {
-    /// Remote that is pulled from
-    remote: Remote,
-    /// Full specification of remote datastore
-    source: BackupRepository,
-    /// Local store that is pulled into
+struct RemoteReader {
+    backup_reader: Arc<BackupReader>,
+    dir: BackupDir,
+}
+
+pub(crate) struct PullTarget {
     store: Arc<DataStore>,
-    /// Remote namespace
-    remote_ns: BackupNamespace,
-    /// Local namespace (anchor)
     ns: BackupNamespace,
+}
+
+pub(crate) struct RemoteSource {
+    repo: BackupRepository,
+    ns: BackupNamespace,
+    client: HttpClient,
+}
+
+#[async_trait::async_trait]
+/// `PullSource` is a trait that provides an interface for pulling data from a source.
+/// The trait includes methods for listing namespaces, groups, and backup directories,
+/// as well as retrieving a reader for reading data from the source.
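+///
+/// Rough usage sketch (illustrative only, not run as a doc-test):
+/// ```ignore
+/// // assumes `source: &dyn PullSource`, `owner`, `worker` and `max_depth` are in scope
+/// for ns in source.list_namespaces(&mut max_depth, worker).await? {
+///     for group in source.list_groups(&ns, &owner).await? {
+///         for dir in source.list_backup_dirs(&ns, &group, worker).await? {
+///             let reader = source.reader(&ns, &dir).await?;
+///             // pull the snapshot's contents via the returned `PullReader`
+///         }
+///     }
+/// }
+/// ```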
+trait PullSource: Send + Sync {
+    /// Lists namespaces from the source.
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error>;
+
+    /// Lists groups within a specific namespace from the source.
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error>;
+
+    /// Lists backup directories for a specific group within a specific namespace from the source.
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &BackupGroup,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupDir>, Error>;
+    fn get_ns(&self) -> BackupNamespace;
+    fn print_store_and_ns(&self) -> String;
+
+    /// Returns a reader for reading data from a specific backup directory.
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn PullReader>, Error>;
+}
+
+#[async_trait::async_trait]
+impl PullSource for RemoteSource {
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
+            return Ok(vec![self.ns.clone()]);
+        }
+
+        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
+        let mut data = json!({});
+        if let Some(max_depth) = max_depth {
+            data["max-depth"] = json!(max_depth);
+        }
+
+        if !self.ns.is_root() {
+            data["parent"] = json!(self.ns);
+        }
+        self.client.login().await?;
+
+        let mut result = match self.client.get(&path, Some(data)).await {
+            Ok(res) => res,
+            Err(err) => match err.downcast_ref::<HttpError>() {
+                Some(HttpError { code, message }) => match code {
+                    &StatusCode::NOT_FOUND => {
+                        if self.ns.is_root() && max_depth.is_none() {
+                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+                            max_depth.replace(0);
+                        } else {
+                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+                        }
+
+                        return Ok(vec![self.ns.clone()]);
+                    }
+                    _ => {
+                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
+                    }
+                },
+                None => {
+                    bail!("Querying namespaces failed - {err}");
+                }
+            },
+        };
+
+        let list: Vec<BackupNamespace> =
+            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+                .iter()
+                .map(|list_item| list_item.ns.clone())
+                .collect();
+
+        Ok(list)
+    }
+
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        _owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error> {
+        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
+
+        let args = if !namespace.is_root() {
+            Some(json!({ "ns": namespace.clone() }))
+        } else {
+            None
+        };
+
+        self.client.login().await?;
+        let mut result =
+            self.client.get(&path, args).await.map_err(|err| {
+                format_err!("Failed to retrieve backup groups from remote - {}", err)
+            })?;
+
+        Ok(
+            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
+                .map_err(Error::from)?
+                .into_iter()
+                .map(|item| item.backup)
+                .collect::<Vec<BackupGroup>>(),
+        )
+    }
+
+    async fn list_backup_dirs(
+        &self,
+        _namespace: &BackupNamespace,
+        group: &BackupGroup,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupDir>, Error> {
+        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
+
+        let mut args = json!({
+            "backup-type": group.ty,
+            "backup-id": group.id,
+        });
+
+        if !self.ns.is_root() {
+            args["ns"] = serde_json::to_value(&self.ns)?;
+        }
+
+        self.client.login().await?;
+
+        let mut result = self.client.get(&path, Some(args)).await?;
+        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+        Ok(snapshot_list
+            .into_iter()
+            .filter_map(|item: SnapshotListItem| {
+                let snapshot = item.backup;
+                // in-progress backups can't be synced
+                if item.size.is_none() {
+                    task_log!(
+                        worker,
+                        "skipping snapshot {} - in-progress backup",
+                        snapshot
+                    );
+                    return None;
+                }
+
+                Some(snapshot)
+            })
+            .collect::<Vec<BackupDir>>())
+    }
+
+    fn get_ns(&self) -> BackupNamespace {
+        self.ns.clone()
+    }
+
+    fn print_store_and_ns(&self) -> String {
+        print_store_and_ns(self.repo.store(), &self.ns)
+    }
+
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn PullReader>, Error> {
+        let backup_reader =
+            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
+        Ok(Arc::new(RemoteReader {
+            backup_reader,
+            dir: dir.clone(),
+        }))
+    }
+}
+
+#[async_trait::async_trait]
+/// `PullReader` is a trait that provides an interface for reading data from a source.
+/// The trait includes methods for getting a chunk reader, loading a file, downloading the client log, and checking whether the chunk sync should be skipped.
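+///
+/// Rough usage sketch (illustrative only, not run as a doc-test):
+/// ```ignore
+/// // assumes `reader: Arc<dyn PullReader>`, `worker`, `tmp_path` and
+/// // `target_store_name` are in scope
+/// if let Some(_manifest_blob) = reader
+///     .load_file_into(MANIFEST_BLOB_NAME, &tmp_path, worker)
+///     .await?
+/// {
+///     if !reader.skip_chunk_sync(target_store_name) {
+///         let chunk_reader = reader.chunk_reader(CryptMode::None);
+///         // fetch referenced chunks through the `AsyncReadChunk` interface
+///     }
+/// }
+/// ```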
+trait PullReader: Send + Sync {
+    /// Returns a chunk reader with the specified encryption mode.
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+
+    /// Asynchronously loads a file from the source into a local file.
+    /// `filename` is the name of the file to load from the source.
+    /// `into` is the path of the local file to load the source file into.
+    async fn load_file_into(
+        &self,
+        filename: &str,
+        into: &Path,
+        worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error>;
+
+    /// Tries to download the client log from the source and save it into a local file.
+    async fn try_download_client_log(
+        &self,
+        to_path: &Path,
+        worker: &WorkerTask,
+    ) -> Result<(), Error>;
+
+    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
+}
+
+#[async_trait::async_trait]
+impl PullReader for RemoteReader {
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+        Arc::new(RemoteChunkReader::new(
+            self.backup_reader.clone(),
+            None,
+            crypt_mode,
+            HashMap::new(),
+        ))
+    }
+
+    async fn load_file_into(
+        &self,
+        filename: &str,
+        into: &Path,
+        worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
+        if let Err(err) = download_result {
+            match err.downcast_ref::<HttpError>() {
+                Some(HttpError { code, message }) => match *code {
+                    StatusCode::NOT_FOUND => {
+                        task_log!(
+                            worker,
+                            "skipping snapshot {} - vanished since start of sync",
+                            &self.dir,
+                        );
+                        return Ok(None);
+                    }
+                    _ => {
+                        bail!("HTTP error {code} - {message}");
+                    }
+                },
+                None => {
+                    return Err(err);
+                }
+            };
+        };
+        tmp_file.rewind()?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    async fn try_download_client_log(
+        &self,
+        to_path: &Path,
+        worker: &WorkerTask,
+    ) -> Result<(), Error> {
+        let mut tmp_path = to_path.to_owned();
+        tmp_path.set_extension("tmp");
+
+        let tmpfile = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .read(true)
+            .open(&tmp_path)?;
+
+        // Note: be silent if there is no log - only log successful download
+        if let Ok(()) = self
+            .backup_reader
+            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
+            .await
+        {
+            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+                bail!("Atomic rename file {:?} failed - {}", to_path, err);
+            }
+            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
+        }
+
+        Ok(())
+    }
+
+    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
+        false
+    }
+}
+
+/// Parameters for a pull operation.
+pub(crate) struct PullParameters {
+    /// Where data is pulled from
+    source: Arc<dyn PullSource>,
+    /// Where data should be pulled into
+    target: PullTarget,
     /// Owner of synced groups (needs to match local owner of pre-existing groups)
     owner: Authid,
     /// Whether to remove groups which exist locally, but not on the remote end
@@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
     max_depth: Option<usize>,
     /// Filters for reducing the pull scope
     group_filter: Option<Vec<GroupFilter>>,
-    /// Rate limits for all transfers from `remote`
-    limit: RateLimitConfig,
     /// How many snapshots should be transferred at most (taking the newest N snapshots)
     transfer_last: Option<usize>,
 }
 
 impl PullParameters {
     /// Creates a new instance of `PullParameters`.
-    ///
-    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
-    /// [BackupRepository] with `remote_store`.
-    #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         store: &str,
         ns: BackupNamespace,
-        remote: &str,
+        remote: Option<&str>,
         remote_store: &str,
         remote_ns: BackupNamespace,
         owner: Authid,
@@ -82,49 +376,56 @@ impl PullParameters {
         limit: RateLimitConfig,
         transfer_last: Option<usize>,
     ) -> Result<Self, Error> {
-        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
-
         if let Some(max_depth) = max_depth {
             ns.check_max_depth(max_depth)?;
             remote_ns.check_max_depth(max_depth)?;
-        }
-
-        let (remote_config, _digest) = pbs_config::remote::config()?;
-        let remote: Remote = remote_config.lookup("remote", remote)?;
-
+        }
         let remove_vanished = remove_vanished.unwrap_or(false);
 
-        let source = BackupRepository::new(
-            Some(remote.config.auth_id.clone()),
-            Some(remote.config.host.clone()),
-            remote.config.port,
-            remote_store.to_string(),
-        );
+        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
+            let (remote_config, _digest) = pbs_config::remote::config()?;
+            let remote: Remote = remote_config.lookup("remote", remote)?;
 
-        Ok(Self {
-            remote,
-            remote_ns,
+            let repo = BackupRepository::new(
+                Some(remote.config.auth_id.clone()),
+                Some(remote.config.host.clone()),
+                remote.config.port,
+                remote_store.to_string(),
+            );
+            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
+            Arc::new(RemoteSource {
+                repo,
+                ns: remote_ns,
+                client,
+            })
+        } else {
+            bail!("local sync not implemented yet")
+        };
+        let target = PullTarget {
+            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
             ns,
+        };
+
+        Ok(Self {
             source,
-            store,
+            target,
             owner,
             remove_vanished,
             max_depth,
             group_filter,
-            limit,
             transfer_last,
         })
     }
 
-    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
-    pub async fn client(&self) -> Result<HttpClient, Error> {
-        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
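+    /// Returns the target [BackupNamespace] that the source namespace anchor maps to.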
+    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
+        let source_ns = self.source.get_ns();
+        source_ns.map_prefix(&source_ns, &self.target.ns)
     }
 }
 
 async fn pull_index_chunks<I: IndexFile>(
     worker: &WorkerTask,
-    chunk_reader: RemoteChunkReader,
+    chunk_reader: Arc<dyn AsyncReadChunk>,
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -215,26 +516,6 @@ async fn pull_index_chunks<I: IndexFile>(
     Ok(())
 }
 
-async fn download_manifest(
-    reader: &BackupReader,
-    filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
-    let mut tmp_manifest_file = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .truncate(true)
-        .read(true)
-        .open(filename)?;
-
-    reader
-        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
-        .await?;
-
-    tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
-    Ok(tmp_manifest_file)
-}
-
 fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
     if size != info.size {
         bail!(
@@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
 /// Pulls a single file referenced by a manifest.
 ///
 /// Pulling an archive consists of the following steps:
-/// - Create tmp file for archive
-/// - Download archive file into tmp file
-/// - Verify tmp file checksum
+/// - Load archive file into tmp file
+/// - Verify tmp file checksum
 /// - if archive is an index, pull referenced chunks
 /// - Rename tmp file into real path
-async fn pull_single_archive(
-    worker: &WorkerTask,
-    reader: &BackupReader,
-    chunk_reader: &mut RemoteChunkReader,
-    snapshot: &pbs_datastore::BackupDir,
-    archive_info: &FileInfo,
+async fn pull_single_archive<'a>(
+    worker: &'a WorkerTask,
+    reader: Arc<dyn PullReader + 'a>,
+    snapshot: &'a pbs_datastore::BackupDir,
+    archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let archive_name = &archive_info.filename;
@@ -277,13 +557,11 @@ async fn pull_single_archive(
 
     task_log!(worker, "sync archive {}", archive_name);
 
-    let mut tmpfile = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&tmp_path)?;
+    reader
+        .load_file_into(archive_name, &tmp_path, worker)
+        .await?;
 
-    reader.download(archive_name, &mut tmpfile).await?;
+    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
 
     match archive_type(archive_name)? {
         ArchiveType::DynamicIndex => {
@@ -293,14 +571,18 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            if reader.skip_chunk_sync(snapshot.datastore().name()) {
+                task_log!(worker, "skipping chunk sync for same datatsore");
+            } else {
+                pull_index_chunks(
+                    worker,
+                    reader.chunk_reader(archive_info.crypt_mode),
+                    snapshot.datastore().clone(),
+                    index,
+                    downloaded_chunks,
+                )
+                .await?;
+            }
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -309,17 +591,21 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            if reader.skip_chunk_sync(snapshot.datastore().name()) {
+                task_log!(worker, "skipping chunk sync for same datatsore");
+            } else {
+                pull_index_chunks(
+                    worker,
+                    reader.chunk_reader(archive_info.crypt_mode),
+                    snapshot.datastore().clone(),
+                    index,
+                    downloaded_chunks,
+                )
+                .await?;
+            }
         }
         ArchiveType::Blob => {
-            tmpfile.seek(SeekFrom::Start(0))?;
+            tmpfile.rewind()?;
             let (csum, size) = sha256(&mut tmpfile)?;
             verify_archive(archive_info, &csum, size)?;
         }
@@ -330,33 +616,6 @@ async fn pull_single_archive(
     Ok(())
 }
 
-// Note: The client.log.blob is uploaded after the backup, so it is
-// not mentioned in the manifest.
-async fn try_client_log_download(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    path: &std::path::Path,
-) -> Result<(), Error> {
-    let mut tmp_path = path.to_owned();
-    tmp_path.set_extension("tmp");
-
-    let tmpfile = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&tmp_path)?;
-
-    // Note: be silent if there is no log - only log successful download
-    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
-        if let Err(err) = std::fs::rename(&tmp_path, path) {
-            bail!("Atomic rename file {:?} failed - {}", path, err);
-        }
-        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
-    }
-
-    Ok(())
-}
-
 /// Actual implementation of pulling a snapshot.
 ///
 /// Pulling a snapshot consists of the following steps:
@@ -366,10 +625,10 @@ async fn try_client_log_download(
 /// -- if file already exists, verify contents
 /// -- if not, pull it from the remote
 /// - Download log if not already existing
-async fn pull_snapshot(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    snapshot: &pbs_datastore::BackupDir,
+async fn pull_snapshot<'a>(
+    worker: &'a WorkerTask,
+    reader: Arc<dyn PullReader + 'a>,
+    snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let mut manifest_name = snapshot.full_path();
@@ -380,32 +639,15 @@ async fn pull_snapshot(
 
     let mut tmp_manifest_name = manifest_name.clone();
     tmp_manifest_name.set_extension("tmp");
-
-    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
-    let mut tmp_manifest_file = match download_res {
-        Ok(manifest_file) => manifest_file,
-        Err(err) => {
-            match err.downcast_ref::<HttpError>() {
-                Some(HttpError { code, message }) => match *code {
-                    StatusCode::NOT_FOUND => {
-                        task_log!(
-                            worker,
-                            "skipping snapshot {} - vanished since start of sync",
-                            snapshot.dir(),
-                        );
-                        return Ok(());
-                    }
-                    _ => {
-                        bail!("HTTP error {code} - {message}");
-                    }
-                },
-                None => {
-                    return Err(err);
-                }
-            };
-        }
-    };
-    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
+    let tmp_manifest_blob = match reader
+        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
+        .await?
+    {
+        Some(data) => data,
+        None => return Ok(()), // e.g. snapshot vanished since the sync started
+    };
 
     if manifest_name.exists() {
         let manifest_blob = proxmox_lang::try_block!({
@@ -422,8 +664,10 @@ async fn pull_snapshot(
 
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
             if !client_log_name.exists() {
-                try_client_log_download(worker, reader, &client_log_name).await?;
-            }
+                reader
+                    .try_download_client_log(&client_log_name, worker)
+                    .await?;
+            }
             task_log!(worker, "no data changes");
             let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(()); // nothing changed
@@ -471,17 +715,9 @@ async fn pull_snapshot(
             }
         }
 
-        let mut chunk_reader = RemoteChunkReader::new(
-            reader.clone(),
-            None,
-            item.chunk_crypt_mode(),
-            HashMap::new(),
-        );
-
         pull_single_archive(
             worker,
-            &reader,
-            &mut chunk_reader,
+            reader.clone(),
             snapshot,
             item,
             downloaded_chunks.clone(),
@@ -494,9 +730,10 @@ async fn pull_snapshot(
     }
 
     if !client_log_name.exists() {
-        try_client_log_download(worker, reader, &client_log_name).await?;
-    }
-
+        reader
+            .try_download_client_log(&client_log_name, worker)
+            .await?;
+    }
+
     snapshot
         .cleanup_unreferenced_files(&manifest)
         .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
@@ -506,12 +743,12 @@ async fn pull_snapshot(
 
 /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
 ///
-/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
-/// pointing to the local datastore and target namespace.
-async fn pull_snapshot_from(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    snapshot: &pbs_datastore::BackupDir,
+/// The `reader` is configured to read from the source backup directory, while the
+/// `snapshot` is pointing to the local datastore and target namespace.
+async fn pull_snapshot_from<'a>(
+    worker: &'a WorkerTask,
+    reader: Arc<dyn PullReader + 'a>,
+    snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let (_path, is_new, _snap_lock) = snapshot
@@ -626,11 +863,10 @@ impl std::fmt::Display for SkipInfo {
 /// - Sort by snapshot time
 /// - Get last snapshot timestamp on local datastore
 /// - Iterate over list of snapshots
-/// -- Recreate client/BackupReader
 /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
 /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
 ///
-/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
+/// Backwards-compat: if the source namespace is the root namespace, only the group type and ID will be sent to the
 /// remote when querying snapshots. This allows us to interact with old remotes that don't have
 /// namespace support yet.
 ///
@@ -639,117 +875,79 @@ impl std::fmt::Display for SkipInfo {
 /// - local group owner is already checked by pull_store
 async fn pull_group(
     worker: &WorkerTask,
-    client: &HttpClient,
     params: &PullParameters,
-    group: &pbs_api_types::BackupGroup,
-    remote_ns: BackupNamespace,
+    source_namespace: &BackupNamespace,
+    group: &BackupGroup,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-    task_log!(worker, "sync group {}", group);
-
-    let path = format!(
-        "api2/json/admin/datastore/{}/snapshots",
-        params.source.store()
-    );
-
-    let mut args = json!({
-        "backup-type": group.ty,
-        "backup-id": group.id,
-    });
-
-    if !remote_ns.is_root() {
-        args["ns"] = serde_json::to_value(&remote_ns)?;
-    }
-
-    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
-
-    let mut result = client.get(&path, Some(args)).await?;
-    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
-
-    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
-
-    client.login().await?; // make sure auth is complete
-
-    let fingerprint = client.fingerprint();
-
-    let last_sync = params.store.last_successful_backup(&target_ns, group)?;
-    let last_sync_time = last_sync.unwrap_or(i64::MIN);
-
-    let mut remote_snapshots = std::collections::HashSet::new();
-
-    // start with 65536 chunks (up to 256 GiB)
-    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
-
-    progress.group_snapshots = list.len() as u64;
-
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
 
-    let total_amount = list.len();
+    let mut raw_list: Vec<BackupDir> = params
+        .source
+        .list_backup_dirs(source_namespace, group, worker)
+        .await?;
+    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
+
+    let total_amount = raw_list.len();
 
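+    // with transfer-last, everything older than the newest `transfer_last`
+    // snapshots is skipped below, unless it matches the last successful sync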
     let cutoff = params
         .transfer_last
         .map(|count| total_amount.saturating_sub(count))
         .unwrap_or_default();
 
-    for (pos, item) in list.into_iter().enumerate() {
-        let snapshot = item.backup;
-
-        // in-progress backups can't be synced
-        if item.size.is_none() {
-            task_log!(
-                worker,
-                "skipping snapshot {} - in-progress backup",
-                snapshot
-            );
-            continue;
-        }
-
-        remote_snapshots.insert(snapshot.time);
+    let target_ns = params.get_target_ns()?;
 
-        if last_sync_time > snapshot.time {
-            already_synced_skip_info.update(snapshot.time);
-            continue;
-        } else if already_synced_skip_info.count > 0 {
-            task_log!(worker, "{}", already_synced_skip_info);
-            already_synced_skip_info.reset();
-        }
-
-        if pos < cutoff && last_sync_time != snapshot.time {
-            transfer_last_skip_info.update(snapshot.time);
-            continue;
-        } else if transfer_last_skip_info.count > 0 {
-            task_log!(worker, "{}", transfer_last_skip_info);
-            transfer_last_skip_info.reset();
-        }
-
-        // get updated auth_info (new tickets)
-        let auth_info = client.login().await?;
+    let mut source_snapshots = HashSet::new();
+    let last_sync_time = params
+        .target
+        .store
+        .last_successful_backup(&target_ns, group)?
+        .unwrap_or(i64::MIN);
+
+    let list: Vec<BackupDir> = raw_list
+        .into_iter()
+        .enumerate()
+        .filter(|&(pos, ref dir)| {
+            source_snapshots.insert(dir.time);
+            if last_sync_time > dir.time {
+                already_synced_skip_info.update(dir.time);
+                return false;
+            } else if already_synced_skip_info.count > 0 {
+                task_log!(worker, "{}", already_synced_skip_info);
+                already_synced_skip_info.reset();
+            }
 
-        let options =
-            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
-                .rate_limit(params.limit.clone());
+            if pos < cutoff && last_sync_time != dir.time {
+                transfer_last_skip_info.update(dir.time);
+                return false;
+            } else if transfer_last_skip_info.count > 0 {
+                task_log!(worker, "{}", transfer_last_skip_info);
+                transfer_last_skip_info.reset();
+            }
+            true
+        })
+        .map(|(_, dir)| dir)
+        .collect();
 
-        let new_client = HttpClient::new(
-            params.source.host(),
-            params.source.port(),
-            params.source.auth_id(),
-            options,
-        )?;
+    // start with 65536 chunks (up to 256 GiB)
+    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-        let reader = BackupReader::start(
-            &new_client,
-            None,
-            params.source.store(),
-            &remote_ns,
-            &snapshot,
-            true,
-        )
-        .await?;
+    progress.group_snapshots = list.len() as u64;
 
-        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
+    for (pos, from_snapshot) in list.into_iter().enumerate() {
+        let to_snapshot = params
+            .target
+            .store
+            .backup_dir(target_ns.clone(), from_snapshot.clone())?;
 
-        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+        let reader = params
+            .source
+            .reader(source_namespace, &from_snapshot)
+            .await?;
+        let result =
+            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
 
         progress.done_snapshots = pos as u64 + 1;
         task_log!(worker, "percentage done: {}", progress);
@@ -758,11 +956,14 @@ async fn pull_group(
     }
 
     if params.remove_vanished {
-        let group = params.store.backup_group(target_ns.clone(), group.clone());
+        let group = params
+            .target
+            .store
+            .backup_group(target_ns.clone(), group.clone());
         let local_list = group.list_backups()?;
         for info in local_list {
             let snapshot = info.backup_dir;
-            if remote_snapshots.contains(&snapshot.backup_time()) {
+            if source_snapshots.contains(&snapshot.backup_time()) {
                 continue;
             }
             if snapshot.is_protected() {
@@ -774,73 +975,23 @@ async fn pull_group(
                 continue;
             }
             task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
-            params
-                .store
-                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
+            params
+                .target
+                .store
+                .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
         }
     }
 
     Ok(())
 }
 
-// will modify params if switching to backwards mode for lack of NS support on remote end
-async fn query_namespaces(
-    worker: &WorkerTask,
-    client: &HttpClient,
-    params: &mut PullParameters,
-) -> Result<Vec<BackupNamespace>, Error> {
-    let path = format!(
-        "api2/json/admin/datastore/{}/namespace",
-        params.source.store()
-    );
-    let mut data = json!({});
-    if let Some(max_depth) = params.max_depth {
-        data["max-depth"] = json!(max_depth);
-    }
-
-    if !params.remote_ns.is_root() {
-        data["parent"] = json!(params.remote_ns);
-    }
-
-    let mut result = match client.get(&path, Some(data)).await {
-        Ok(res) => res,
-        Err(err) => match err.downcast_ref::<HttpError>() {
-            Some(HttpError { code, message }) => match *code {
-                StatusCode::NOT_FOUND => {
-                    if params.remote_ns.is_root() && params.max_depth.is_none() {
-                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
-                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
-                        params.max_depth = Some(0);
-                    } else {
-                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
-                    }
-
-                    return Ok(vec![params.remote_ns.clone()]);
-                }
-                _ => {
-                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
-                }
-            },
-            None => {
-                bail!("Querying namespaces failed - {err}");
-            }
-        },
-    };
-
-    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
-
-    // parents first
-    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
-
-    Ok(list.iter().map(|item| item.ns.clone()).collect())
-}
-
 fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
     let mut created = false;
-    let store_ns_str = print_store_and_ns(params.store.name(), ns);
+    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
 
-    if !ns.is_root() && !params.store.namespace_path(ns).exists() {
-        check_ns_modification_privs(params.store.name(), ns, &params.owner)
+    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
+        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
             .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
 
         let name = match ns.components().last() {
@@ -850,14 +1001,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
             }
         };
 
-        if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
+        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
             bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
         }
         created = true;
     }
 
     check_ns_privs(
-        params.store.name(),
+        params.target.store.name(),
         ns,
         &params.owner,
         PRIV_DATASTORE_BACKUP,
@@ -868,10 +1019,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
 }
 
 fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
-    check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
+    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
         .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
 
-    params.store.remove_namespace_recursive(local_ns, true)
+    params
+        .target
+        .store
+        .remove_namespace_recursive(local_ns, true)
 }
 
 fn check_and_remove_vanished_ns(
@@ -885,14 +1039,15 @@ fn check_and_remove_vanished_ns(
     // clamp like remote does so that we don't list more than we can ever have synced.
     let max_depth = params
         .max_depth
-        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
+        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
 
     let mut local_ns_list: Vec<BackupNamespace> = params
+        .target
         .store
-        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
+        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
         .filter(|ns| {
             let user_privs =
-                user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
+                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
             user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
         })
         .collect();
@@ -901,7 +1056,7 @@ fn check_and_remove_vanished_ns(
     local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
 
     for local_ns in local_ns_list {
-        if local_ns == params.ns {
+        if local_ns == params.target.ns {
             continue;
         }
 
@@ -948,29 +1103,49 @@ fn check_and_remove_vanished_ns(
 /// - access to sub-NS checked here
 pub(crate) async fn pull_store(
     worker: &WorkerTask,
-    client: &HttpClient,
     mut params: PullParameters,
 ) -> Result<(), Error> {
     // explicit create shared lock to prevent GC on newly created chunks
-    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
+    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
     let mut errors = false;
 
     let old_max_depth = params.max_depth;
-    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
-        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
+    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
+        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
     } else {
-        query_namespaces(worker, client, &mut params).await?
+        params
+            .source
+            .list_namespaces(&mut params.max_depth, worker)
+            .await?
     };
+
+    let ns_layers_to_be_pulled = namespaces
+        .iter()
+        .map(BackupNamespace::depth)
+        .max()
+        .map_or(0, |v| v - params.source.get_ns().depth());
+    let target_depth = params.target.ns.depth();
+
+    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
+        bail!(
+            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
+            ns_layers_to_be_pulled,
+            target_depth,
+            MAX_NAMESPACE_DEPTH
+        );
+    }
+
     errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
+    namespaces.sort_unstable_by_key(|a| a.name_len());
 
     let (mut groups, mut snapshots) = (0, 0);
     let mut synced_ns = HashSet::with_capacity(namespaces.len());
 
     for namespace in namespaces {
-        let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
+        let source_store_ns_str = params.source.print_store_and_ns();
 
-        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
-        let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
+        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
+        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
 
         task_log!(worker, "----");
         task_log!(
@@ -998,7 +1173,7 @@ pub(crate) async fn pull_store(
             }
         }
 
-        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
+        match pull_ns(worker, &namespace, &mut params).await {
             Ok((ns_progress, ns_errors)) => {
                 errors |= ns_errors;
 
@@ -1019,7 +1194,7 @@ pub(crate) async fn pull_store(
                 task_log!(
                     worker,
                     "Encountered errors while syncing namespace {} - {}",
-                    namespace,
+                    &namespace,
                     err,
                 );
             }
@@ -1051,48 +1226,28 @@ pub(crate) async fn pull_store(
 /// - owner check for vanished groups done here
 pub(crate) async fn pull_ns(
     worker: &WorkerTask,
-    client: &HttpClient,
-    params: &PullParameters,
-    source_ns: BackupNamespace,
-    target_ns: BackupNamespace,
+    namespace: &BackupNamespace,
+    params: &mut PullParameters,
 ) -> Result<(StoreProgress, bool), Error> {
-    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
-
-    let args = if !source_ns.is_root() {
-        Some(json!({
-            "ns": source_ns,
-        }))
-    } else {
-        None
-    };
-
-    let mut result = client
-        .get(&path, args)
-        .await
-        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
-
-    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
 
     let total_count = list.len();
     list.sort_unstable_by(|a, b| {
-        let type_order = a.backup.ty.cmp(&b.backup.ty);
+        let type_order = a.ty.cmp(&b.ty);
         if type_order == std::cmp::Ordering::Equal {
-            a.backup.id.cmp(&b.backup.id)
+            a.id.cmp(&b.id)
         } else {
             type_order
         }
     });
 
-    let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
+    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
         filters.iter().any(|filter| group.matches(filter))
     };
 
-    // Get groups with target NS set
-    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
-
     let list = if let Some(ref group_filter) = &params.group_filter {
         let unfiltered_count = list.len();
-        let list: Vec<pbs_api_types::BackupGroup> = list
+        let list: Vec<BackupGroup> = list
             .into_iter()
             .filter(|group| apply_filters(group, group_filter))
             .collect();
@@ -1110,13 +1265,14 @@ pub(crate) async fn pull_ns(
 
     let mut errors = false;
 
-    let mut new_groups = std::collections::HashSet::new();
+    let mut new_groups = HashSet::new();
     for group in list.iter() {
         new_groups.insert(group.clone());
     }
 
     let mut progress = StoreProgress::new(list.len() as u64);
 
+    let target_ns = params.get_target_ns()?;
     for (done, group) in list.into_iter().enumerate() {
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
@@ -1124,6 +1280,7 @@ pub(crate) async fn pull_ns(
 
         let (owner, _lock_guard) =
             match params
+                .target
                 .store
                 .create_locked_backup_group(&target_ns, &group, &params.owner)
             {
@@ -1135,7 +1292,9 @@ pub(crate) async fn pull_ns(
                         &group,
                         err
                     );
                     errors = true; // do not stop here, instead continue
                     continue;
                 }
             };
@@ -1151,15 +1310,7 @@ pub(crate) async fn pull_ns(
                 owner
             );
             errors = true; // do not stop here, instead continue
-        } else if let Err(err) = pull_group(
-            worker,
-            client,
-            params,
-            &group,
-            source_ns.clone(),
-            &mut progress,
-        )
-        .await
+        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
         {
             task_log!(worker, "sync group {} failed - {}", &group, err,);
             errors = true; // do not stop here, instead continue
@@ -1168,13 +1319,13 @@ pub(crate) async fn pull_ns(
 
     if params.remove_vanished {
         let result: Result<(), Error> = proxmox_lang::try_block!({
-            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
+            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                 let local_group = local_group?;
                 let local_group = local_group.group();
                 if new_groups.contains(local_group) {
                     continue;
                 }
-                let owner = params.store.get_owner(&target_ns, local_group)?;
+                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                 if check_backup_owner(&owner, &params.owner).is_err() {
                     continue;
                 }
@@ -1184,7 +1335,11 @@ pub(crate) async fn pull_ns(
                     }
                 }
                 task_log!(worker, "delete vanished group '{local_group}'",);
-                match params.store.remove_backup_group(&target_ns, local_group) {
+                match params
+                    .target
+                    .store
+                    .remove_backup_group(&target_ns, local_group)
+                {
                     Ok(true) => {}
                     Ok(false) => {
                         task_log!(
-- 
2.39.2