[pbs-devel] [PATCH proxmox-backup v4 5/6] pull: refactor pulling from a datastore
Hannes Laimer
h.laimer at proxmox.com
Fri Sep 29 14:49:00 CEST 2023
... making the pull logic independent from the actual source
using two traits.
Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
Cargo.toml | 2 +
pbs-datastore/src/read_chunk.rs | 2 +-
src/api2/config/remote.rs | 14 +-
src/api2/pull.rs | 31 +-
src/server/pull.rs | 936 ++++++++++++++++++--------------
5 files changed, 564 insertions(+), 421 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 6e848843..570e1673 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" }
# regular crates
anyhow = "1.0"
+async-trait = "0.1.56"
apt-pkg-native = "0.3.2"
base64 = "0.13"
bitflags = "1.2.1"
@@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] }
[dependencies]
anyhow.workspace = true
+async-trait.workspace = true
apt-pkg-native.workspace = true
base64.workspace = true
bitflags.workspace = true
diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
index c04a7431..29ee2d4c 100644
--- a/pbs-datastore/src/read_chunk.rs
+++ b/pbs-datastore/src/read_chunk.rs
@@ -14,7 +14,7 @@ pub trait ReadChunk {
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
}
-pub trait AsyncReadChunk: Send {
+pub trait AsyncReadChunk: Send + Sync {
/// Returns the encoded chunk data
fn read_raw_chunk<'a>(
&'a self,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 307cf3cd..2511c5d5 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
Ok(())
}
-/// Helper to get client for remote.cfg entry
-pub async fn remote_client(
+/// Helper to get client for remote.cfg entry without login, just config
+pub fn remote_client_config(
remote: &Remote,
limit: Option<RateLimitConfig>,
) -> Result<HttpClient, Error> {
@@ -320,6 +320,16 @@ pub async fn remote_client(
&remote.config.auth_id,
options,
)?;
+
+ Ok(client)
+}
+
+/// Helper to get client for remote.cfg entry
+pub async fn remote_client(
+ remote: &Remote,
+ limit: Option<RateLimitConfig>,
+) -> Result<HttpClient, Error> {
+ let client = remote_client_config(remote, limit)?;
let _auth_info = client
.login() // make sure we can auth
.await
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 9ed83046..8f1aad43 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -8,7 +8,7 @@ use proxmox_sys::task_log;
use pbs_api_types::{
Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
- GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
+ GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
TRANSFER_LAST_SCHEMA,
};
@@ -75,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
PullParameters::new(
&sync_job.store,
sync_job.ns.clone().unwrap_or_default(),
- sync_job.remote.as_deref().unwrap_or("local"),
+ sync_job.remote.as_deref(),
&sync_job.remote_store,
sync_job.remote_ns.clone().unwrap_or_default(),
sync_job
@@ -131,7 +131,6 @@ pub fn do_sync_job(
let worker_future = async move {
let pull_params = PullParameters::try_from(&sync_job)?;
- let client = pull_params.client().await?;
task_log!(worker, "Starting datastore sync job '{}'", job_id);
if let Some(event_str) = schedule {
@@ -148,24 +147,7 @@ pub fn do_sync_job(
sync_job.remote_store,
);
- if sync_job.remote.is_some() {
- pull_store(&worker, &client, pull_params).await?;
- } else {
- if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
- if target_ns.path().starts_with(source_ns.path())
- && sync_job.store == sync_job.remote_store
- && sync_job.max_depth.map_or(true, |sync_depth| {
- target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
- }) {
- task_log!(
- worker,
- "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
- );
- }
- } else {
- pull_store(&worker, &client, pull_params).await?;
- }
- }
+ pull_store(&worker, pull_params).await?;
task_log!(worker, "sync job '{}' end", &job_id);
@@ -294,7 +276,7 @@ async fn pull(
let pull_params = PullParameters::new(
&store,
ns,
- remote.as_deref().unwrap_or("local"),
+ remote.as_deref(),
&remote_store,
remote_ns.unwrap_or_default(),
auth_id.clone(),
@@ -304,7 +286,6 @@ async fn pull(
limit,
transfer_last,
)?;
- let client = pull_params.client().await?;
// fixme: set to_stdout to false?
// FIXME: add namespace to worker id?
@@ -322,7 +303,7 @@ async fn pull(
remote_store,
);
- let pull_future = pull_store(&worker, &client, pull_params);
+ let pull_future = pull_store(&worker, pull_params);
(select! {
success = pull_future.fuse() => success,
abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
@@ -337,4 +318,4 @@ async fn pull(
Ok(upid_str)
}
-pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
\ No newline at end of file
+pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
diff --git a/src/server/pull.rs b/src/server/pull.rs
index e55452d1..ff3e6d0a 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,28 +1,26 @@
//! Sync datastore from remote server
use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::Seek;
+use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
use http::StatusCode;
-use pbs_config::CachedUserInfo;
-use serde_json::json;
-
+use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
-use proxmox_sys::task_log;
+use proxmox_sys::{task_log, task_warn};
+use serde_json::json;
use pbs_api_types::{
- print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
- Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
+ print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+ GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
};
-
-use pbs_client::{
- BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
-};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
@@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
+use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;
-use proxmox_rest_server::WorkerTask;
use crate::backup::{check_ns_modification_privs, check_ns_privs};
use crate::tools::parallel_handler::ParallelHandler;
-/// Parameters for a pull operation.
-pub(crate) struct PullParameters {
- /// Remote that is pulled from
- remote: Remote,
- /// Full specification of remote datastore
- source: BackupRepository,
- /// Local store that is pulled into
+struct RemoteReader {
+ backup_reader: Arc<BackupReader>,
+ dir: BackupDir,
+}
+
+pub(crate) struct PullTarget {
store: Arc<DataStore>,
- /// Remote namespace
- remote_ns: BackupNamespace,
- /// Local namespace (anchor)
ns: BackupNamespace,
+}
+
+pub(crate) struct RemoteSource {
+ repo: BackupRepository,
+ ns: BackupNamespace,
+ client: HttpClient,
+}
+
+#[async_trait::async_trait]
+/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
+/// The trait includes methods for listing namespaces, groups, and backup directories,
+/// as well as retrieving a reader for reading data from the source
+trait PullSource: Send + Sync {
+ /// Lists namespaces from the source.
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupNamespace>, Error>;
+
+ /// Lists groups within a specific namespace from the source.
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error>;
+
+ /// Lists backup directories for a specific group within a specific namespace from the source.
+ async fn list_backup_dirs(
+ &self,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupDir>, Error>;
+ fn get_ns(&self) -> BackupNamespace;
+ fn print_store_and_ns(&self) -> String;
+
+ /// Returns a reader for reading data from a specific backup directory.
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn PullReader>, Error>;
+}
+
+#[async_trait::async_trait]
+impl PullSource for RemoteSource {
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupNamespace>, Error> {
+ if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
+ return Ok(vec![self.ns.clone()]);
+ }
+
+ let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
+ let mut data = json!({});
+ if let Some(max_depth) = max_depth {
+ data["max-depth"] = json!(max_depth);
+ }
+
+ if !self.ns.is_root() {
+ data["parent"] = json!(self.ns);
+ }
+ self.client.login().await?;
+
+ let mut result = match self.client.get(&path, Some(data)).await {
+ Ok(res) => res,
+ Err(err) => match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match code {
+ &StatusCode::NOT_FOUND => {
+ if self.ns.is_root() && max_depth.is_none() {
+ task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+ task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+ max_depth.replace(0);
+ } else {
+ bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+ }
+
+ return Ok(vec![self.ns.clone()]);
+ }
+ _ => {
+ bail!("Querying namespaces failed - HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ bail!("Querying namespaces failed - {err}");
+ }
+ },
+ };
+
+ let list: Vec<BackupNamespace> =
+ serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+ .into_iter()
+ .map(|list_item| list_item.ns)
+ .collect();
+
+ Ok(list)
+ }
+
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ _owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error> {
+ let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
+
+ let args = if !namespace.is_root() {
+ Some(json!({ "ns": namespace.clone() }))
+ } else {
+ None
+ };
+
+ self.client.login().await?;
+ let mut result =
+ self.client.get(&path, args).await.map_err(|err| {
+ format_err!("Failed to retrieve backup groups from remote - {}", err)
+ })?;
+
+ Ok(
+ serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
+ .map_err(Error::from)?
+ .into_iter()
+ .map(|item| item.backup)
+ .collect::<Vec<BackupGroup>>(),
+ )
+ }
+
+ async fn list_backup_dirs(
+ &self,
+ _namespace: &BackupNamespace,
+ group: &BackupGroup,
+ worker: &WorkerTask,
+ ) -> Result<Vec<BackupDir>, Error> {
+ let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
+
+ let mut args = json!({
+ "backup-type": group.ty,
+ "backup-id": group.id,
+ });
+
+ if !self.ns.is_root() {
+ args["ns"] = serde_json::to_value(&self.ns)?;
+ }
+
+ self.client.login().await?;
+
+ let mut result = self.client.get(&path, Some(args)).await?;
+ let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+ Ok(snapshot_list
+ .into_iter()
+ .filter_map(|item: SnapshotListItem| {
+ let snapshot = item.backup;
+ // in-progress backups can't be synced
+ if item.size.is_none() {
+ task_log!(
+ worker,
+ "skipping snapshot {} - in-progress backup",
+ snapshot
+ );
+ return None;
+ }
+
+ Some(snapshot)
+ })
+ .collect::<Vec<BackupDir>>())
+ }
+
+ fn get_ns(&self) -> BackupNamespace {
+ self.ns.clone()
+ }
+
+ fn print_store_and_ns(&self) -> String {
+ print_store_and_ns(self.repo.store(), &self.ns)
+ }
+
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn PullReader>, Error> {
+ let backup_reader =
+ BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
+ Ok(Arc::new(RemoteReader {
+ backup_reader,
+ dir: dir.clone(),
+ }))
+ }
+}
+
+#[async_trait::async_trait]
+/// `PullReader` is a trait that provides an interface for reading data from a source.
+/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
+trait PullReader: Send + Sync {
+ /// Returns a chunk reader with the specified encryption mode.
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+
+ /// Asynchronously loads a file from the source into a local file.
+ /// `filename` is the name of the file to load from the source.
+ /// `into` is the path of the local file to load the source file into.
+ async fn load_file_into(
+ &self,
+ filename: &str,
+ into: &Path,
+ worker: &WorkerTask,
+ ) -> Result<Option<DataBlob>, Error>;
+
+ /// Tries to download the client log from the source and save it into a local file.
+ async fn try_download_client_log(
+ &self,
+ to_path: &Path,
+ worker: &WorkerTask,
+ ) -> Result<(), Error>;
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
+}
+
+#[async_trait::async_trait]
+impl PullReader for RemoteReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(RemoteChunkReader::new(
+ self.backup_reader.clone(),
+ None,
+ crypt_mode,
+ HashMap::new(),
+ ))
+ }
+
+ async fn load_file_into(
+ &self,
+ filename: &str,
+ into: &Path,
+ worker: &WorkerTask,
+ ) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
+ if let Err(err) = download_result {
+ match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match *code {
+ StatusCode::NOT_FOUND => {
+ task_log!(
+ worker,
+ "skipping snapshot {} - vanished since start of sync",
+ &self.dir,
+ );
+ return Ok(None);
+ }
+ _ => {
+ bail!("HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ return Err(err);
+ }
+ };
+ };
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(
+ &self,
+ to_path: &Path,
+ worker: &WorkerTask,
+ ) -> Result<(), Error> {
+ let mut tmp_path = to_path.to_owned();
+ tmp_path.set_extension("tmp");
+
+ let tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .read(true)
+ .open(&tmp_path)?;
+
+ // Note: be silent if there is no log - only log successful download
+ if let Ok(()) = self
+ .backup_reader
+ .download(CLIENT_LOG_BLOB_NAME, tmpfile)
+ .await
+ {
+ if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+ bail!("Atomic rename file {:?} failed - {}", to_path, err);
+ }
+ task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
+ }
+
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
+ false
+ }
+}
+
+/// Parameters for a pull operation.
+pub(crate) struct PullParameters {
+ /// Where data is pulled from
+ source: Arc<dyn PullSource>,
+ /// Where data should be pulled into
+ target: PullTarget,
/// Owner of synced groups (needs to match local owner of pre-existing groups)
owner: Authid,
/// Whether to remove groups which exist locally, but not on the remote end
@@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
max_depth: Option<usize>,
/// Filters for reducing the pull scope
group_filter: Option<Vec<GroupFilter>>,
- /// Rate limits for all transfers from `remote`
- limit: RateLimitConfig,
/// How many snapshots should be transferred at most (taking the newest N snapshots)
transfer_last: Option<usize>,
}
impl PullParameters {
/// Creates a new instance of `PullParameters`.
- ///
- /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
- /// [BackupRepository] with `remote_store`.
- #[allow(clippy::too_many_arguments)]
pub(crate) fn new(
store: &str,
ns: BackupNamespace,
- remote: &str,
+ remote: Option<&str>,
remote_store: &str,
remote_ns: BackupNamespace,
owner: Authid,
@@ -82,49 +376,51 @@ impl PullParameters {
limit: RateLimitConfig,
transfer_last: Option<usize>,
) -> Result<Self, Error> {
- let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
-
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
remote_ns.check_max_depth(max_depth)?;
- }
-
- let (remote_config, _digest) = pbs_config::remote::config()?;
- let remote: Remote = remote_config.lookup("remote", remote)?;
-
+ };
let remove_vanished = remove_vanished.unwrap_or(false);
- let source = BackupRepository::new(
- Some(remote.config.auth_id.clone()),
- Some(remote.config.host.clone()),
- remote.config.port,
- remote_store.to_string(),
- );
+ let source: Arc<dyn PullSource> = if let Some(remote) = remote {
+ let (remote_config, _digest) = pbs_config::remote::config()?;
+ let remote: Remote = remote_config.lookup("remote", remote)?;
- Ok(Self {
- remote,
- remote_ns,
+ let repo = BackupRepository::new(
+ Some(remote.config.auth_id.clone()),
+ Some(remote.config.host.clone()),
+ remote.config.port,
+ remote_store.to_string(),
+ );
+ let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
+ Arc::new(RemoteSource {
+ repo,
+ ns: remote_ns,
+ client,
+ })
+ } else {
+ bail!("local sync not implemented yet")
+ };
+ let target = PullTarget {
+ store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
ns,
+ };
+
+ Ok(Self {
source,
- store,
+ target,
owner,
remove_vanished,
max_depth,
group_filter,
- limit,
transfer_last,
})
}
-
- /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
- pub async fn client(&self) -> Result<HttpClient, Error> {
- crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
- }
}
async fn pull_index_chunks<I: IndexFile>(
worker: &WorkerTask,
- chunk_reader: RemoteChunkReader,
+ chunk_reader: Arc<dyn AsyncReadChunk>,
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -215,26 +511,6 @@ async fn pull_index_chunks<I: IndexFile>(
Ok(())
}
-async fn download_manifest(
- reader: &BackupReader,
- filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
- let mut tmp_manifest_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(filename)?;
-
- reader
- .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
- .await?;
-
- tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
- Ok(tmp_manifest_file)
-}
-
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
if size != info.size {
bail!(
@@ -255,17 +531,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// Pulls a single file referenced by a manifest.
///
/// Pulling an archive consists of the following steps:
-/// - Create tmp file for archive
-/// - Download archive file into tmp file
-/// - Verify tmp file checksum
+/// - Load archive file into tmp file
+/// -- Load file into tmp file
+/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
-async fn pull_single_archive(
- worker: &WorkerTask,
- reader: &BackupReader,
- chunk_reader: &mut RemoteChunkReader,
- snapshot: &pbs_datastore::BackupDir,
- archive_info: &FileInfo,
+async fn pull_single_archive<'a>(
+ worker: &'a WorkerTask,
+ reader: Arc<dyn PullReader + 'a>,
+ snapshot: &'a pbs_datastore::BackupDir,
+ archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let archive_name = &archive_info.filename;
@@ -277,13 +552,11 @@ async fn pull_single_archive(
task_log!(worker, "sync archive {}", archive_name);
- let mut tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
+ reader
+ .load_file_into(archive_name, &tmp_path, worker)
+ .await?;
- reader.download(archive_name, &mut tmpfile).await?;
+ let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
match archive_type(archive_name)? {
ArchiveType::DynamicIndex => {
@@ -293,14 +566,18 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- snapshot.datastore().clone(),
- index,
- downloaded_chunks,
- )
- .await?;
+ if reader.skip_chunk_sync(snapshot.datastore().name()) {
+ task_log!(worker, "skipping chunk sync for same datastore");
+ } else {
+ pull_index_chunks(
+ worker,
+ reader.chunk_reader(archive_info.crypt_mode),
+ snapshot.datastore().clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -309,17 +586,21 @@ async fn pull_single_archive(
let (csum, size) = index.compute_csum();
verify_archive(archive_info, &csum, size)?;
- pull_index_chunks(
- worker,
- chunk_reader.clone(),
- snapshot.datastore().clone(),
- index,
- downloaded_chunks,
- )
- .await?;
+ if reader.skip_chunk_sync(snapshot.datastore().name()) {
+ task_log!(worker, "skipping chunk sync for same datastore");
+ } else {
+ pull_index_chunks(
+ worker,
+ reader.chunk_reader(archive_info.crypt_mode),
+ snapshot.datastore().clone(),
+ index,
+ downloaded_chunks,
+ )
+ .await?;
+ }
}
ArchiveType::Blob => {
- tmpfile.seek(SeekFrom::Start(0))?;
+ tmpfile.rewind()?;
let (csum, size) = sha256(&mut tmpfile)?;
verify_archive(archive_info, &csum, size)?;
}
@@ -330,33 +611,6 @@ async fn pull_single_archive(
Ok(())
}
-// Note: The client.log.blob is uploaded after the backup, so it is
-// not mentioned in the manifest.
-async fn try_client_log_download(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- path: &std::path::Path,
-) -> Result<(), Error> {
- let mut tmp_path = path.to_owned();
- tmp_path.set_extension("tmp");
-
- let tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
-
- // Note: be silent if there is no log - only log successful download
- if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
- if let Err(err) = std::fs::rename(&tmp_path, path) {
- bail!("Atomic rename file {:?} failed - {}", path, err);
- }
- task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
- }
-
- Ok(())
-}
-
/// Actual implementation of pulling a snapshot.
///
/// Pulling a snapshot consists of the following steps:
@@ -366,10 +620,10 @@ async fn try_client_log_download(
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
-async fn pull_snapshot(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- snapshot: &pbs_datastore::BackupDir,
+async fn pull_snapshot<'a>(
+ worker: &'a WorkerTask,
+ reader: Arc<dyn PullReader + 'a>,
+ snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let mut manifest_name = snapshot.full_path();
@@ -380,32 +634,15 @@ async fn pull_snapshot(
let mut tmp_manifest_name = manifest_name.clone();
tmp_manifest_name.set_extension("tmp");
-
- let download_res = download_manifest(&reader, &tmp_manifest_name).await;
- let mut tmp_manifest_file = match download_res {
- Ok(manifest_file) => manifest_file,
- Err(err) => {
- match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- task_log!(
- worker,
- "skipping snapshot {} - vanished since start of sync",
- snapshot.dir(),
- );
- return Ok(());
- }
- _ => {
- bail!("HTTP error {code} - {message}");
- }
- },
- None => {
- return Err(err);
- }
- };
- }
- };
- let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
+ let tmp_manifest_blob;
+ if let Some(data) = reader
+ .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
+ .await?
+ {
+ tmp_manifest_blob = data;
+ } else {
+ return Ok(());
+ }
if manifest_name.exists() {
let manifest_blob = proxmox_lang::try_block!({
@@ -422,8 +659,10 @@ async fn pull_snapshot(
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
- }
+ reader
+ .try_download_client_log(&client_log_name, worker)
+ .await?;
+ };
task_log!(worker, "no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(()); // nothing changed
@@ -471,17 +710,9 @@ async fn pull_snapshot(
}
}
- let mut chunk_reader = RemoteChunkReader::new(
- reader.clone(),
- None,
- item.chunk_crypt_mode(),
- HashMap::new(),
- );
-
pull_single_archive(
worker,
- &reader,
- &mut chunk_reader,
+ reader.clone(),
snapshot,
item,
downloaded_chunks.clone(),
@@ -494,9 +725,10 @@ async fn pull_snapshot(
}
if !client_log_name.exists() {
- try_client_log_download(worker, reader, &client_log_name).await?;
- }
-
+ reader
+ .try_download_client_log(&client_log_name, worker)
+ .await?;
+ };
snapshot
.cleanup_unreferenced_files(&manifest)
.map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
@@ -506,12 +738,12 @@ async fn pull_snapshot(
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
///
-/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
-/// pointing to the local datastore and target namespace.
-async fn pull_snapshot_from(
- worker: &WorkerTask,
- reader: Arc<BackupReader>,
- snapshot: &pbs_datastore::BackupDir,
+/// The `reader` is configured to read from the source backup directory, while the
+/// `snapshot` is pointing to the local datastore and target namespace.
+async fn pull_snapshot_from<'a>(
+ worker: &'a WorkerTask,
+ reader: Arc<dyn PullReader + 'a>,
+ snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<(), Error> {
let (_path, is_new, _snap_lock) = snapshot
@@ -626,11 +858,10 @@ impl std::fmt::Display for SkipInfo {
/// - Sort by snapshot time
/// - Get last snapshot timestamp on local datastore
/// - Iterate over list of snapshots
-/// -- Recreate client/BackupReader
/// -- pull snapshot, unless it's not finished yet or older than last local snapshot
/// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
///
-/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
+/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
/// remote when querying snapshots. This allows us to interact with old remotes that don't have
/// namespace support yet.
///
@@ -639,117 +870,79 @@ impl std::fmt::Display for SkipInfo {
/// - local group owner is already checked by pull_store
async fn pull_group(
worker: &WorkerTask,
- client: &HttpClient,
params: &PullParameters,
- group: &pbs_api_types::BackupGroup,
- remote_ns: BackupNamespace,
+ source_namespace: &BackupNamespace,
+ group: &BackupGroup,
progress: &mut StoreProgress,
) -> Result<(), Error> {
- task_log!(worker, "sync group {}", group);
-
- let path = format!(
- "api2/json/admin/datastore/{}/snapshots",
- params.source.store()
- );
-
- let mut args = json!({
- "backup-type": group.ty,
- "backup-id": group.id,
- });
-
- if !remote_ns.is_root() {
- args["ns"] = serde_json::to_value(&remote_ns)?;
- }
-
- let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
-
- let mut result = client.get(&path, Some(args)).await?;
- let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
-
- list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
-
- client.login().await?; // make sure auth is complete
-
- let fingerprint = client.fingerprint();
-
- let last_sync = params.store.last_successful_backup(&target_ns, group)?;
- let last_sync_time = last_sync.unwrap_or(i64::MIN);
-
- let mut remote_snapshots = std::collections::HashSet::new();
-
- // start with 65536 chunks (up to 256 GiB)
- let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
-
- progress.group_snapshots = list.len() as u64;
-
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
- let total_amount = list.len();
+ let mut raw_list: Vec<BackupDir> = params
+ .source
+ .list_backup_dirs(source_namespace, group, worker)
+ .await?;
+ raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
+
+ let total_amount = raw_list.len();
let cutoff = params
.transfer_last
.map(|count| total_amount.saturating_sub(count))
.unwrap_or_default();
- for (pos, item) in list.into_iter().enumerate() {
- let snapshot = item.backup;
-
- // in-progress backups can't be synced
- if item.size.is_none() {
- task_log!(
- worker,
- "skipping snapshot {} - in-progress backup",
- snapshot
- );
- continue;
- }
+ let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
- remote_snapshots.insert(snapshot.time);
-
- if last_sync_time > snapshot.time {
- already_synced_skip_info.update(snapshot.time);
- continue;
- } else if already_synced_skip_info.count > 0 {
- task_log!(worker, "{}", already_synced_skip_info);
- already_synced_skip_info.reset();
- }
-
- if pos < cutoff && last_sync_time != snapshot.time {
- transfer_last_skip_info.update(snapshot.time);
- continue;
- } else if transfer_last_skip_info.count > 0 {
- task_log!(worker, "{}", transfer_last_skip_info);
- transfer_last_skip_info.reset();
- }
-
- // get updated auth_info (new tickets)
- let auth_info = client.login().await?;
+ let mut source_snapshots = HashSet::new();
+ let last_sync_time = params
+ .target
+ .store
+ .last_successful_backup(&target_ns, group)?
+ .unwrap_or(i64::MIN);
+
+ let list: Vec<BackupDir> = raw_list
+ .into_iter()
+ .enumerate()
+ .filter(|&(pos, ref dir)| {
+ source_snapshots.insert(dir.time);
+ if last_sync_time > dir.time {
+ already_synced_skip_info.update(dir.time);
+ return false;
+ } else if already_synced_skip_info.count > 0 {
+ task_log!(worker, "{}", already_synced_skip_info);
+ already_synced_skip_info.reset();
+ return true;
+ }
- let options =
- HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
- .rate_limit(params.limit.clone());
+ if pos < cutoff && last_sync_time != dir.time {
+ transfer_last_skip_info.update(dir.time);
+ return false;
+ } else if transfer_last_skip_info.count > 0 {
+ task_log!(worker, "{}", transfer_last_skip_info);
+ transfer_last_skip_info.reset();
+ }
+ true
+ })
+ .map(|(_, dir)| dir)
+ .collect();
- let new_client = HttpClient::new(
- params.source.host(),
- params.source.port(),
- params.source.auth_id(),
- options,
- )?;
+ // start with 65536 chunks (up to 256 GiB)
+ let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
- let reader = BackupReader::start(
- &new_client,
- None,
- params.source.store(),
- &remote_ns,
- &snapshot,
- true,
- )
- .await?;
+ progress.group_snapshots = list.len() as u64;
- let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
+ for (pos, from_snapshot) in list.into_iter().enumerate() {
+ let to_snapshot = params
+ .target
+ .store
+ .backup_dir(target_ns.clone(), from_snapshot.clone())?;
- let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+ let reader = params
+ .source
+ .reader(source_namespace, &from_snapshot)
+ .await?;
+ let result =
+ pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
progress.done_snapshots = pos as u64 + 1;
task_log!(worker, "percentage done: {}", progress);
@@ -758,11 +951,14 @@ async fn pull_group(
}
if params.remove_vanished {
- let group = params.store.backup_group(target_ns.clone(), group.clone());
+ let group = params
+ .target
+ .store
+ .backup_group(target_ns.clone(), group.clone());
let local_list = group.list_backups()?;
for info in local_list {
let snapshot = info.backup_dir;
- if remote_snapshots.contains(&snapshot.backup_time()) {
+ if source_snapshots.contains(&snapshot.backup_time()) {
continue;
}
if snapshot.is_protected() {
@@ -775,6 +971,7 @@ async fn pull_group(
}
task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
params
+ .target
.store
.remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
}
@@ -783,64 +980,12 @@ async fn pull_group(
Ok(())
}
-// will modify params if switching to backwards mode for lack of NS support on remote end
-async fn query_namespaces(
- worker: &WorkerTask,
- client: &HttpClient,
- params: &mut PullParameters,
-) -> Result<Vec<BackupNamespace>, Error> {
- let path = format!(
- "api2/json/admin/datastore/{}/namespace",
- params.source.store()
- );
- let mut data = json!({});
- if let Some(max_depth) = params.max_depth {
- data["max-depth"] = json!(max_depth);
- }
-
- if !params.remote_ns.is_root() {
- data["parent"] = json!(params.remote_ns);
- }
-
- let mut result = match client.get(&path, Some(data)).await {
- Ok(res) => res,
- Err(err) => match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- if params.remote_ns.is_root() && params.max_depth.is_none() {
- task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
- task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
- params.max_depth = Some(0);
- } else {
- bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
- }
-
- return Ok(vec![params.remote_ns.clone()]);
- }
- _ => {
- bail!("Querying namespaces failed - HTTP error {code} - {message}");
- }
- },
- None => {
- bail!("Querying namespaces failed - {err}");
- }
- },
- };
-
- let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
-
- // parents first
- list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
-
- Ok(list.iter().map(|item| item.ns.clone()).collect())
-}
-
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
let mut created = false;
- let store_ns_str = print_store_and_ns(params.store.name(), ns);
+ let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
- if !ns.is_root() && !params.store.namespace_path(ns).exists() {
- check_ns_modification_privs(params.store.name(), ns, &params.owner)
+ if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
+ check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
.map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
let name = match ns.components().last() {
@@ -850,14 +995,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
}
};
- if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
+ if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
}
created = true;
}
check_ns_privs(
- params.store.name(),
+ params.target.store.name(),
ns,
 &params.owner,
PRIV_DATASTORE_BACKUP,
@@ -868,10 +1013,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
}
fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
- check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
+ check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
.map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
- params.store.remove_namespace_recursive(local_ns, true)
+ params
+ .target
+ .store
+ .remove_namespace_recursive(local_ns, true)
}
fn check_and_remove_vanished_ns(
@@ -885,14 +1033,15 @@ fn check_and_remove_vanished_ns(
// clamp like remote does so that we don't list more than we can ever have synced.
let max_depth = params
.max_depth
- .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
+ .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
let mut local_ns_list: Vec<BackupNamespace> = params
+ .target
.store
- .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
+ .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
.filter(|ns| {
let user_privs =
- user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
+ user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
})
.collect();
@@ -901,7 +1050,7 @@ fn check_and_remove_vanished_ns(
local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
for local_ns in local_ns_list {
- if local_ns == params.ns {
+ if local_ns == params.target.ns {
continue;
}
@@ -948,29 +1097,49 @@ fn check_and_remove_vanished_ns(
/// - access to sub-NS checked here
pub(crate) async fn pull_store(
worker: &WorkerTask,
- client: &HttpClient,
mut params: PullParameters,
) -> Result<(), Error> {
// explicit create shared lock to prevent GC on newly created chunks
- let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
+ let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
let mut errors = false;
let old_max_depth = params.max_depth;
- let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
- vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
+ let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
+ vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
} else {
- query_namespaces(worker, client, &mut params).await?
+ params
+ .source
+ .list_namespaces(&mut params.max_depth, worker)
+ .await?
};
+
+ let ns_layers_to_be_pulled = namespaces
+ .iter()
+ .map(BackupNamespace::depth)
+ .max()
+ .map_or(0, |v| v - params.source.get_ns().depth());
+ let target_depth = params.target.ns.depth();
+
+ if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
+ bail!(
+ "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
+ ns_layers_to_be_pulled,
+ target_depth,
+ MAX_NAMESPACE_DEPTH
+ );
+ }
+
errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
+ namespaces.sort_unstable_by_key(|a| a.name_len());
let (mut groups, mut snapshots) = (0, 0);
let mut synced_ns = HashSet::with_capacity(namespaces.len());
for namespace in namespaces {
- let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
+ let source_store_ns_str = params.source.print_store_and_ns();
- let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
- let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
+ let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
+ let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
task_log!(worker, "----");
task_log!(
@@ -998,7 +1167,7 @@ pub(crate) async fn pull_store(
}
}
- match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
+ match pull_ns(worker, &namespace, &mut params).await {
Ok((ns_progress, ns_errors)) => {
errors |= ns_errors;
@@ -1019,7 +1188,7 @@ pub(crate) async fn pull_store(
task_log!(
worker,
"Encountered errors while syncing namespace {} - {}",
- namespace,
+ &namespace,
err,
);
}
@@ -1051,48 +1220,28 @@ pub(crate) async fn pull_store(
/// - owner check for vanished groups done here
pub(crate) async fn pull_ns(
worker: &WorkerTask,
- client: &HttpClient,
- params: &PullParameters,
- source_ns: BackupNamespace,
- target_ns: BackupNamespace,
+ namespace: &BackupNamespace,
+ params: &mut PullParameters,
) -> Result<(StoreProgress, bool), Error> {
- let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
-
- let args = if !source_ns.is_root() {
- Some(json!({
- "ns": source_ns,
- }))
- } else {
- None
- };
-
- let mut result = client
- .get(&path, args)
- .await
- .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
-
- let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+ let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
let total_count = list.len();
list.sort_unstable_by(|a, b| {
- let type_order = a.backup.ty.cmp(&b.backup.ty);
+ let type_order = a.ty.cmp(&b.ty);
if type_order == std::cmp::Ordering::Equal {
- a.backup.id.cmp(&b.backup.id)
+ a.id.cmp(&b.id)
} else {
type_order
}
});
- let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
+ let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
filters.iter().any(|filter| group.matches(filter))
};
- // Get groups with target NS set
- let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
-
 let list = if let Some(ref group_filter) = &params.group_filter {
let unfiltered_count = list.len();
- let list: Vec<pbs_api_types::BackupGroup> = list
+ let list: Vec<BackupGroup> = list
.into_iter()
.filter(|group| apply_filters(group, group_filter))
.collect();
@@ -1110,13 +1259,15 @@ pub(crate) async fn pull_ns(
let mut errors = false;
- let mut new_groups = std::collections::HashSet::new();
+ let mut new_groups = HashSet::new();
for group in list.iter() {
new_groups.insert(group.clone());
}
let mut progress = StoreProgress::new(list.len() as u64);
+ let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
+
for (done, group) in list.into_iter().enumerate() {
progress.done_groups = done as u64;
progress.done_snapshots = 0;
@@ -1124,6 +1275,7 @@ pub(crate) async fn pull_ns(
let (owner, _lock_guard) =
match params
+ .target
.store
 .create_locked_backup_group(&target_ns, &group, &params.owner)
{
@@ -1135,7 +1287,9 @@ pub(crate) async fn pull_ns(
&group,
err
);
- errors = true; // do not stop here, instead continue
+ errors = true;
+ // do not stop here, instead continue
+ task_log!(worker, "create_locked_backup_group failed");
continue;
}
};
@@ -1151,15 +1305,7 @@ pub(crate) async fn pull_ns(
owner
);
errors = true; // do not stop here, instead continue
- } else if let Err(err) = pull_group(
- worker,
- client,
- params,
- &group,
- source_ns.clone(),
- &mut progress,
- )
- .await
+ } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
{
task_log!(worker, "sync group {} failed - {}", &group, err,);
errors = true; // do not stop here, instead continue
@@ -1168,13 +1314,13 @@ pub(crate) async fn pull_ns(
if params.remove_vanished {
let result: Result<(), Error> = proxmox_lang::try_block!({
- for local_group in params.store.iter_backup_groups(target_ns.clone())? {
+ for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
let local_group = local_group?;
let local_group = local_group.group();
if new_groups.contains(local_group) {
continue;
}
- let owner = params.store.get_owner(&target_ns, local_group)?;
+ let owner = params.target.store.get_owner(&target_ns, local_group)?;
 if check_backup_owner(&owner, &params.owner).is_err() {
continue;
}
@@ -1184,7 +1330,11 @@ pub(crate) async fn pull_ns(
}
}
task_log!(worker, "delete vanished group '{local_group}'",);
- match params.store.remove_backup_group(&target_ns, local_group) {
+ match params
+ .target
+ .store
+ .remove_backup_group(&target_ns, local_group)
+ {
Ok(true) => {}
Ok(false) => {
task_log!(
--
2.39.2
More information about the pbs-devel
mailing list