[pbs-devel] [PATCH proxmox-backup 2/4] pull: add logic for local pull
Hannes Laimer
h.laimer at proxmox.com
Mon Feb 13 16:45:53 CET 2023
... since creating a HttpClient(which would be needed
to reuse existing pull logic) without a remote was not
possible. This also improves the speed for local
sync-jobs.
Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
pbs-client/src/backup_reader.rs | 5 +
src/api2/admin/datastore.rs | 10 +
src/server/pull.rs | 499 ++++++++++++++++++++------------
3 files changed, 335 insertions(+), 179 deletions(-)
diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs
index 2cd4dc27..9dacef74 100644
--- a/pbs-client/src/backup_reader.rs
+++ b/pbs-client/src/backup_reader.rs
@@ -20,6 +20,11 @@ use pbs_tools::sha::sha256;
use super::{H2Client, HttpClient};
+pub enum BackupSource {
+ Remote(Arc<BackupReader>),
+ Local(pbs_datastore::BackupDir),
+}
+
/// Backup Reader
pub struct BackupReader {
h2: H2Client,
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 8d3a6146..8ad78f29 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -445,6 +445,16 @@ pub async fn list_snapshots(
_param: Value,
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<SnapshotListItem>, Error> {
+ do_list_snapshots(store, ns, backup_type, backup_id, rpcenv).await
+}
+
+pub async fn do_list_snapshots(
+ store: String,
+ ns: Option<BackupNamespace>,
+ backup_type: Option<BackupType>,
+ backup_id: Option<String>,
+ rpcenv: &mut dyn RpcEnvironment,
) -> Result<Vec<SnapshotListItem>, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 65eedf2c..81df00c3 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,7 +1,7 @@
//! Sync datastore from remote server
use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::{Seek, SeekFrom, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
@@ -9,6 +9,7 @@ use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
use http::StatusCode;
use pbs_config::CachedUserInfo;
+use pbs_datastore::read_chunk::AsyncReadChunk;
use serde_json::json;
use proxmox_router::HttpError;
@@ -21,7 +22,7 @@ use pbs_api_types::{
};
use pbs_client::{
- BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
+ BackupReader, BackupRepository, BackupSource, HttpClient, HttpClientOptions, RemoteChunkReader,
};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -30,7 +31,7 @@ use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, LocalChunkReader, StoreProgress};
use pbs_tools::sha::sha256;
use proxmox_rest_server::WorkerTask;
@@ -40,7 +41,7 @@ use crate::tools::parallel_handler::ParallelHandler;
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
/// Remote that is pulled from
- remote: Remote,
+ remote: Option<Remote>,
/// Full specification of remote datastore
source: BackupRepository,
/// Local store that is pulled into
@@ -70,7 +71,7 @@ impl PullParameters {
pub(crate) fn new(
store: &str,
ns: BackupNamespace,
- remote: &str,
+ remote: Option<&str>,
remote_store: &str,
remote_ns: BackupNamespace,
owner: Authid,
@@ -86,18 +87,24 @@ impl PullParameters {
remote_ns.check_max_depth(max_depth)?;
}
- let (remote_config, _digest) = pbs_config::remote::config()?;
- let remote: Remote = remote_config.lookup("remote", remote)?;
+ let (remote, source): (Option<Remote>, BackupRepository) = if let Some(remote_str) = remote
+ {
+ let (remote_config, _digest) = pbs_config::remote::config()?;
+ let remote = remote_config.lookup::<Remote>("remote", remote_str)?;
+ let source = BackupRepository::new(
+ Some(remote.config.auth_id.clone()),
+ Some(remote.config.host.clone()),
+ remote.config.port,
+ remote_store.to_string(),
+ );
+ (Some(remote), source)
+ } else {
+ let source = BackupRepository::new(None, None, None, remote_store.to_string());
+ (None, source)
+ };
let remove_vanished = remove_vanished.unwrap_or(false);
- let source = BackupRepository::new(
- Some(remote.config.auth_id.clone()),
- Some(remote.config.host.clone()),
- remote.config.port,
- remote_store.to_string(),
- );
-
Ok(Self {
remote,
remote_ns,
@@ -114,13 +121,17 @@ impl PullParameters {
/// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
pub async fn client(&self) -> Result<HttpClient, Error> {
- crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
+ if let Some(remote) = &self.remote {
+ crate::api2::config::remote::remote_client(remote, Some(self.limit.clone())).await
+ } else {
+ bail!("No remote specified. Do not use a HttpClient for a local sync.")
+ }
}
}
-async fn pull_index_chunks<I: IndexFile>(
+async fn pull_index_chunks<I: IndexFile, C: AsyncReadChunk + Clone>(
worker: &WorkerTask,
- chunk_reader: RemoteChunkReader,
+ chunk_reader: C,
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -256,10 +267,10 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// - Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
-async fn pull_single_archive(
+async fn pull_single_archive<C: AsyncReadChunk + Clone>(
worker: &WorkerTask,
- reader: &BackupReader,
- chunk_reader: &mut RemoteChunkReader,
+ source: &BackupSource,
+ chunk_reader: C,
snapshot: &pbs_datastore::BackupDir,
archive_info: &FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -279,7 +290,15 @@ async fn pull_single_archive(
.read(true)
.open(&tmp_path)?;
- reader.download(archive_name, &mut tmpfile).await?;
+ match source {
+ BackupSource::Remote(reader) => reader.download(archive_name, &mut tmpfile).await?,
+ BackupSource::Local(dir) => {
+ let mut source_path = dir.full_path();
+ source_path.push(archive_name);
+ let data = std::fs::read(source_path)?;
+ tmpfile.write_all(&data)?;
+ }
+ };
match archive_type(archive_name)? {
ArchiveType::DynamicIndex => {
@@ -289,14 +308,23 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- snapshot.datastore().clone(),
- index,
- downloaded_chunks,
- )
- .await?;
+ match source {
+ BackupSource::Local(ref dir)
+ if dir.datastore().name() == snapshot.datastore().name() =>
+ {
+ task_log!(worker, "skipping chunk sync for same datatsore");
+ }
+ _ => {
+ pull_index_chunks(
+ worker,
+ chunk_reader,
+ snapshot.datastore().clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
+ };
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -304,15 +332,23 @@ async fn pull_single_archive(
})?;
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
-
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- snapshot.datastore().clone(),
- index,
- downloaded_chunks,
- )
- .await?;
+ match source {
+ BackupSource::Local(ref dir)
+ if dir.datastore().name() == snapshot.datastore().name() =>
+ {
+ task_log!(worker, "skipping chunk sync for same datatsore");
+ }
+ _ => {
+ pull_index_chunks(
+ worker,
+ chunk_reader,
+ snapshot.datastore().clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
+ };
}
ArchiveType::Blob => {
tmpfile.seek(SeekFrom::Start(0))?;
@@ -321,6 +357,9 @@ async fn pull_single_archive(
}
}
if let Err(err) = std::fs::rename(&tmp_path, &path) {
+ task_log!(worker, "sync archive {}", archive_name);
+ task_log!(worker, "tmpfile path {:?}", tmp_path.as_os_str());
+ task_log!(worker, "path path {:?}", path.as_os_str());
bail!("Atomic rename file {:?} failed - {}", path, err);
}
Ok(())
@@ -364,7 +403,7 @@ async fn try_client_log_download(
/// - Download log if not already existing
async fn pull_snapshot(
worker: &WorkerTask,
- reader: Arc<BackupReader>,
+ source: BackupSource,
snapshot: &pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
@@ -377,31 +416,45 @@ async fn pull_snapshot(
let mut tmp_manifest_name = manifest_name.clone();
tmp_manifest_name.set_extension("tmp");
- let download_res = download_manifest(&reader, &tmp_manifest_name).await;
- let mut tmp_manifest_file = match download_res {
- Ok(manifest_file) => manifest_file,
- Err(err) => {
- match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- task_log!(
- worker,
- "skipping snapshot {} - vanished since start of sync",
- snapshot.dir(),
- );
- return Ok(());
- }
- _ => {
- bail!("HTTP error {code} - {message}");
- }
- },
- None => {
- return Err(err);
+ let tmp_manifest_blob = match source {
+ BackupSource::Remote(ref reader) => {
+ let mut tmp_manifest_file = match download_manifest(reader, &tmp_manifest_name).await {
+ Ok(manifest_file) => manifest_file,
+ Err(err) => {
+ match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match *code {
+ StatusCode::NOT_FOUND => {
+ task_log!(
+ worker,
+ "skipping snapshot {} - vanished since start of sync",
+ snapshot.dir(),
+ );
+ return Ok(());
+ }
+ _ => {
+ bail!("HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ return Err(err);
+ }
+ };
}
};
+ DataBlob::load_from_reader(&mut tmp_manifest_file)?
+ }
+ BackupSource::Local(ref dir) => {
+ let data = dir.load_blob(MANIFEST_BLOB_NAME)?;
+ let mut tmp_manifest_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(&tmp_manifest_name)?;
+ tmp_manifest_file.write_all(data.raw_data())?;
+ data
}
};
- let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
if manifest_name.exists() {
let manifest_blob = proxmox_lang::try_block!({
@@ -417,13 +470,17 @@ async fn pull_snapshot(
})?;
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
- if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
+ if let BackupSource::Remote(ref reader) = source {
+ if !client_log_name.exists() {
+ try_client_log_download(worker, reader.clone(), &client_log_name).await?;
+ };
+ }
+ if tmp_manifest_name.exists() {
+ let _ = std::fs::remove_file(&tmp_manifest_name);
}
task_log!(worker, "no data changes");
- let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(()); // nothing changed
- }
+ };
}
let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
@@ -467,32 +524,49 @@ async fn pull_snapshot(
}
}
- let mut chunk_reader = RemoteChunkReader::new(
- reader.clone(),
- None,
- item.chunk_crypt_mode(),
- HashMap::new(),
- );
-
- pull_single_archive(
- worker,
- &reader,
- &mut chunk_reader,
- snapshot,
- item,
- downloaded_chunks.clone(),
- )
- .await?;
+ match source {
+ BackupSource::Remote(ref reader) => {
+ let chunk_reader = RemoteChunkReader::new(
+ reader.clone(),
+ None,
+ item.chunk_crypt_mode(),
+ HashMap::new(),
+ );
+ pull_single_archive(
+ worker,
+ &source,
+ chunk_reader,
+ snapshot,
+ item,
+ downloaded_chunks.clone(),
+ )
+ .await?
+ }
+ BackupSource::Local(ref dir) => {
+ let chunk_reader =
+ LocalChunkReader::new(dir.datastore().clone(), None, item.chunk_crypt_mode());
+ pull_single_archive(
+ worker,
+ &source,
+ chunk_reader,
+ snapshot,
+ item,
+ downloaded_chunks.clone(),
+ )
+ .await?
+ }
+ }
}
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
}
- if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
+ if let BackupSource::Remote(reader) = source {
+ if !client_log_name.exists() {
+ try_client_log_download(worker, reader, &client_log_name).await?;
+ };
}
-
snapshot
.cleanup_unreferenced_files(&manifest)
.map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
@@ -506,7 +580,7 @@ async fn pull_snapshot(
/// pointing to the local datastore and target namespace.
async fn pull_snapshot_from(
worker: &WorkerTask,
- reader: Arc<BackupReader>,
+ source: BackupSource,
snapshot: &pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
@@ -517,7 +591,7 @@ async fn pull_snapshot_from(
if is_new {
task_log!(worker, "sync snapshot {}", snapshot.dir());
- if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
+ if let Err(err) = pull_snapshot(worker, source, snapshot, downloaded_chunks).await {
if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
snapshot.backup_ns(),
snapshot.as_ref(),
@@ -530,7 +604,7 @@ async fn pull_snapshot_from(
task_log!(worker, "sync snapshot {} done", snapshot.dir());
} else {
task_log!(worker, "re-sync snapshot {}", snapshot.dir());
- pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
+ pull_snapshot(worker, source, snapshot, downloaded_chunks).await?;
task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
}
@@ -600,36 +674,52 @@ impl std::fmt::Display for SkipInfo {
/// - local group owner is already checked by pull_store
async fn pull_group(
worker: &WorkerTask,
- client: &HttpClient,
+ client: Option<&HttpClient>,
params: &PullParameters,
group: &pbs_api_types::BackupGroup,
remote_ns: BackupNamespace,
progress: &mut StoreProgress,
) -> Result<(), Error> {
- let path = format!(
- "api2/json/admin/datastore/{}/snapshots",
- params.source.store()
- );
-
- let mut args = json!({
- "backup-type": group.ty,
- "backup-id": group.id,
- });
-
- if !remote_ns.is_root() {
- args["ns"] = serde_json::to_value(&remote_ns)?;
- }
-
let target_ns = remote_ns.map_prefix(¶ms.remote_ns, ¶ms.ns)?;
+ let mut list: Vec<SnapshotListItem> = if let Some(client) = client {
+ let path = format!(
+ "api2/json/admin/datastore/{}/snapshots",
+ params.source.store()
+ );
- let mut result = client.get(&path, Some(args)).await?;
- let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+ let mut args = json!({
+ "backup-type": group.ty,
+ "backup-id": group.id,
+ });
- list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
+ if !remote_ns.is_root() {
+ args["ns"] = serde_json::to_value(&remote_ns)?;
+ }
- client.login().await?; // make sure auth is complete
+ let mut result = client.get(&path, Some(args)).await?;
+ serde_json::from_value(result["data"].take())?
+ } else {
+ let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
+ proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root at pam")));
+ let source_ns = if remote_ns.is_root() {
+ None
+ } else {
+ Some(remote_ns.clone())
+ };
+ crate::api2::admin::datastore::do_list_snapshots(
+ params.source.store().to_string(),
+ source_ns,
+ Some(group.ty),
+ Some(group.id.clone()),
+ &mut rpcenv,
+ )
+ .await?
+ };
+ list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
- let fingerprint = client.fingerprint();
+ if let Some(client) = client {
+ client.login().await?; // make sure auth is complete
+ }
let last_sync = params.store.last_successful_backup(&target_ns, group)?;
@@ -646,6 +736,13 @@ async fn pull_group(
count: 0,
};
+ let datastore: Option<Arc<DataStore>> = match client {
+ None => Some(DataStore::lookup_datastore(
+ params.source.store(),
+ Some(Operation::Read),
+ )?),
+ _ => None,
+ };
for (pos, item) in list.into_iter().enumerate() {
let snapshot = item.backup;
@@ -668,33 +765,47 @@ async fn pull_group(
}
}
- // get updated auth_info (new tickets)
- let auth_info = client.login().await?;
-
- let options =
- HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
- .rate_limit(params.limit.clone());
+ let backup_source = if let Some(client) = client {
+ // get updated auth_info (new tickets)
+ let auth_info = client.login().await?;
+ let fingerprint = client.fingerprint();
- let new_client = HttpClient::new(
- params.source.host(),
- params.source.port(),
- params.source.auth_id(),
- options,
- )?;
-
- let reader = BackupReader::start(
- new_client,
- None,
- params.source.store(),
- &remote_ns,
- &snapshot,
- true,
- )
- .await?;
+ let options = HttpClientOptions::new_non_interactive(
+ auth_info.ticket.clone(),
+ fingerprint.clone(),
+ )
+ .rate_limit(params.limit.clone());
+
+ let new_client = HttpClient::new(
+ params.source.host(),
+ params.source.port(),
+ params.source.auth_id(),
+ options,
+ )?;
+
+ BackupSource::Remote(
+ BackupReader::start(
+ new_client,
+ None,
+ params.source.store(),
+ &remote_ns,
+ &snapshot,
+ true,
+ )
+ .await?,
+ )
+ } else {
+ if let Some(datastore) = datastore.clone() {
+ BackupSource::Local(datastore.backup_dir(remote_ns.clone(), snapshot.clone())?)
+ } else {
+ unreachable!("if there is no client and no datastore, then the ds lookup would have failed earlier")
+ }
+ };
let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
- let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+ let result =
+ pull_snapshot_from(worker, backup_source, &snapshot, downloaded_chunks.clone()).await;
progress.done_snapshots = pos as u64 + 1;
task_log!(worker, "percentage done: {}", progress);
@@ -735,49 +846,64 @@ async fn pull_group(
// will modify params if switching to backwards mode for lack of NS support on remote end
async fn query_namespaces(
worker: &WorkerTask,
- client: &HttpClient,
+ client: Option<&HttpClient>,
params: &mut PullParameters,
) -> Result<Vec<BackupNamespace>, Error> {
- let path = format!(
- "api2/json/admin/datastore/{}/namespace",
- params.source.store()
- );
- let mut data = json!({});
- if let Some(max_depth) = params.max_depth {
- data["max-depth"] = json!(max_depth);
- }
+ let mut list: Vec<NamespaceListItem> = if let Some(client) = client {
+ let path = format!(
+ "api2/json/admin/datastore/{}/namespace",
+ params.source.store()
+ );
+ let mut data = json!({});
+ if let Some(max_depth) = params.max_depth {
+ data["max-depth"] = json!(max_depth);
+ }
- if !params.remote_ns.is_root() {
- data["parent"] = json!(params.remote_ns);
- }
+ if !params.remote_ns.is_root() {
+ data["parent"] = json!(params.remote_ns);
+ }
- let mut result = match client.get(&path, Some(data)).await {
- Ok(res) => res,
- Err(err) => match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- if params.remote_ns.is_root() && params.max_depth.is_none() {
- task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
- task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
- params.max_depth = Some(0);
- } else {
- bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
- }
+ let mut result = match client.get(&path, Some(data)).await {
+ Ok(res) => res,
+ Err(err) => match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match *code {
+ StatusCode::NOT_FOUND => {
+ if params.remote_ns.is_root() && params.max_depth.is_none() {
+ task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+ task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+ params.max_depth = Some(0);
+ } else {
+ bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+ }
- return Ok(vec![params.remote_ns.clone()]);
- }
- _ => {
- bail!("Querying namespaces failed - HTTP error {code} - {message}");
+ return Ok(vec![params.remote_ns.clone()]);
+ }
+ _ => {
+ bail!("Querying namespaces failed - HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ bail!("Querying namespaces failed - {err}");
}
},
- None => {
- bail!("Querying namespaces failed - {err}");
- }
- },
- };
-
- let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
+ };
+ serde_json::from_value(result["data"].take())?
+ } else {
+ let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
+ proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root at pam")));
+ let parent_ns = if params.remote_ns.is_root() {
+ None
+ } else {
+ Some(params.remote_ns.clone())
+ };
+ crate::api2::admin::namespace::list_namespaces(
+ params.source.store().to_string(),
+ parent_ns,
+ params.max_depth,
+ &mut rpcenv,
+ )?
+ };
// parents first
list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
@@ -897,7 +1023,7 @@ fn check_and_remove_vanished_ns(
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
worker: &WorkerTask,
- client: &HttpClient,
+ client: Option<&HttpClient>,
mut params: PullParameters,
) -> Result<(), Error> {
// explicit create shared lock to prevent GC on newly created chunks
@@ -1000,27 +1126,42 @@ pub(crate) async fn pull_store(
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
worker: &WorkerTask,
- client: &HttpClient,
+ client: Option<&HttpClient>,
params: &PullParameters,
source_ns: BackupNamespace,
target_ns: BackupNamespace,
) -> Result<(StoreProgress, bool), Error> {
- let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
+ let mut list: Vec<GroupListItem> = if let Some(client) = client {
+ let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
- let args = if !source_ns.is_root() {
- Some(json!({
- "ns": source_ns,
- }))
- } else {
- None
- };
+ let args = if !source_ns.is_root() {
+ Some(json!({
+ "ns": source_ns,
+ }))
+ } else {
+ None
+ };
- let mut result = client
- .get(&path, args)
- .await
- .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
+ let mut result = client
+ .get(&path, args)
+ .await
+ .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
- let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+ serde_json::from_value(result["data"].take())?
+ } else {
+ let mut rpcenv = proxmox_router::cli::CliEnvironment::new();
+ proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root at pam")));
+ let source_ns = if source_ns.is_root() {
+ None
+ } else {
+ Some(source_ns.clone())
+ };
+ crate::api2::admin::datastore::list_groups(
+ params.source.store().to_string(),
+ source_ns,
+ &mut rpcenv,
+ )?
+ };
let total_count = list.len();
list.sort_unstable_by(|a, b| {
--
2.30.2
More information about the pbs-devel
mailing list