[pbs-devel] [PATCH proxmox-backup 2/4] pull: add logic for local pull

Hannes Laimer h.laimer at proxmox.com
Mon Feb 13 16:45:53 CET 2023


... since creating a HttpClient(which would be needed
to reuse existing pull logic) without a remote was not
possible. This also improves the speed for local
sync-jobs.

Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
 pbs-client/src/backup_reader.rs |   5 +
 src/api2/admin/datastore.rs     |  10 +
 src/server/pull.rs              | 499 ++++++++++++++++++++------------
 3 files changed, 335 insertions(+), 179 deletions(-)

diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs
index 2cd4dc27..9dacef74 100644
--- a/pbs-client/src/backup_reader.rs
+++ b/pbs-client/src/backup_reader.rs
@@ -20,6 +20,11 @@ use pbs_tools::sha::sha256;
 
 use super::{H2Client, HttpClient};
 
+pub enum BackupSource {
+    Remote(Arc<BackupReader>),
+    Local(pbs_datastore::BackupDir),
+}
+
 /// Backup Reader
 pub struct BackupReader {
     h2: H2Client,
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 8d3a6146..8ad78f29 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -445,6 +445,16 @@ pub async fn list_snapshots(
     _param: Value,
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<SnapshotListItem>, Error> {
+    do_list_snapshots(store, ns, backup_type, backup_id, rpcenv).await
+}
+
+pub async fn do_list_snapshots(
+    store: String,
+    ns: Option<BackupNamespace>,
+    backup_type: Option<BackupType>,
+    backup_id: Option<String>,
+    rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Vec<SnapshotListItem>, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
 
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 65eedf2c..81df00c3 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,7 +1,7 @@
 //! Sync datastore from remote server
 
 use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::{Seek, SeekFrom, Write};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
@@ -9,6 +9,7 @@ use std::time::SystemTime;
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
 use pbs_config::CachedUserInfo;
+use pbs_datastore::read_chunk::AsyncReadChunk;
 use serde_json::json;
 
 use proxmox_router::HttpError;
@@ -21,7 +22,7 @@ use pbs_api_types::{
 };
 
 use pbs_client::{
-    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
+    BackupReader, BackupRepository, BackupSource, HttpClient, HttpClientOptions, RemoteChunkReader,
 };
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -30,7 +31,7 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, LocalChunkReader, StoreProgress};
 use pbs_tools::sha::sha256;
 use proxmox_rest_server::WorkerTask;
 
@@ -40,7 +41,7 @@ use crate::tools::parallel_handler::ParallelHandler;
 /// Parameters for a pull operation.
 pub(crate) struct PullParameters {
     /// Remote that is pulled from
-    remote: Remote,
+    remote: Option<Remote>,
     /// Full specification of remote datastore
     source: BackupRepository,
     /// Local store that is pulled into
@@ -70,7 +71,7 @@ impl PullParameters {
     pub(crate) fn new(
         store: &str,
         ns: BackupNamespace,
-        remote: &str,
+        remote: Option<&str>,
         remote_store: &str,
         remote_ns: BackupNamespace,
         owner: Authid,
@@ -86,18 +87,24 @@ impl PullParameters {
             remote_ns.check_max_depth(max_depth)?;
         }
 
-        let (remote_config, _digest) = pbs_config::remote::config()?;
-        let remote: Remote = remote_config.lookup("remote", remote)?;
+        let (remote, source): (Option<Remote>, BackupRepository) = if let Some(remote_str) = remote
+        {
+            let (remote_config, _digest) = pbs_config::remote::config()?;
+            let remote = remote_config.lookup::<Remote>("remote", remote_str)?;
+            let source = BackupRepository::new(
+                Some(remote.config.auth_id.clone()),
+                Some(remote.config.host.clone()),
+                remote.config.port,
+                remote_store.to_string(),
+            );
+            (Some(remote), source)
+        } else {
+            let source = BackupRepository::new(None, None, None, remote_store.to_string());
+            (None, source)
+        };
 
         let remove_vanished = remove_vanished.unwrap_or(false);
 
-        let source = BackupRepository::new(
-            Some(remote.config.auth_id.clone()),
-            Some(remote.config.host.clone()),
-            remote.config.port,
-            remote_store.to_string(),
-        );
-
         Ok(Self {
             remote,
             remote_ns,
@@ -114,13 +121,17 @@ impl PullParameters {
 
     /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
     pub async fn client(&self) -> Result<HttpClient, Error> {
-        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
+        if let Some(remote) = &self.remote {
+            crate::api2::config::remote::remote_client(remote, Some(self.limit.clone())).await
+        } else {
+            bail!("No remote specified. Do not use a HttpClient for a local sync.")
+        }
     }
 }
 
-async fn pull_index_chunks<I: IndexFile>(
+async fn pull_index_chunks<I: IndexFile, C: AsyncReadChunk + Clone>(
     worker: &WorkerTask,
-    chunk_reader: RemoteChunkReader,
+    chunk_reader: C,
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -256,10 +267,10 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
 /// - Verify tmp file checksum
 /// - if archive is an index, pull referenced chunks
 /// - Rename tmp file into real path
-async fn pull_single_archive(
+async fn pull_single_archive<C: AsyncReadChunk + Clone>(
     worker: &WorkerTask,
-    reader: &BackupReader,
-    chunk_reader: &mut RemoteChunkReader,
+    source: &BackupSource,
+    chunk_reader: C,
     snapshot: &pbs_datastore::BackupDir,
     archive_info: &FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -279,7 +290,15 @@ async fn pull_single_archive(
         .read(true)
         .open(&tmp_path)?;
 
-    reader.download(archive_name, &mut tmpfile).await?;
+    match source {
+        BackupSource::Remote(reader) => reader.download(archive_name, &mut tmpfile).await?,
+        BackupSource::Local(dir) => {
+            let mut source_path = dir.full_path();
+            source_path.push(archive_name);
+            let data = std::fs::read(source_path)?;
+            tmpfile.write_all(&data)?;
+        }
+    };
 
     match archive_type(archive_name)? {
         ArchiveType::DynamicIndex => {
@@ -289,14 +308,23 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            match source {
+                BackupSource::Local(ref dir)
+                    if dir.datastore().name() == snapshot.datastore().name() =>
+                {
+                    task_log!(worker, "skipping chunk sync for same datatsore");
+                }
+                _ => {
+                    pull_index_chunks(
+                        worker,
+                        chunk_reader,
+                        snapshot.datastore().clone(),
+                        index,
+                        downloaded_chunks,
+                    )
+                    .await?;
+                }
+            };
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -304,15 +332,23 @@ async fn pull_single_archive(
             })?;
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
-
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            match source {
+                BackupSource::Local(ref dir)
+                    if dir.datastore().name() == snapshot.datastore().name() =>
+                {
+                    task_log!(worker, "skipping chunk sync for same datatsore");
+                }
+                _ => {
+                    pull_index_chunks(
+                        worker,
+                        chunk_reader,
+                        snapshot.datastore().clone(),
+                        index,
+                        downloaded_chunks,
+                    )
+                    .await?;
+                }
+            };
         }
         ArchiveType::Blob => {
             tmpfile.seek(SeekFrom::Start(0))?;
@@ -321,6 +357,9 @@ async fn pull_single_archive(
         }
     }
     if let Err(err) = std::fs::rename(&tmp_path, &path) {
+        task_log!(worker, "sync archive {}", archive_name);
+        task_log!(worker, "tmpfile path {:?}", tmp_path.as_os_str());
+        task_log!(worker, "path path {:?}", path.as_os_str());
         bail!("Atomic rename file {:?} failed - {}", path, err);
     }
     Ok(())
@@ -364,7 +403,7 @@ async fn try_client_log_download(
 /// - Download log if not already existing
 async fn pull_snapshot(
     worker: &WorkerTask,
-    reader: Arc<BackupReader>,
+    source: BackupSource,
     snapshot: &pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
@@ -377,31 +416,45 @@ async fn pull_snapshot(
     let mut tmp_manifest_name = manifest_name.clone();
     tmp_manifest_name.set_extension("tmp");
 
-    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
-    let mut tmp_manifest_file = match download_res {
-        Ok(manifest_file) => manifest_file,
-        Err(err) => {
-            match err.downcast_ref::<HttpError>() {
-                Some(HttpError { code, message }) => match *code {
-                    StatusCode::NOT_FOUND => {
-                        task_log!(
-                            worker,
-                            "skipping snapshot {} - vanished since start of sync",
-                            snapshot.dir(),
-                        );
-                        return Ok(());
-                    }
-                    _ => {
-                        bail!("HTTP error {code} - {message}");
-                    }
-                },
-                None => {
-                    return Err(err);
+    let tmp_manifest_blob = match source {
+        BackupSource::Remote(ref reader) => {
+            let mut tmp_manifest_file = match download_manifest(reader, &tmp_manifest_name).await {
+                Ok(manifest_file) => manifest_file,
+                Err(err) => {
+                    match err.downcast_ref::<HttpError>() {
+                        Some(HttpError { code, message }) => match *code {
+                            StatusCode::NOT_FOUND => {
+                                task_log!(
+                                    worker,
+                                    "skipping snapshot {} - vanished since start of sync",
+                                    snapshot.dir(),
+                                );
+                                return Ok(());
+                            }
+                            _ => {
+                                bail!("HTTP error {code} - {message}");
+                            }
+                        },
+                        None => {
+                            return Err(err);
+                        }
+                    };
                 }
             };
+            DataBlob::load_from_reader(&mut tmp_manifest_file)?
+        }
+        BackupSource::Local(ref dir) => {
+            let data = dir.load_blob(MANIFEST_BLOB_NAME)?;
+            let mut tmp_manifest_file = std::fs::OpenOptions::new()
+                .write(true)
+                .create(true)
+                .truncate(true)
+                .read(true)
+                .open(&tmp_manifest_name)?;
+            tmp_manifest_file.write_all(data.raw_data())?;
+            data
         }
     };
-    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
 
     if manifest_name.exists() {
         let manifest_blob = proxmox_lang::try_block!({
@@ -417,13 +470,17 @@ async fn pull_snapshot(
         })?;
 
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
-            if !client_log_name.exists() {
-                try_client_log_download(worker, reader, &client_log_name).await?;
+            if let BackupSource::Remote(ref reader) = source {
+                if !client_log_name.exists() {
+                    try_client_log_download(worker, reader.clone(), &client_log_name).await?;
+                };
+            }
+            if tmp_manifest_name.exists() {
+                let _ = std::fs::remove_file(&tmp_manifest_name);
             }
             task_log!(worker, "no data changes");
-            let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(()); // nothing changed
-        }
+        };
     }
 
     let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
@@ -467,32 +524,49 @@ async fn pull_snapshot(
             }
         }
 
-        let mut chunk_reader = RemoteChunkReader::new(
-            reader.clone(),
-            None,
-            item.chunk_crypt_mode(),
-            HashMap::new(),
-        );
-
-        pull_single_archive(
-            worker,
-            &reader,
-            &mut chunk_reader,
-            snapshot,
-            item,
-            downloaded_chunks.clone(),
-        )
-        .await?;
+        match source {
+            BackupSource::Remote(ref reader) => {
+                let chunk_reader = RemoteChunkReader::new(
+                    reader.clone(),
+                    None,
+                    item.chunk_crypt_mode(),
+                    HashMap::new(),
+                );
+                pull_single_archive(
+                    worker,
+                    &source,
+                    chunk_reader,
+                    snapshot,
+                    item,
+                    downloaded_chunks.clone(),
+                )
+                .await?
+            }
+            BackupSource::Local(ref dir) => {
+                let chunk_reader =
+                    LocalChunkReader::new(dir.datastore().clone(), None, item.chunk_crypt_mode());
+                pull_single_archive(
+                    worker,
+                    &source,
+                    chunk_reader,
+                    snapshot,
+                    item,
+                    downloaded_chunks.clone(),
+                )
+                .await?
+            }
+        }
     }
 
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
         bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
     }
 
-    if !client_log_name.exists() {
-        try_client_log_download(worker, reader, &client_log_name).await?;
+    if let BackupSource::Remote(reader) = source {
+        if !client_log_name.exists() {
+            try_client_log_download(worker, reader, &client_log_name).await?;
+        };
     }
-
     snapshot
         .cleanup_unreferenced_files(&manifest)
         .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
@@ -506,7 +580,7 @@ async fn pull_snapshot(
 /// pointing to the local datastore and target namespace.
 async fn pull_snapshot_from(
     worker: &WorkerTask,
-    reader: Arc<BackupReader>,
+    source: BackupSource,
     snapshot: &pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
@@ -517,7 +591,7 @@ async fn pull_snapshot_from(
     if is_new {
         task_log!(worker, "sync snapshot {}", snapshot.dir());
 
-        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
+        if let Err(err) = pull_snapshot(worker, source, snapshot, downloaded_chunks).await {
             if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                 snapshot.backup_ns(),
                 snapshot.as_ref(),
@@ -530,7 +604,7 @@ async fn pull_snapshot_from(
         task_log!(worker, "sync snapshot {} done", snapshot.dir());
     } else {
         task_log!(worker, "re-sync snapshot {}", snapshot.dir());
-        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
+        pull_snapshot(worker, source, snapshot, downloaded_chunks).await?;
         task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
     }
 
@@ -600,36 +674,52 @@ impl std::fmt::Display for SkipInfo {
 /// - local group owner is already checked by pull_store
 async fn pull_group(
     worker: &WorkerTask,
-    client: &HttpClient,
+    client: Option<&HttpClient>,
     params: &PullParameters,
     group: &pbs_api_types::BackupGroup,
     remote_ns: BackupNamespace,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-    let path = format!(
-        "api2/json/admin/datastore/{}/snapshots",
-        params.source.store()
-    );
-
-    let mut args = json!({
-        "backup-type": group.ty,
-        "backup-id": group.id,
-    });
-
-    if !remote_ns.is_root() {
-        args["ns"] = serde_json::to_value(&remote_ns)?;
-    }
-
     let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
+    let mut list: Vec<SnapshotListItem> = if let Some(client) = client {
+        let path = format!(
+            "api2/json/admin/datastore/{}/snapshots",
+            params.source.store()
+        );
 
-    let mut result = client.get(&path, Some(args)).await?;
-    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+        let mut args = json!({
+            "backup-type": group.ty,
+            "backup-id": group.id,
+        });
 
-    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
+        if !remote_ns.is_root() {
+            args["ns"] = serde_json::to_value(&remote_ns)?;
+        }
 
-    client.login().await?; // make sure auth is complete
+        let mut result = client.get(&path, Some(args)).await?;
+        serde_json::from_value(result["data"].take())?
+    } else {
+        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
+        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root at pam")));
+        let source_ns = if remote_ns.is_root() {
+            None
+        } else {
+            Some(remote_ns.clone())
+        };
+        crate::api2::admin::datastore::do_list_snapshots(
+            params.source.store().to_string(),
+            source_ns,
+            Some(group.ty),
+            Some(group.id.clone()),
+            &mut rpcenv,
+        )
+        .await?
+    };
+    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
 
-    let fingerprint = client.fingerprint();
+    if let Some(client) = client {
+        client.login().await?; // make sure auth is complete
+    }
 
     let last_sync = params.store.last_successful_backup(&target_ns, group)?;
 
@@ -646,6 +736,13 @@ async fn pull_group(
         count: 0,
     };
 
+    let datastore: Option<Arc<DataStore>> = match client {
+        None => Some(DataStore::lookup_datastore(
+            params.source.store(),
+            Some(Operation::Read),
+        )?),
+        _ => None,
+    };
     for (pos, item) in list.into_iter().enumerate() {
         let snapshot = item.backup;
 
@@ -668,33 +765,47 @@ async fn pull_group(
             }
         }
 
-        // get updated auth_info (new tickets)
-        let auth_info = client.login().await?;
-
-        let options =
-            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
-                .rate_limit(params.limit.clone());
+        let backup_source = if let Some(client) = client {
+            // get updated auth_info (new tickets)
+            let auth_info = client.login().await?;
+            let fingerprint = client.fingerprint();
 
-        let new_client = HttpClient::new(
-            params.source.host(),
-            params.source.port(),
-            params.source.auth_id(),
-            options,
-        )?;
-
-        let reader = BackupReader::start(
-            new_client,
-            None,
-            params.source.store(),
-            &remote_ns,
-            &snapshot,
-            true,
-        )
-        .await?;
+            let options = HttpClientOptions::new_non_interactive(
+                auth_info.ticket.clone(),
+                fingerprint.clone(),
+            )
+            .rate_limit(params.limit.clone());
+
+            let new_client = HttpClient::new(
+                params.source.host(),
+                params.source.port(),
+                params.source.auth_id(),
+                options,
+            )?;
+
+            BackupSource::Remote(
+                BackupReader::start(
+                    new_client,
+                    None,
+                    params.source.store(),
+                    &remote_ns,
+                    &snapshot,
+                    true,
+                )
+                .await?,
+            )
+        } else {
+            if let Some(datastore) = datastore.clone() {
+                BackupSource::Local(datastore.backup_dir(remote_ns.clone(), snapshot.clone())?)
+            } else {
+                unreachable!("if there is no client and no datastore, then the ds lookup would have failed earlier")
+            }
+        };
 
         let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
 
-        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+        let result =
+            pull_snapshot_from(worker, backup_source, &snapshot, downloaded_chunks.clone()).await;
 
         progress.done_snapshots = pos as u64 + 1;
         task_log!(worker, "percentage done: {}", progress);
@@ -735,49 +846,64 @@ async fn pull_group(
 // will modify params if switching to backwards mode for lack of NS support on remote end
 async fn query_namespaces(
     worker: &WorkerTask,
-    client: &HttpClient,
+    client: Option<&HttpClient>,
     params: &mut PullParameters,
 ) -> Result<Vec<BackupNamespace>, Error> {
-    let path = format!(
-        "api2/json/admin/datastore/{}/namespace",
-        params.source.store()
-    );
-    let mut data = json!({});
-    if let Some(max_depth) = params.max_depth {
-        data["max-depth"] = json!(max_depth);
-    }
+    let mut list: Vec<NamespaceListItem> = if let Some(client) = client {
+        let path = format!(
+            "api2/json/admin/datastore/{}/namespace",
+            params.source.store()
+        );
+        let mut data = json!({});
+        if let Some(max_depth) = params.max_depth {
+            data["max-depth"] = json!(max_depth);
+        }
 
-    if !params.remote_ns.is_root() {
-        data["parent"] = json!(params.remote_ns);
-    }
+        if !params.remote_ns.is_root() {
+            data["parent"] = json!(params.remote_ns);
+        }
 
-    let mut result = match client.get(&path, Some(data)).await {
-        Ok(res) => res,
-        Err(err) => match err.downcast_ref::<HttpError>() {
-            Some(HttpError { code, message }) => match *code {
-                StatusCode::NOT_FOUND => {
-                    if params.remote_ns.is_root() && params.max_depth.is_none() {
-                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
-                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
-                        params.max_depth = Some(0);
-                    } else {
-                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
-                    }
+        let mut result = match client.get(&path, Some(data)).await {
+            Ok(res) => res,
+            Err(err) => match err.downcast_ref::<HttpError>() {
+                Some(HttpError { code, message }) => match *code {
+                    StatusCode::NOT_FOUND => {
+                        if params.remote_ns.is_root() && params.max_depth.is_none() {
+                            task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+                            task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+                            params.max_depth = Some(0);
+                        } else {
+                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+                        }
 
-                    return Ok(vec![params.remote_ns.clone()]);
-                }
-                _ => {
-                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
+                        return Ok(vec![params.remote_ns.clone()]);
+                    }
+                    _ => {
+                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
+                    }
+                },
+                None => {
+                    bail!("Querying namespaces failed - {err}");
                 }
             },
-            None => {
-                bail!("Querying namespaces failed - {err}");
-            }
-        },
-    };
-
-    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
+        };
 
+        serde_json::from_value(result["data"].take())?
+    } else {
+        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
+        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root at pam")));
+        let parent_ns = if params.remote_ns.is_root() {
+            None
+        } else {
+            Some(params.remote_ns.clone())
+        };
+        crate::api2::admin::namespace::list_namespaces(
+            params.source.store().to_string(),
+            parent_ns,
+            params.max_depth,
+            &mut rpcenv,
+        )?
+    };
     // parents first
     list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
 
@@ -897,7 +1023,7 @@ fn check_and_remove_vanished_ns(
 /// - access to sub-NS checked here
 pub(crate) async fn pull_store(
     worker: &WorkerTask,
-    client: &HttpClient,
+    client: Option<&HttpClient>,
     mut params: PullParameters,
 ) -> Result<(), Error> {
     // explicit create shared lock to prevent GC on newly created chunks
@@ -1000,27 +1126,42 @@ pub(crate) async fn pull_store(
 /// - owner check for vanished groups done here
 pub(crate) async fn pull_ns(
     worker: &WorkerTask,
-    client: &HttpClient,
+    client: Option<&HttpClient>,
     params: &PullParameters,
     source_ns: BackupNamespace,
     target_ns: BackupNamespace,
 ) -> Result<(StoreProgress, bool), Error> {
-    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
+    let mut list: Vec<GroupListItem> = if let Some(client) = client {
+        let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
 
-    let args = if !source_ns.is_root() {
-        Some(json!({
-            "ns": source_ns,
-        }))
-    } else {
-        None
-    };
+        let args = if !source_ns.is_root() {
+            Some(json!({
+                "ns": source_ns,
+            }))
+        } else {
+            None
+        };
 
-    let mut result = client
-        .get(&path, args)
-        .await
-        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
+        let mut result = client
+            .get(&path, args)
+            .await
+            .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
 
-    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+        serde_json::from_value(result["data"].take())?
+    } else {
+        let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
+        proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root at pam")));
+        let source_ns = if source_ns.is_root() {
+            None
+        } else {
+            Some(source_ns.clone())
+        };
+        crate::api2::admin::datastore::list_groups(
+            params.source.store().to_string(),
+            source_ns,
+            &mut rpcenv,
+        )?
+    };
 
     let total_count = list.len();
     list.sort_unstable_by(|a, b| {
-- 
2.30.2






More information about the pbs-devel mailing list