[pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling
Hannes Laimer
h.laimer at proxmox.com
Thu Feb 23 13:55:40 CET 2023
... and rewrite pull logic.
Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
pbs-api-types/src/datastore.rs | 2 +-
pbs-datastore/src/read_chunk.rs | 2 +-
src/api2/pull.rs | 13 +-
src/server/pull.rs | 1023 +++++++++++++++++++------------
4 files changed, 648 insertions(+), 392 deletions(-)
diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs
index 72e8d1ee..9a692b08 100644
--- a/pbs-api-types/src/datastore.rs
+++ b/pbs-api-types/src/datastore.rs
@@ -931,7 +931,7 @@ impl std::str::FromStr for BackupGroup {
/// Uniquely identify a Backup (relative to data store)
///
/// We also call this a backup snaphost.
-#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
+#[derive(Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct BackupDir {
/// Backup group.
diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
index c04a7431..29ee2d4c 100644
--- a/pbs-datastore/src/read_chunk.rs
+++ b/pbs-datastore/src/read_chunk.rs
@@ -14,7 +14,7 @@ pub trait ReadChunk {
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
}
-pub trait AsyncReadChunk: Send {
+pub trait AsyncReadChunk: Send + Sync {
/// Returns the encoded chunk data
fn read_raw_chunk<'a>(
&'a self,
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index bb8f6fe1..2966190c 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -121,8 +121,8 @@ pub fn do_sync_job(
let sync_job2 = sync_job.clone();
let worker_future = async move {
- let pull_params = PullParameters::try_from(&sync_job)?;
- let client = pull_params.client().await?;
+ let mut pull_params = PullParameters::try_from(&sync_job)?;
+ pull_params.init_source(sync_job.limit).await?;
task_log!(worker, "Starting datastore sync job '{}'", job_id);
if let Some(event_str) = schedule {
@@ -137,7 +137,7 @@ pub fn do_sync_job(
);
if sync_job.remote.is_some() {
- pull_store(&worker, &client, pull_params).await?;
+ pull_store(&worker, pull_params).await?;
} else {
match (sync_job.ns, sync_job.remote_ns) {
(Some(target_ns), Some(source_ns))
@@ -280,7 +280,7 @@ async fn pull(
delete,
)?;
- let pull_params = PullParameters::new(
+ let mut pull_params = PullParameters::new(
&store,
ns,
remote.as_deref(),
@@ -290,9 +290,8 @@ async fn pull(
remove_vanished,
max_depth,
group_filter,
- limit,
)?;
- let client = pull_params.client().await?;
+ pull_params.init_source(limit).await?;
// fixme: set to_stdout to false?
// FIXME: add namespace to worker id?
@@ -310,7 +309,7 @@ async fn pull(
remote_store,
);
- let pull_future = pull_store(&worker, &client, pull_params);
+ let pull_future = pull_store(&worker, pull_params);
(select! {
success = pull_future.fuse() => success,
abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 65eedf2c..d3be39da 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,28 +1,26 @@
//! Sync datastore from remote server
use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::{Seek, SeekFrom, Write};
+use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
use http::StatusCode;
-use pbs_config::CachedUserInfo;
-use serde_json::json;
-
+use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use proxmox_sys::task_log;
+use serde_json::json;
use pbs_api_types::{
- print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
+ print_store_and_ns, Authid, BackupDir, BackupNamespace, CryptMode, GroupFilter, GroupListItem,
Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
};
-
-use pbs_client::{
- BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
-};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
@@ -30,25 +28,21 @@ use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::read_chunk::AsyncReadChunk;
+use pbs_datastore::{
+ check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
+};
use pbs_tools::sha::sha256;
-use proxmox_rest_server::WorkerTask;
-use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
- /// Remote that is pulled from
- remote: Remote,
- /// Full specification of remote datastore
- source: BackupRepository,
- /// Local store that is pulled into
- store: Arc<DataStore>,
- /// Remote namespace
- remote_ns: BackupNamespace,
- /// Local namespace (anchor)
- ns: BackupNamespace,
+ /// Where data is pulled from
+ source: PullSource,
+ /// Where data should be pulled into
+ target: PullTarget,
/// Owner of synced groups (needs to match local owner of pre-existing groups)
owner: Authid,
/// Whether to remove groups which exist locally, but not on the remote end
@@ -57,70 +51,459 @@ pub(crate) struct PullParameters {
max_depth: Option<usize>,
/// Filters for reducing the pull scope
group_filter: Option<Vec<GroupFilter>>,
- /// Rate limits for all transfers from `remote`
- limit: RateLimitConfig,
+}
+
+/// Where a pull reads its data from.
+pub(crate) enum PullSource {
+    /// A datastore on a remote server, accessed through an HTTP client.
+    Remote(RemoteSource),
+    /// A datastore looked up on this host.
+    Local(LocalSource),
+}
+
+/// Where pulled data is written to.
+pub(crate) struct PullTarget {
+    /// Datastore that is pulled into (looked up for a write operation).
+    store: Arc<DataStore>,
+    /// Target namespace (anchor).
+    ns: BackupNamespace,
+}
+
+/// Pull source backed by a datastore on this host.
+pub(crate) struct LocalSource {
+    /// Datastore that is pulled from (looked up for a read operation).
+    store: Arc<DataStore>,
+    /// Source namespace (anchor).
+    ns: BackupNamespace,
+}
+
+/// Pull source backed by a datastore on a remote server.
+pub(crate) struct RemoteSource {
+    /// Remote configuration entry that is pulled from.
+    remote: Remote,
+    /// Full specification of the remote datastore.
+    repo: BackupRepository,
+    /// Source namespace (anchor) on the remote.
+    ns: BackupNamespace,
+    /// HTTP client; `None` until `PullSource::init` has been called.
+    client: Option<HttpClient>,
+    /// Backup readers that were already started, keyed by snapshot.
+    backup_reader: HashMap<pbs_api_types::BackupDir, Arc<BackupReader>>,
+}
+
+impl PullSource {
+    /// Prepares the source for use.
+    ///
+    /// For a remote source this creates the HTTP client (passing `limit` on to
+    /// `remote_client`) and stores it in the source. A local source needs no preparation.
+    pub(crate) async fn init(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
+        match self {
+            PullSource::Remote(source) => {
+                source.client.replace(
+                    crate::api2::config::remote::remote_client(&source.remote, Some(limit)).await?,
+                );
+            }
+            PullSource::Local(_) => {}
+        };
+        Ok(())
+    }
+
+    /// Lists the namespaces of the source, starting at its anchor namespace.
+    ///
+    /// The remote variant delegates to `list_remote_namespaces`, which may modify `max_depth`.
+    /// The local variant walks the datastore recursively, using `MAX_NAMESPACE_DEPTH` when no
+    /// `max_depth` is set.
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        match &self {
+            PullSource::Remote(source) => list_remote_namespaces(source, max_depth, worker).await,
+            PullSource::Local(source) => ListNamespacesRecursive::new_max_depth(
+                source.store.clone(),
+                source.ns.clone(),
+                max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+            )?
+            .collect(),
+        }
+    }
+
+    /// Lists the backup groups found in `namespace` of the source.
+    ///
+    /// `owner` is only used by the local variant, where it is handed to
+    /// `ListAccessibleBackupGroups`.
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<pbs_api_types::BackupGroup>, Error> {
+        match &self {
+            PullSource::Remote(source) => {
+                let path = format!("api2/json/admin/datastore/{}/groups", source.repo.store());
+
+                let args = if !namespace.is_root() {
+                    Some(json!({ "ns": namespace.clone() }))
+                } else {
+                    None
+                };
+
+                let client = source.get_client()?;
+                client.login().await?;
+                let mut result = client.get(&path, args).await.map_err(|err| {
+                    format_err!("Failed to retrieve backup groups from remote - {}", err)
+                })?;
+
+                Ok(
+                    serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())?
+                        .into_iter()
+                        .map(|item| item.backup)
+                        .collect::<Vec<pbs_api_types::BackupGroup>>(),
+                )
+            }
+            // NOTE(review): `MAX_NAMESPACE_DEPTH` is passed as the third argument; if that is a
+            // recursion depth, groups of nested namespaces would be listed here too - confirm.
+            PullSource::Local(source) => Ok(ListAccessibleBackupGroups::new_with_privs(
+                &source.store,
+                namespace.clone(),
+                MAX_NAMESPACE_DEPTH,
+                None,
+                None,
+                Some(owner),
+            )?
+            // entries that could not be listed are dropped silently
+            .filter_map(Result::ok)
+            .map(|backup_group| backup_group.group().clone())
+            .collect::<Vec<pbs_api_types::BackupGroup>>()),
+        }
+    }
+
+    /// Lists the snapshots of `group` in `namespace` of the source.
+    ///
+    /// For a remote source, snapshots without a size (in-progress backups) are logged to
+    /// `worker` and left out of the result.
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &pbs_api_types::BackupGroup,
+        worker: &WorkerTask,
+    ) -> Result<Vec<pbs_api_types::BackupDir>, Error> {
+        match &self {
+            PullSource::Remote(source) => {
+                let path = format!(
+                    "api2/json/admin/datastore/{}/snapshots",
+                    source.repo.store()
+                );
+
+                let mut args = json!({
+                    "backup-type": group.ty,
+                    "backup-id": group.id,
+                });
+
+                // query the namespace that is currently being synced, not the source anchor
+                if !namespace.is_root() {
+                    args["ns"] = serde_json::to_value(namespace)?;
+                }
+
+                let client = source.get_client()?;
+                client.login().await?;
+
+                let mut result = client.get(&path, Some(args)).await?;
+                let snapshot_list: Vec<SnapshotListItem> =
+                    serde_json::from_value(result["data"].take())?;
+                Ok(snapshot_list
+                    .into_iter()
+                    .filter_map(|item: SnapshotListItem| {
+                        let snapshot = item.backup;
+                        // in-progress backups can't be synced
+                        if item.size.is_none() {
+                            task_log!(
+                                worker,
+                                "skipping snapshot {} - in-progress backup",
+                                snapshot
+                            );
+                            return None;
+                        }
+
+                        Some(snapshot)
+                    })
+                    .collect::<Vec<BackupDir>>())
+            }
+            PullSource::Local(source) => Ok(source
+                .store
+                .backup_group(namespace.clone(), group.clone())
+                .iter_snapshots()?
+                // snapshots that could not be listed are dropped silently
+                .filter_map(Result::ok)
+                .map(|snapshot| snapshot.dir().to_owned())
+                .collect::<Vec<BackupDir>>()),
+        }
+    }
+
+    /// Load file from source namespace and BackupDir into file
+    ///
+    /// The file at `into` is created (or truncated) and filled with `filename` of `snapshot`.
+    /// A remote source reuses the backup reader stored for `snapshot`, starting and storing a
+    /// new one if there is none yet.
+    ///
+    /// Returns `Ok(None)` if the remote answers with 404 (logged as a vanished snapshot), or
+    /// if the written file cannot be loaded as a `DataBlob`.
+    ///
+    /// NOTE(review): a blob that fails to load is indistinguishable from a vanished snapshot
+    /// for the caller - confirm this is acceptable for the manifest.
+    async fn load_file_into(
+        &mut self,
+        namespace: &BackupNamespace,
+        snapshot: &pbs_api_types::BackupDir,
+        filename: &str,
+        into: &PathBuf,
+        worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        match self {
+            PullSource::Remote(ref mut source) => {
+                let client = source.get_client()?;
+                client.login().await?;
+
+                let reader = if let Some(reader) = source.backup_reader.get(snapshot) {
+                    reader.clone()
+                } else {
+                    let backup_reader = BackupReader::start(
+                        client,
+                        None,
+                        source.repo.store(),
+                        namespace,
+                        snapshot,
+                        true,
+                    )
+                    .await?;
+                    source
+                        .backup_reader
+                        .insert(snapshot.clone(), backup_reader.clone());
+                    backup_reader
+                };
+
+                let download_result = reader.download(filename, &mut tmp_file).await;
+
+                if let Err(err) = download_result {
+                    match err.downcast_ref::<HttpError>() {
+                        Some(HttpError { code, message }) => match *code {
+                            StatusCode::NOT_FOUND => {
+                                task_log!(
+                                    worker,
+                                    "skipping snapshot {} - vanished since start of sync",
+                                    snapshot,
+                                );
+                                return Ok(None);
+                            }
+                            _ => {
+                                bail!("HTTP error {code} - {message}");
+                            }
+                        },
+                        None => {
+                            return Err(err);
+                        }
+                    };
+                };
+            }
+            PullSource::Local(source) => {
+                let dir = source
+                    .store
+                    .backup_dir(namespace.clone(), snapshot.clone())?;
+                let mut from_path = dir.full_path();
+                from_path.push(filename);
+                tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+            }
+        }
+
+        // rewind so the blob is read from the start of the file that was just written
+        tmp_file.seek(SeekFrom::Start(0))?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    // Note: The client.log.blob is uploaded after the backup, so it is
+    // not mentioned in the manifest.
+    /// Tries to fetch the client log of `from_snapshot` and store it at `to_path`.
+    ///
+    /// A remote source needs a backup reader already stored for `from_snapshot`. The log is
+    /// downloaded into a `.tmp` sibling of `to_path` and renamed on success; a failed download
+    /// is ignored. A local source does nothing.
+    ///
+    /// NOTE(review): for a local source the client log is never copied - confirm intended.
+    async fn try_download_client_log(
+        &self,
+        from_snapshot: &pbs_api_types::BackupDir,
+        to_path: &std::path::Path,
+        worker: &WorkerTask,
+    ) -> Result<(), Error> {
+        match &self {
+            PullSource::Remote(source) => {
+                let reader = source
+                    .backup_reader
+                    .get(from_snapshot)
+                    .ok_or_else(|| format_err!("Can't download chunks without a BackupReader"))?;
+                let mut tmp_path = to_path.to_owned();
+                tmp_path.set_extension("tmp");
+
+                let tmpfile = std::fs::OpenOptions::new()
+                    .write(true)
+                    .create(true)
+                    .read(true)
+                    .open(&tmp_path)?;
+
+                // Note: be silent if there is no log - only log successful download
+                if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
+                    if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+                        bail!("Atomic rename file {:?} failed - {}", to_path, err);
+                    }
+                    task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
+                }
+
+                Ok(())
+            }
+            PullSource::Local(_) => Ok(()),
+        }
+    }
+
+    /// Creates a chunk reader for the source.
+    ///
+    /// A remote source wraps the backup reader stored for `snapshot` and fails if there is
+    /// none; a local source reads from its datastore.
+    fn get_chunk_reader(
+        &self,
+        snapshot: &pbs_api_types::BackupDir,
+        crypt_mode: CryptMode,
+    ) -> Result<Arc<dyn AsyncReadChunk>, Error> {
+        Ok(match &self {
+            PullSource::Remote(source) => {
+                if let Some(reader) = source.backup_reader.get(snapshot) {
+                    Arc::new(RemoteChunkReader::new(
+                        reader.clone(),
+                        None,
+                        crypt_mode,
+                        HashMap::new(),
+                    ))
+                } else {
+                    bail!("No initialized BackupReader!")
+                }
+            }
+            PullSource::Local(source) => Arc::new(LocalChunkReader::new(
+                source.store.clone(),
+                None,
+                crypt_mode,
+            )),
+        })
+    }
+
+    /// Returns a copy of the source's anchor namespace.
+    fn get_ns(&self) -> BackupNamespace {
+        match &self {
+            PullSource::Remote(source) => source.ns.clone(),
+            PullSource::Local(source) => source.ns.clone(),
+        }
+    }
+
+    /// Formats the source's datastore name together with its anchor namespace.
+    fn print_store_and_ns(&self) -> String {
+        match &self {
+            PullSource::Remote(source) => print_store_and_ns(source.repo.store(), &source.ns),
+            PullSource::Local(source) => print_store_and_ns(source.store.name(), &source.ns),
+        }
+    }
+}
+
+impl RemoteSource {
+    /// Borrows the HTTP client, or fails if the source has no client yet.
+    fn get_client(&self) -> Result<&HttpClient, Error> {
+        match self.client.as_ref() {
+            Some(client) => Ok(client),
+            None => Err(format_err!("RemoteSource not initialized")),
+        }
+    }
+}
impl PullParameters {
/// Creates a new instance of `PullParameters`.
- ///
- /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
- /// [BackupRepository] with `remote_store`.
- #[allow(clippy::too_many_arguments)]
pub(crate) fn new(
store: &str,
ns: BackupNamespace,
- remote: &str,
+ remote: Option<&str>,
remote_store: &str,
remote_ns: BackupNamespace,
owner: Authid,
remove_vanished: Option<bool>,
max_depth: Option<usize>,
group_filter: Option<Vec<GroupFilter>>,
- limit: RateLimitConfig,
) -> Result<Self, Error> {
- let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
-
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
remote_ns.check_max_depth(max_depth)?;
- }
+ };
+ let remove_vanished = remove_vanished.unwrap_or(false);
- let (remote_config, _digest) = pbs_config::remote::config()?;
- let remote: Remote = remote_config.lookup("remote", remote)?;
+ let source: PullSource = if let Some(remote) = remote {
+ let (remote_config, _digest) = pbs_config::remote::config()?;
+ let remote: Remote = remote_config.lookup("remote", remote)?;
- let remove_vanished = remove_vanished.unwrap_or(false);
+ let repo = BackupRepository::new(
+ Some(remote.config.auth_id.clone()),
+ Some(remote.config.host.clone()),
+ remote.config.port,
+ remote_store.to_string(),
+ );
- let source = BackupRepository::new(
- Some(remote.config.auth_id.clone()),
- Some(remote.config.host.clone()),
- remote.config.port,
- remote_store.to_string(),
- );
+ PullSource::Remote(RemoteSource {
+ remote,
+ repo,
+ ns: remote_ns.clone(),
+ client: None,
+ backup_reader: HashMap::new(),
+ })
+ } else {
+ PullSource::Local(LocalSource {
+ store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
+ ns: remote_ns,
+ })
+ };
+ let target = PullTarget {
+ store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
+ ns,
+ };
Ok(Self {
- remote,
- remote_ns,
- ns,
source,
- store,
+ target,
owner,
remove_vanished,
max_depth,
group_filter,
- limit,
})
}
- /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
- pub async fn client(&self) -> Result<HttpClient, Error> {
- crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
+    /// Initializes the pull source by forwarding `limit` to `PullSource::init`.
+    pub(crate) async fn init_source(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
+        self.source.init(limit).await
+    }
+
+    /// Tells whether chunks need no transfer: true only for a local source whose datastore
+    /// has the same name as the target datastore.
+    pub(crate) fn skip_chunk_sync(&self) -> bool {
+        matches!(
+            &self.source,
+            PullSource::Local(local) if local.store.name() == self.target.store.name()
+        )
+    }
+
+    /// Maps the source's anchor namespace onto the target anchor namespace via `map_prefix`.
+    ///
+    /// NOTE(review): the namespace being mapped and the prefix being replaced are both the
+    /// source anchor, so this presumably always yields the target anchor itself and never the
+    /// counterpart of a nested source namespace - confirm against callers that sync
+    /// sub-namespaces.
+    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
+        let source_ns = self.source.get_ns();
+        source_ns.map_prefix(&source_ns, &self.target.ns)
}
}
+/// Queries the remote for the namespaces below the source's anchor namespace.
+///
+/// If the anchor is the root namespace and `max_depth` is `Some(0)`, the remote is not
+/// queried and only the anchor is returned.
+///
+/// Will modify `max_depth` if switching to backwards-compat mode: when the remote answers
+/// with 404 for a root anchor without depth limit, `max_depth` is set to `Some(0)` and only
+/// the anchor is returned. A 404 in any other configuration, or any other failure of the
+/// query, is returned as an error.
+async fn list_remote_namespaces(
+    source: &RemoteSource,
+    max_depth: &mut Option<usize>,
+    worker: &WorkerTask,
+) -> Result<Vec<BackupNamespace>, Error> {
+    // backwards compat - don't query the remote for a non-recursive sync of the root namespace
+    if source.ns.is_root() && *max_depth == Some(0) {
+        return Ok(vec![source.ns.clone()]);
+    }
+
+    let path = format!(
+        "api2/json/admin/datastore/{}/namespace",
+        source.repo.store()
+    );
+    let mut data = json!({});
+    if let Some(max_depth) = max_depth {
+        data["max-depth"] = json!(max_depth);
+    }
+
+    if !source.ns.is_root() {
+        data["parent"] = json!(source.ns);
+    }
+
+    let client = source.get_client()?;
+    client.login().await?;
+
+    let mut result = match client.get(&path, Some(data)).await {
+        Ok(res) => res,
+        Err(err) => match err.downcast_ref::<HttpError>() {
+            Some(HttpError { code, message }) => match code {
+                &StatusCode::NOT_FOUND => {
+                    if source.ns.is_root() && max_depth.is_none() {
+                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+                        max_depth.replace(0);
+                    } else {
+                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+                    }
+
+                    return Ok(vec![source.ns.clone()]);
+                }
+                _ => {
+                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
+                }
+            },
+            None => {
+                bail!("Querying namespaces failed - {err}");
+            }
+        },
+    };
+
+    // only the namespace of each returned list item is needed
+    let list: Vec<pbs_api_types::BackupNamespace> =
+        serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+            .into_iter()
+            .map(|list_item| list_item.ns)
+            .collect();
+
+    Ok(list)
+}
+
async fn pull_index_chunks<I: IndexFile>(
worker: &WorkerTask,
- chunk_reader: RemoteChunkReader,
+ chunk_reader: Arc<dyn AsyncReadChunk>,
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -211,26 +594,6 @@ async fn pull_index_chunks<I: IndexFile>(
Ok(())
}
-async fn download_manifest(
- reader: &BackupReader,
- filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
- let mut tmp_manifest_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(filename)?;
-
- reader
- .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
- .await?;
-
- tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
- Ok(tmp_manifest_file)
-}
-
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
if size != info.size {
bail!(
@@ -251,21 +614,21 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
-/// - Create tmp file for archive
-/// - Download archive file into tmp file
+/// - Load archive file into tmp file
/// - Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive(
worker: &WorkerTask,
- reader: &BackupReader,
- chunk_reader: &mut RemoteChunkReader,
- snapshot: &pbs_datastore::BackupDir,
+ params: &mut PullParameters,
+ from_namespace: &BackupNamespace,
+ from_snapshot: &pbs_api_types::BackupDir,
+ to_snapshot: &pbs_datastore::BackupDir,
archive_info: &FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let archive_name = &archive_info.filename;
- let mut path = snapshot.full_path();
+ let mut path = to_snapshot.full_path();
path.push(archive_name);
let mut tmp_path = path.clone();
@@ -273,13 +636,18 @@ async fn pull_single_archive(
task_log!(worker, "sync archive {}", archive_name);
- let mut tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
+ params
+ .source
+ .load_file_into(
+ from_namespace,
+ from_snapshot,
+ archive_name,
+ &tmp_path,
+ worker,
+ )
+ .await?;
- reader.download(archive_name, &mut tmpfile).await?;
+ let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
match archive_type(archive_name)? {
ArchiveType::DynamicIndex => {
@@ -289,14 +657,20 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- snapshot.datastore().clone(),
- index,
- downloaded_chunks,
- )
- .await?;
+ if params.skip_chunk_sync() {
+ task_log!(worker, "skipping chunk sync for same datatsore");
+ } else {
+ pull_index_chunks(
+ worker,
+ params
+ .source
+ .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
+ params.target.store.clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -305,14 +679,20 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- snapshot.datastore().clone(),
- index,
- downloaded_chunks,
- )
- .await?;
+ if params.skip_chunk_sync() {
+ task_log!(worker, "skipping chunk sync for same datatsore");
+ } else {
+ pull_index_chunks(
+ worker,
+ params
+ .source
+ .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
+ params.target.store.clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
}
ArchiveType::Blob => {
tmpfile.seek(SeekFrom::Start(0))?;
@@ -326,33 +706,6 @@ async fn pull_single_archive(
Ok(())
}
-// Note: The client.log.blob is uploaded after the backup, so it is
-// not mentioned in the manifest.
-async fn try_client_log_download(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- path: &std::path::Path,
-) -> Result<(), Error> {
- let mut tmp_path = path.to_owned();
- tmp_path.set_extension("tmp");
-
- let tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
-
- // Note: be silent if there is no log - only log successful download
- if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
- if let Err(err) = std::fs::rename(&tmp_path, path) {
- bail!("Atomic rename file {:?} failed - {}", path, err);
- }
- task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
- }
-
- Ok(())
-}
-
/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
@@ -364,44 +717,37 @@ async fn try_client_log_download(
/// - Download log if not already existing
async fn pull_snapshot(
worker: &WorkerTask,
- reader: Arc<BackupReader>,
- snapshot: &pbs_datastore::BackupDir,
+ params: &mut PullParameters,
+ namespace: &BackupNamespace,
+ from_snapshot: &pbs_api_types::BackupDir,
+ to_snapshot: &pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
- let mut manifest_name = snapshot.full_path();
+ let mut manifest_name = to_snapshot.full_path();
manifest_name.push(MANIFEST_BLOB_NAME);
- let mut client_log_name = snapshot.full_path();
+ let mut client_log_name = to_snapshot.full_path();
client_log_name.push(CLIENT_LOG_BLOB_NAME);
let mut tmp_manifest_name = manifest_name.clone();
tmp_manifest_name.set_extension("tmp");
- let download_res = download_manifest(&reader, &tmp_manifest_name).await;
- let mut tmp_manifest_file = match download_res {
- Ok(manifest_file) => manifest_file,
- Err(err) => {
- match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- task_log!(
- worker,
- "skipping snapshot {} - vanished since start of sync",
- snapshot.dir(),
- );
- return Ok(());
- }
- _ => {
- bail!("HTTP error {code} - {message}");
- }
- },
- None => {
- return Err(err);
- }
- };
- }
- };
- let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
+ let tmp_manifest_blob;
+ if let Some(data) = params
+ .source
+ .load_file_into(
+ namespace,
+ from_snapshot,
+ MANIFEST_BLOB_NAME,
+ &tmp_manifest_name,
+ worker,
+ )
+ .await?
+ {
+ tmp_manifest_blob = data;
+ } else {
+ return Ok(());
+ }
if manifest_name.exists() {
let manifest_blob = proxmox_lang::try_block!({
@@ -418,8 +764,11 @@ async fn pull_snapshot(
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
- }
+ params
+ .source
+ .try_download_client_log(from_snapshot, &client_log_name, worker)
+ .await?;
+ };
task_log!(worker, "no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(()); // nothing changed
@@ -429,7 +778,7 @@ async fn pull_snapshot(
let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
for item in manifest.files() {
- let mut path = snapshot.full_path();
+ let mut path = to_snapshot.full_path();
path.push(&item.filename);
if path.exists() {
@@ -467,18 +816,12 @@ async fn pull_snapshot(
}
}
- let mut chunk_reader = RemoteChunkReader::new(
- reader.clone(),
- None,
- item.chunk_crypt_mode(),
- HashMap::new(),
- );
-
pull_single_archive(
worker,
- &reader,
- &mut chunk_reader,
- snapshot,
+ params,
+ namespace,
+ from_snapshot,
+ to_snapshot,
item,
downloaded_chunks.clone(),
)
@@ -490,10 +833,12 @@ async fn pull_snapshot(
}
if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
- }
-
- snapshot
+ params
+ .source
+ .try_download_client_log(from_snapshot, &client_log_name, worker)
+ .await?;
+ };
+ to_snapshot
.cleanup_unreferenced_files(&manifest)
.map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
@@ -501,37 +846,53 @@ async fn pull_snapshot(
}
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
-///
-/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
-/// pointing to the local datastore and target namespace.
async fn pull_snapshot_from(
worker: &WorkerTask,
- reader: Arc<BackupReader>,
- snapshot: &pbs_datastore::BackupDir,
+ params: &mut PullParameters,
+ namespace: &BackupNamespace,
+ from_snapshot: &pbs_api_types::BackupDir,
+ to_snapshot: &pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
- let (_path, is_new, _snap_lock) = snapshot
+ let (_path, is_new, _snap_lock) = to_snapshot
.datastore()
- .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
+ .create_locked_backup_dir(to_snapshot.backup_ns(), to_snapshot.as_ref())?;
if is_new {
- task_log!(worker, "sync snapshot {}", snapshot.dir());
+ task_log!(worker, "sync snapshot {}", to_snapshot.dir());
- if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
- if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
- snapshot.backup_ns(),
- snapshot.as_ref(),
+ if let Err(err) = pull_snapshot(
+ worker,
+ params,
+ namespace,
+ from_snapshot,
+ to_snapshot,
+ downloaded_chunks,
+ )
+ .await
+ {
+ if let Err(cleanup_err) = to_snapshot.datastore().remove_backup_dir(
+ to_snapshot.backup_ns(),
+ to_snapshot.as_ref(),
true,
) {
task_log!(worker, "cleanup error - {}", cleanup_err);
}
return Err(err);
}
- task_log!(worker, "sync snapshot {} done", snapshot.dir());
+ task_log!(worker, "sync snapshot {} done", to_snapshot.dir());
} else {
- task_log!(worker, "re-sync snapshot {}", snapshot.dir());
- pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
- task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
+ task_log!(worker, "re-sync snapshot {}", to_snapshot.dir());
+ pull_snapshot(
+ worker,
+ params,
+ namespace,
+ from_snapshot,
+ to_snapshot,
+ downloaded_chunks,
+ )
+ .await?;
+ task_log!(worker, "re-sync snapshot {} done", to_snapshot.dir());
}
Ok(())
@@ -587,7 +948,6 @@ impl std::fmt::Display for SkipInfo {
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
-/// -- Recreate client/BackupReader
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
@@ -600,101 +960,63 @@ impl std::fmt::Display for SkipInfo {
/// - local group owner is already checked by pull_store
async fn pull_group(
worker: &WorkerTask,
- client: &HttpClient,
- params: &PullParameters,
+ params: &mut PullParameters,
+ source_namespace: &BackupNamespace,
group: &pbs_api_types::BackupGroup,
- remote_ns: BackupNamespace,
progress: &mut StoreProgress,
) -> Result<(), Error> {
- let path = format!(
- "api2/json/admin/datastore/{}/snapshots",
- params.source.store()
- );
-
- let mut args = json!({
- "backup-type": group.ty,
- "backup-id": group.id,
- });
-
- if !remote_ns.is_root() {
- args["ns"] = serde_json::to_value(&remote_ns)?;
- }
-
- let target_ns = remote_ns.map_prefix(¶ms.remote_ns, ¶ms.ns)?;
-
- let mut result = client.get(&path, Some(args)).await?;
- let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
-
- list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
-
- client.login().await?; // make sure auth is complete
-
- let fingerprint = client.fingerprint();
-
- let last_sync = params.store.last_successful_backup(&target_ns, group)?;
-
- let mut remote_snapshots = std::collections::HashSet::new();
-
- // start with 65536 chunks (up to 256 GiB)
- let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
-
- progress.group_snapshots = list.len() as u64;
+ let target_ns = params.get_target_ns()?;
+ let mut source_snapshots = HashSet::new();
+ let last_sync = params
+ .target
+ .store
+ .last_successful_backup(&target_ns, group)?;
let mut skip_info = SkipInfo {
oldest: i64::MAX,
newest: i64::MIN,
count: 0,
};
- for (pos, item) in list.into_iter().enumerate() {
- let snapshot = item.backup;
-
- // in-progress backups can't be synced
- if item.size.is_none() {
- task_log!(
- worker,
- "skipping snapshot {} - in-progress backup",
- snapshot
- );
- continue;
- }
+ let mut list: Vec<BackupDir> = params
+ .source
+ .list_backup_dirs(source_namespace, group, worker)
+ .await?
+ .into_iter()
+ .filter(|dir| {
+ source_snapshots.insert(dir.time);
+ if let Some(last_sync_time) = last_sync {
+ if last_sync_time > dir.time {
+ skip_info.update(dir.time);
+ return false;
+ }
+ }
+ true
+ })
+ .collect();
- remote_snapshots.insert(snapshot.time);
+ list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
- if let Some(last_sync_time) = last_sync {
- if last_sync_time > snapshot.time {
- skip_info.update(snapshot.time);
- continue;
- }
- }
+ // start with 65536 chunks (up to 256 GiB)
+ let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
- // get updated auth_info (new tickets)
- let auth_info = client.login().await?;
-
- let options =
- HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
- .rate_limit(params.limit.clone());
-
- let new_client = HttpClient::new(
- params.source.host(),
- params.source.port(),
- params.source.auth_id(),
- options,
- )?;
-
- let reader = BackupReader::start(
- new_client,
- None,
- params.source.store(),
- &remote_ns,
- &snapshot,
- true,
- )
- .await?;
+ progress.group_snapshots = list.len() as u64;
- let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
+ for (pos, from_snapshot) in list.into_iter().enumerate() {
+ let to_snapshot = params
+ .target
+ .store
+ .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;
- let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+ let result = pull_snapshot_from(
+ worker,
+ params,
+ source_namespace,
+ &from_snapshot,
+ &to_snapshot,
+ downloaded_chunks.clone(),
+ )
+ .await;
progress.done_snapshots = pos as u64 + 1;
task_log!(worker, "percentage done: {}", progress);
@@ -703,11 +1025,14 @@ async fn pull_group(
}
if params.remove_vanished {
- let group = params.store.backup_group(target_ns.clone(), group.clone());
+ let group = params
+ .target
+ .store
+ .backup_group(target_ns.clone(), group.clone());
let local_list = group.list_backups()?;
for info in local_list {
let snapshot = info.backup_dir;
- if remote_snapshots.contains(&snapshot.backup_time()) {
+ if source_snapshots.contains(&snapshot.backup_time()) {
continue;
}
if snapshot.is_protected() {
@@ -720,6 +1045,7 @@ async fn pull_group(
}
task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
params
+ .target
.store
.remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
}
@@ -732,64 +1058,12 @@ async fn pull_group(
Ok(())
}
-// will modify params if switching to backwards mode for lack of NS support on remote end
-async fn query_namespaces(
- worker: &WorkerTask,
- client: &HttpClient,
- params: &mut PullParameters,
-) -> Result<Vec<BackupNamespace>, Error> {
- let path = format!(
- "api2/json/admin/datastore/{}/namespace",
- params.source.store()
- );
- let mut data = json!({});
- if let Some(max_depth) = params.max_depth {
- data["max-depth"] = json!(max_depth);
- }
-
- if !params.remote_ns.is_root() {
- data["parent"] = json!(params.remote_ns);
- }
-
- let mut result = match client.get(&path, Some(data)).await {
- Ok(res) => res,
- Err(err) => match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- if params.remote_ns.is_root() && params.max_depth.is_none() {
- task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
- task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
- params.max_depth = Some(0);
- } else {
- bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
- }
-
- return Ok(vec![params.remote_ns.clone()]);
- }
- _ => {
- bail!("Querying namespaces failed - HTTP error {code} - {message}");
- }
- },
- None => {
- bail!("Querying namespaces failed - {err}");
- }
- },
- };
-
- let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
-
- // parents first
- list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
-
- Ok(list.iter().map(|item| item.ns.clone()).collect())
-}
-
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
let mut created = false;
- let store_ns_str = print_store_and_ns(params.store.name(), ns);
+ let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
- if !ns.is_root() && !params.store.namespace_path(ns).exists() {
- check_ns_modification_privs(params.store.name(), ns, &params.owner)
+ if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
+ check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
.map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
let name = match ns.components().last() {
@@ -799,14 +1073,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
}
};
- if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
+ if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
}
created = true;
}
check_ns_privs(
- params.store.name(),
+ params.target.store.name(),
ns,
&params.owner,
PRIV_DATASTORE_BACKUP,
@@ -817,10 +1091,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
}
fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
- check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
+ check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
.map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
- params.store.remove_namespace_recursive(local_ns, true)
+ params
+ .target
+ .store
+ .remove_namespace_recursive(local_ns, true)
}
fn check_and_remove_vanished_ns(
@@ -834,14 +1111,15 @@ fn check_and_remove_vanished_ns(
// clamp like remote does so that we don't list more than we can ever have synced.
let max_depth = params
.max_depth
- .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
+ .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
let mut local_ns_list: Vec<BackupNamespace> = params
+ .target
.store
- .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
+ .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
.filter(|ns| {
let user_privs =
- user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
+ user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
})
.collect();
@@ -850,7 +1128,7 @@ fn check_and_remove_vanished_ns(
local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
for local_ns in local_ns_list {
- if local_ns == params.ns {
+ if local_ns == params.target.ns {
continue;
}
@@ -897,29 +1175,28 @@ fn check_and_remove_vanished_ns(
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
worker: &WorkerTask,
- client: &HttpClient,
mut params: PullParameters,
) -> Result<(), Error> {
// explicit create shared lock to prevent GC on newly created chunks
- let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
+ let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
let mut errors = false;
let old_max_depth = params.max_depth;
- let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
- vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
- } else {
- query_namespaces(worker, client, &mut params).await?
- };
+ let mut namespaces = params
+ .source
+ .list_namespaces(&mut params.max_depth, worker)
+ .await?;
errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
+ namespaces.sort_unstable_by(|a, b| a.name_len().cmp(&b.name_len()));
let (mut groups, mut snapshots) = (0, 0);
let mut synced_ns = HashSet::with_capacity(namespaces.len());
for namespace in namespaces {
- let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
+ let source_store_ns_str = params.source.print_store_and_ns();
- let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
- let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
+ let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
+ let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
task_log!(worker, "----");
task_log!(
@@ -947,7 +1224,7 @@ pub(crate) async fn pull_store(
}
}
- match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
+ match pull_ns(worker, &namespace, &mut params).await {
Ok((ns_progress, ns_errors)) => {
errors |= ns_errors;
@@ -968,7 +1245,7 @@ pub(crate) async fn pull_store(
task_log!(
worker,
"Encountered errors while syncing namespace {} - {}",
- namespace,
+ &namespace,
err,
);
}
@@ -1000,33 +1277,17 @@ pub(crate) async fn pull_store(
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
worker: &WorkerTask,
- client: &HttpClient,
- params: &PullParameters,
- source_ns: BackupNamespace,
- target_ns: BackupNamespace,
+ namespace: &BackupNamespace,
+ params: &mut PullParameters,
) -> Result<(StoreProgress, bool), Error> {
- let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
-
- let args = if !source_ns.is_root() {
- Some(json!({
- "ns": source_ns,
- }))
- } else {
- None
- };
-
- let mut result = client
- .get(&path, args)
- .await
- .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
-
- let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+ let mut list: Vec<pbs_api_types::BackupGroup> =
+ params.source.list_groups(namespace, &params.owner).await?;
let total_count = list.len();
list.sort_unstable_by(|a, b| {
- let type_order = a.backup.ty.cmp(&b.backup.ty);
+ let type_order = a.ty.cmp(&b.ty);
if type_order == std::cmp::Ordering::Equal {
- a.backup.id.cmp(&b.backup.id)
+ a.id.cmp(&b.id)
} else {
type_order
}
@@ -1036,9 +1297,6 @@ pub(crate) async fn pull_ns(
filters.iter().any(|filter| group.matches(filter))
};
- // Get groups with target NS set
- let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
-
let list = if let Some(ref group_filter) = &params.group_filter {
let unfiltered_count = list.len();
let list: Vec<pbs_api_types::BackupGroup> = list
@@ -1066,6 +1324,7 @@ pub(crate) async fn pull_ns(
let mut progress = StoreProgress::new(list.len() as u64);
+ let target_ns = params.get_target_ns()?;
for (done, group) in list.into_iter().enumerate() {
progress.done_groups = done as u64;
progress.done_snapshots = 0;
@@ -1073,6 +1332,7 @@ pub(crate) async fn pull_ns(
let (owner, _lock_guard) =
match params
+ .target
.store
.create_locked_backup_group(&target_ns, &group, &params.owner)
{
@@ -1085,6 +1345,7 @@ pub(crate) async fn pull_ns(
err
);
errors = true; // do not stop here, instead continue
+ task_log!(worker, "create_locked_backup_group failed");
continue;
}
};
@@ -1100,15 +1361,7 @@ pub(crate) async fn pull_ns(
owner
);
errors = true; // do not stop here, instead continue
- } else if let Err(err) = pull_group(
- worker,
- client,
- params,
- &group,
- source_ns.clone(),
- &mut progress,
- )
- .await
+ } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
{
task_log!(worker, "sync group {} failed - {}", &group, err,);
errors = true; // do not stop here, instead continue
@@ -1117,13 +1370,13 @@ pub(crate) async fn pull_ns(
if params.remove_vanished {
let result: Result<(), Error> = proxmox_lang::try_block!({
- for local_group in params.store.iter_backup_groups(target_ns.clone())? {
+ for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
let local_group = local_group?;
let local_group = local_group.group();
if new_groups.contains(local_group) {
continue;
}
- let owner = params.store.get_owner(&target_ns, local_group)?;
+ let owner = params.target.store.get_owner(&target_ns, local_group)?;
if check_backup_owner(&owner, &params.owner).is_err() {
continue;
}
@@ -1133,7 +1386,11 @@ pub(crate) async fn pull_ns(
}
}
task_log!(worker, "delete vanished group '{local_group}'",);
- match params.store.remove_backup_group(&target_ns, local_group) {
+ match params
+ .target
+ .store
+ .remove_backup_group(&target_ns, local_group)
+ {
Ok(true) => {}
Ok(false) => {
task_log!(
--
2.30.2
More information about the pbs-devel
mailing list