[pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore
Lukas Wagner
l.wagner at proxmox.com
Thu Sep 21 13:10:34 CEST 2023
Some of the changed lines seem to be overly long (>100 chars). I've
noted some of the places below, but probably did not catch everything.
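(FWIW, something like `grep -nE '.{101,}' src/server/pull.rs` should catch the
remaining ones; note that rustfmt won't re-wrap the long string literals inside
the task_warn! calls by default, so those need to be split by hand.)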
On 8/8/23 14:13, Hannes Laimer wrote:
> ... making the pull logic independent from the actual source
> using two traits.
>
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
> Cargo.toml | 2 +
> pbs-datastore/src/read_chunk.rs | 2 +-
> src/api2/config/remote.rs | 14 +-
> src/api2/pull.rs | 31 +-
> src/server/pull.rs | 943 +++++++++++++++++++-------------
> 5 files changed, 570 insertions(+), 422 deletions(-)
>
> diff --git a/Cargo.toml b/Cargo.toml
> index 4d34f8a1..74cb68e0 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" }
>
> # regular crates
> anyhow = "1.0"
> +async-trait = "0.1.56"
> apt-pkg-native = "0.3.2"
> base64 = "0.13"
> bitflags = "1.2.1"
> @@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] }
>
> [dependencies]
> anyhow.workspace = true
> +async-trait.workspace = true
> apt-pkg-native.workspace = true
> base64.workspace = true
> bitflags.workspace = true
> diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
> index c04a7431..29ee2d4c 100644
> --- a/pbs-datastore/src/read_chunk.rs
> +++ b/pbs-datastore/src/read_chunk.rs
> @@ -14,7 +14,7 @@ pub trait ReadChunk {
> fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
> }
>
> -pub trait AsyncReadChunk: Send {
> +pub trait AsyncReadChunk: Send + Sync {
> /// Returns the encoded chunk data
> fn read_raw_chunk<'a>(
> &'a self,
> diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
> index 307cf3cd..2511c5d5 100644
> --- a/src/api2/config/remote.rs
> +++ b/src/api2/config/remote.rs
> @@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
> Ok(())
> }
>
> -/// Helper to get client for remote.cfg entry
> -pub async fn remote_client(
> +/// Helper to get client for remote.cfg entry without login, just config
> +pub fn remote_client_config(
> remote: &Remote,
> limit: Option<RateLimitConfig>,
> ) -> Result<HttpClient, Error> {
> @@ -320,6 +320,16 @@ pub async fn remote_client(
> &remote.config.auth_id,
> options,
> )?;
> +
> + Ok(client)
> +}
> +
> +/// Helper to get client for remote.cfg entry
> +pub async fn remote_client(
> + remote: &Remote,
> + limit: Option<RateLimitConfig>,
> +) -> Result<HttpClient, Error> {
> + let client = remote_client_config(remote, limit)?;
> let _auth_info = client
> .login() // make sure we can auth
> .await
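For readers following along: the pull code below builds its client via the new
login-free helper and only authenticates lazily inside the PullSource methods,
roughly (condensed from PullParameters::new() and RemoteSource further down):

    let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
    let source: Arc<dyn PullSource> = Arc::new(RemoteSource { repo, ns: remote_ns, client });
    // authentication then happens lazily, e.g. in list_groups():
    //     self.client.login().await?;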
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index 664ecce5..e36a5b14 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -8,7 +8,7 @@ use proxmox_sys::task_log;
>
> use pbs_api_types::{
> Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
> - GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
> + GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
> PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
> TRANSFER_LAST_SCHEMA,
> };
> @@ -75,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
> PullParameters::new(
> &sync_job.store,
> sync_job.ns.clone().unwrap_or_default(),
> - sync_job.remote.as_deref().unwrap_or("local"),
> + sync_job.remote.as_deref(),
> &sync_job.remote_store,
> sync_job.remote_ns.clone().unwrap_or_default(),
> sync_job
> @@ -124,7 +124,6 @@ pub fn do_sync_job(
>
> let worker_future = async move {
> let pull_params = PullParameters::try_from(&sync_job)?;
> - let client = pull_params.client().await?;
>
> task_log!(worker, "Starting datastore sync job '{}'", job_id);
> if let Some(event_str) = schedule {
> @@ -138,24 +137,7 @@ pub fn do_sync_job(
> sync_job.remote_store,
> );
>
> - if sync_job.remote.is_some() {
> - pull_store(&worker, &client, pull_params).await?;
> - } else {
> - if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
> - if target_ns.path().starts_with(source_ns.path())
> - && sync_job.store == sync_job.remote_store
> - && sync_job.max_depth.map_or(true, |sync_depth| {
> - target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
> - }) {
> - task_log!(
> - worker,
> - "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
> - );
> - }
> - } else {
> - pull_store(&worker, &client, pull_params).await?;
> - }
> - }
> + pull_store(&worker, pull_params).await?;
>
> task_log!(worker, "sync job '{}' end", &job_id);
>
> @@ -284,7 +266,7 @@ async fn pull(
> let pull_params = PullParameters::new(
> &store,
> ns,
> - remote.as_deref().unwrap_or("local"),
> + remote.as_deref(),
> &remote_store,
> remote_ns.unwrap_or_default(),
> auth_id.clone(),
> @@ -294,7 +276,6 @@ async fn pull(
> limit,
> transfer_last,
> )?;
> - let client = pull_params.client().await?;
>
> // fixme: set to_stdout to false?
> // FIXME: add namespace to worker id?
> @@ -312,7 +293,7 @@ async fn pull(
> remote_store,
> );
>
> - let pull_future = pull_store(&worker, &client, pull_params);
> + let pull_future = pull_store(&worker, pull_params);
> (select! {
> success = pull_future.fuse() => success,
> abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
> @@ -327,4 +308,4 @@ async fn pull(
> Ok(upid_str)
> }
>
> -pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> \ No newline at end of file
> +pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index e55452d1..e1a27a8c 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
> //! Sync datastore from remote server
>
> use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::Seek;
> +use std::path::Path;
> use std::sync::atomic::{AtomicUsize, Ordering};
> use std::sync::{Arc, Mutex};
> use std::time::SystemTime;
>
> use anyhow::{bail, format_err, Error};
> use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
> use proxmox_router::HttpError;
> -use proxmox_sys::task_log;
> +use proxmox_sys::{task_log, task_warn};
> +use serde_json::json;
>
> use pbs_api_types::{
> - print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> - Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
> + print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
> + GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
> PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
> };
> -
> -use pbs_client::{
> - BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
> use pbs_datastore::data_blob::DataBlob;
> use pbs_datastore::dynamic_index::DynamicIndexReader;
> use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
> use pbs_datastore::manifest::{
> archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
> };
> +use pbs_datastore::read_chunk::AsyncReadChunk;
> use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>
> use crate::backup::{check_ns_modification_privs, check_ns_privs};
> use crate::tools::parallel_handler::ParallelHandler;
>
> -/// Parameters for a pull operation.
> -pub(crate) struct PullParameters {
> - /// Remote that is pulled from
> - remote: Remote,
> - /// Full specification of remote datastore
> - source: BackupRepository,
> - /// Local store that is pulled into
> +struct RemoteReader {
> + backup_reader: Arc<BackupReader>,
> + dir: BackupDir,
> +}
> +
> +pub(crate) struct PullTarget {
> store: Arc<DataStore>,
> - /// Remote namespace
> - remote_ns: BackupNamespace,
> - /// Local namespace (anchor)
> ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> + repo: BackupRepository,
> + ns: BackupNamespace,
> + client: HttpClient,
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
> +/// The trait includes methods for listing namespaces, groups, and backup directories,
> +/// as well as retrieving a reader for reading data from the source
> +trait PullSource: Send + Sync {
> + /// Lists namespaces from the source.
> + async fn list_namespaces(
> + &self,
> + max_depth: &mut Option<usize>,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupNamespace>, Error>;
> +
> + /// Lists groups within a specific namespace from the source.
> + async fn list_groups(
> + &self,
> + namespace: &BackupNamespace,
> + owner: &Authid,
> + ) -> Result<Vec<BackupGroup>, Error>;
> +
> + /// Lists backup directories for a specific group within a specific namespace from the source.
> + async fn list_backup_dirs(
> + &self,
> + namespace: &BackupNamespace,
> + group: &BackupGroup,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupDir>, Error>;
> + fn get_ns(&self) -> BackupNamespace;
> + fn print_store_and_ns(&self) -> String;
> +
> + /// Returns a reader for reading data from a specific backup directory.
> + async fn reader(
> + &self,
> + ns: &BackupNamespace,
> + dir: &BackupDir,
> + ) -> Result<Arc<dyn PullReader>, Error>;
> +}
> +
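To summarize my reading of the new flow: pull_store/pull_ns/pull_group below end
up driving this trait roughly like the following (condensed sketch, not literal
code from the patch):

    let namespaces = params.source.list_namespaces(&mut params.max_depth, worker).await?;
    for ns in namespaces {
        for group in params.source.list_groups(&ns, &params.owner).await? {
            for dir in params.source.list_backup_dirs(&ns, &group, worker).await? {
                // fetch a reader for this snapshot and hand it to the sync logic
                let reader = params.source.reader(&ns, &dir).await?;
                // pull_snapshot_from(worker, reader, &target_snapshot, downloaded_chunks).await?;
            }
        }
    }

(plus the already-synced/transfer-last filtering that stays in pull_group).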
> +#[async_trait::async_trait]
> +impl PullSource for RemoteSource {
> + async fn list_namespaces(
> + &self,
> + max_depth: &mut Option<usize>,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupNamespace>, Error> {
> + if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> + return Ok(vec![self.ns.clone()]);
> + }
> +
> + let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
> + let mut data = json!({});
> + if let Some(max_depth) = max_depth {
> + data["max-depth"] = json!(max_depth);
> + }
> +
> + if !self.ns.is_root() {
> + data["parent"] = json!(self.ns);
> + }
> + self.client.login().await?;
> +
> + let mut result = match self.client.get(&path, Some(data)).await {
> + Ok(res) => res,
> + Err(err) => match err.downcast_ref::<HttpError>() {
> + Some(HttpError { code, message }) => match code {
> + &StatusCode::NOT_FOUND => {
> + if self.ns.is_root() && max_depth.is_none() {
> + task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> + task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
These lines exceed our 100 character limit.
> + max_depth.replace(0);
> + } else {
> + bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> + }
> +
> + return Ok(vec![self.ns.clone()]);
> + }
> + _ => {
> + bail!("Querying namespaces failed - HTTP error {code} - {message}");
> + }
> + },
> + None => {
> + bail!("Querying namespaces failed - {err}");
> + }
> + },
> + };
> +
> + let list: Vec<BackupNamespace> =
> + serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> + .iter()
> + .map(|list_item| list_item.ns.clone())
> + .collect();
> +
> + Ok(list)
> + }
> +
> + async fn list_groups(
> + &self,
> + namespace: &BackupNamespace,
> + _owner: &Authid,
> + ) -> Result<Vec<BackupGroup>, Error> {
> + let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
> +
> + let args = if !namespace.is_root() {
> + Some(json!({ "ns": namespace.clone() }))
> + } else {
> + None
> + };
> +
> + self.client.login().await?;
> + let mut result =
> + self.client.get(&path, args).await.map_err(|err| {
> + format_err!("Failed to retrieve backup groups from remote - {}", err)
> + })?;
> +
> + Ok(
> + serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> + .map_err(Error::from)?
> + .into_iter()
> + .map(|item| item.backup)
> + .collect::<Vec<BackupGroup>>(),
> + )
> + }
> +
> + async fn list_backup_dirs(
> + &self,
> + _namespace: &BackupNamespace,
> + group: &BackupGroup,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupDir>, Error> {
> + let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
> +
> + let mut args = json!({
> + "backup-type": group.ty,
> + "backup-id": group.id,
> + });
> +
> + if !self.ns.is_root() {
> + args["ns"] = serde_json::to_value(&self.ns)?;
> + }
> +
> + self.client.login().await?;
> +
> + let mut result = self.client.get(&path, Some(args)).await?;
> + let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> + Ok(snapshot_list
> + .into_iter()
> + .filter_map(|item: SnapshotListItem| {
> + let snapshot = item.backup;
> + // in-progress backups can't be synced
> + if item.size.is_none() {
> + task_log!(
> + worker,
> + "skipping snapshot {} - in-progress backup",
> + snapshot
> + );
> + return None;
> + }
> +
> + Some(snapshot)
> + })
> + .collect::<Vec<BackupDir>>())
> + }
> +
> + fn get_ns(&self) -> BackupNamespace {
> + self.ns.clone()
> + }
> +
> + fn print_store_and_ns(&self) -> String {
> + print_store_and_ns(self.repo.store(), &self.ns)
> + }
> +
> + async fn reader(
> + &self,
> + ns: &BackupNamespace,
> + dir: &BackupDir,
> + ) -> Result<Arc<dyn PullReader>, Error> {
> + let backup_reader =
> + BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
> + Ok(Arc::new(RemoteReader {
> + backup_reader,
> + dir: dir.clone(),
> + }))
> + }
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullReader` is a trait that provides an interface for reading data from a source.
> +/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
Long line again
> +trait PullReader: Send + Sync {
> + /// Returns a chunk reader with the specified encryption mode.
> + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
> +
> + /// Asynchronously loads a file from the source into a local file.
> + /// `filename` is the name of the file to load from the source.
> + /// `into` is the path of the local file to load the source file into.
> + async fn load_file_into(
> + &self,
> + filename: &str,
> + into: &Path,
> + worker: &WorkerTask,
> + ) -> Result<Option<DataBlob>, Error>;
> +
> + /// Tries to download the client log from the source and save it into a local file.
> + async fn try_download_client_log(
> + &self,
> + to_path: &Path,
> + worker: &WorkerTask,
> + ) -> Result<(), Error>;
> +
> + fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
> +}
> +
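And the reader side gets consumed further down along these lines (again just a
condensed sketch of pull_snapshot / pull_single_archive):

    let manifest_blob = match reader
        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
        .await?
    {
        Some(blob) => blob,
        None => return Ok(()), // snapshot vanished on the source side
    };
    // per referenced archive:
    if reader.skip_chunk_sync(snapshot.datastore().name()) {
        // same chunk store, nothing to copy
    } else {
        pull_index_chunks(worker, reader.chunk_reader(archive_info.crypt_mode), /* ... */).await?;
    }
    reader.try_download_client_log(&client_log_name, worker).await?;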
> +#[async_trait::async_trait]
> +impl PullReader for RemoteReader {
> + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
> + Arc::new(RemoteChunkReader::new(
> + self.backup_reader.clone(),
> + None,
> + crypt_mode,
> + HashMap::new(),
> + ))
> + }
> +
> + async fn load_file_into(
> + &self,
> + filename: &str,
> + into: &Path,
> + worker: &WorkerTask,
> + ) -> Result<Option<DataBlob>, Error> {
> + let mut tmp_file = std::fs::OpenOptions::new()
> + .write(true)
> + .create(true)
> + .truncate(true)
> + .read(true)
> + .open(into)?;
> + let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
> + if let Err(err) = download_result {
> + match err.downcast_ref::<HttpError>() {
> + Some(HttpError { code, message }) => match *code {
> + StatusCode::NOT_FOUND => {
> + task_log!(
> + worker,
> + "skipping snapshot {} - vanished since start of sync",
> + &self.dir,
> + );
> + return Ok(None);
> + }
> + _ => {
> + bail!("HTTP error {code} - {message}");
> + }
> + },
> + None => {
> + return Err(err);
> + }
> + };
> + };
> + tmp_file.rewind()?;
> + Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
> + }
> +
> + async fn try_download_client_log(
> + &self,
> + to_path: &Path,
> + worker: &WorkerTask,
> + ) -> Result<(), Error> {
> + let mut tmp_path = to_path.to_owned();
> + tmp_path.set_extension("tmp");
> +
> + let tmpfile = std::fs::OpenOptions::new()
> + .write(true)
> + .create(true)
> + .read(true)
> + .open(&tmp_path)?;
> +
> + // Note: be silent if there is no log - only log successful download
> + if let Ok(()) = self
> + .backup_reader
> + .download(CLIENT_LOG_BLOB_NAME, tmpfile)
> + .await
> + {
> + if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> + bail!("Atomic rename file {:?} failed - {}", to_path, err);
> + }
> + task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> + }
> +
> + Ok(())
> + }
> +
> + fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
> + false
> + }
> +}
> +
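skip_chunk_sync() always returning false makes sense for the remote case; I
assume the local reader added later in the series is the one that returns true
when source and target share the chunk store, something like (hypothetical,
field name made up):

    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
        // no need to copy chunks when pulling within the same datastore
        self.datastore.name() == target_store_name
    }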
> +/// Parameters for a pull operation.
> +pub(crate) struct PullParameters {
> + /// Where data is pulled from
> + source: Arc<dyn PullSource>,
> + /// Where data should be pulled into
> + target: PullTarget,
> /// Owner of synced groups (needs to match local owner of pre-existing groups)
> owner: Authid,
> /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
> max_depth: Option<usize>,
> /// Filters for reducing the pull scope
> group_filter: Option<Vec<GroupFilter>>,
> - /// Rate limits for all transfers from `remote`
> - limit: RateLimitConfig,
> /// How many snapshots should be transferred at most (taking the newest N snapshots)
> transfer_last: Option<usize>,
> }
>
> impl PullParameters {
> /// Creates a new instance of `PullParameters`.
> - ///
> - /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> - /// [BackupRepository] with `remote_store`.
> - #[allow(clippy::too_many_arguments)]
> pub(crate) fn new(
> store: &str,
> ns: BackupNamespace,
> - remote: &str,
> + remote: Option<&str>,
> remote_store: &str,
> remote_ns: BackupNamespace,
> owner: Authid,
> @@ -82,49 +376,56 @@ impl PullParameters {
> limit: RateLimitConfig,
> transfer_last: Option<usize>,
> ) -> Result<Self, Error> {
> - let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
> if let Some(max_depth) = max_depth {
> ns.check_max_depth(max_depth)?;
> remote_ns.check_max_depth(max_depth)?;
> - }
> -
> - let (remote_config, _digest) = pbs_config::remote::config()?;
> - let remote: Remote = remote_config.lookup("remote", remote)?;
> -
> + };
> let remove_vanished = remove_vanished.unwrap_or(false);
>
> - let source = BackupRepository::new(
> - Some(remote.config.auth_id.clone()),
> - Some(remote.config.host.clone()),
> - remote.config.port,
> - remote_store.to_string(),
> - );
> + let source: Arc<dyn PullSource> = if let Some(remote) = remote {
> + let (remote_config, _digest) = pbs_config::remote::config()?;
> + let remote: Remote = remote_config.lookup("remote", remote)?;
>
> - Ok(Self {
> - remote,
> - remote_ns,
> + let repo = BackupRepository::new(
> + Some(remote.config.auth_id.clone()),
> + Some(remote.config.host.clone()),
> + remote.config.port,
> + remote_store.to_string(),
> + );
> + let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
> + Arc::new(RemoteSource {
> + repo,
> + ns: remote_ns,
> + client,
> + })
> + } else {
> + bail!("local sync not implemented yet")
> + };
> + let target = PullTarget {
> + store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
> ns,
> + };
> +
> + Ok(Self {
> source,
> - store,
> + target,
> owner,
> remove_vanished,
> max_depth,
> group_filter,
> - limit,
> transfer_last,
> })
> }
>
> - /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> - pub async fn client(&self) -> Result<HttpClient, Error> {
> - crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> + pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> + let source_ns = self.source.get_ns();
> + source_ns.map_prefix(&source_ns, &self.target.ns)
> }
> }
>
> async fn pull_index_chunks<I: IndexFile>(
> worker: &WorkerTask,
> - chunk_reader: RemoteChunkReader,
> + chunk_reader: Arc<dyn AsyncReadChunk>,
> target: Arc<DataStore>,
> index: I,
> downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -215,26 +516,6 @@ async fn pull_index_chunks<I: IndexFile>(
> Ok(())
> }
>
> -async fn download_manifest(
> - reader: &BackupReader,
> - filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> - let mut tmp_manifest_file = std::fs::OpenOptions::new()
> - .write(true)
> - .create(true)
> - .truncate(true)
> - .read(true)
> - .open(filename)?;
> -
> - reader
> - .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> - .await?;
> -
> - tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> - Ok(tmp_manifest_file)
> -}
> -
> fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
> if size != info.size {
> bail!(
> @@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
> /// Pulls a single file referenced by a manifest.
> ///
> /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> -/// - Verify tmp file checksum
> +/// - Load archive file into tmp file
> +/// -- Load file into tmp file
> +/// -- Verify tmp file checksum
> /// - if archive is an index, pull referenced chunks
> /// - Rename tmp file into real path
> -async fn pull_single_archive(
> - worker: &WorkerTask,
> - reader: &BackupReader,
> - chunk_reader: &mut RemoteChunkReader,
> - snapshot: &pbs_datastore::BackupDir,
> - archive_info: &FileInfo,
> +async fn pull_single_archive<'a>(
> + worker: &'a WorkerTask,
> + reader: Arc<dyn PullReader + 'a>,
> + snapshot: &'a pbs_datastore::BackupDir,
> + archive_info: &'a FileInfo,
> downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> ) -> Result<(), Error> {
> let archive_name = &archive_info.filename;
> @@ -277,13 +557,11 @@ async fn pull_single_archive(
>
> task_log!(worker, "sync archive {}", archive_name);
>
> - let mut tmpfile = std::fs::OpenOptions::new()
> - .write(true)
> - .create(true)
> - .read(true)
> - .open(&tmp_path)?;
> + reader
> + .load_file_into(archive_name, &tmp_path, worker)
> + .await?;
>
> - reader.download(archive_name, &mut tmpfile).await?;
> + let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>
> match archive_type(archive_name)? {
> ArchiveType::DynamicIndex => {
> @@ -293,14 +571,18 @@ async fn pull_single_archive(
> let (csum, size) = index.compute_csum();
> verify_archive(archive_info, &csum, size)?;
>
> - pull_index_chunks(
> - worker,
> - chunk_reader.clone(),
> - snapshot.datastore().clone(),
> - index,
> - downloaded_chunks,
> - )
> - .await?;
> + if reader.skip_chunk_sync(snapshot.datastore().name()) {
> + task_log!(worker, "skipping chunk sync for same datatsore");
> + } else {
> + pull_index_chunks(
> + worker,
> + reader.chunk_reader(archive_info.crypt_mode),
> + snapshot.datastore().clone(),
> + index,
> + downloaded_chunks,
> + )
> + .await?;
> + }
> }
> ArchiveType::FixedIndex => {
> let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -309,17 +591,21 @@ async fn pull_single_archive(
> let (csum, size) = index.compute_csum();
> verify_archive(archive_info, &csum, size)?;
>
> - pull_index_chunks(
> - worker,
> - chunk_reader.clone(),
> - snapshot.datastore().clone(),
> - index,
> - downloaded_chunks,
> - )
> - .await?;
> + if reader.skip_chunk_sync(snapshot.datastore().name()) {
> + task_log!(worker, "skipping chunk sync for same datatsore");
> + } else {
> + pull_index_chunks(
> + worker,
> + reader.chunk_reader(archive_info.crypt_mode),
> + snapshot.datastore().clone(),
> + index,
> + downloaded_chunks,
> + )
> + .await?;
> + }
> }
> ArchiveType::Blob => {
> - tmpfile.seek(SeekFrom::Start(0))?;
> + tmpfile.rewind()?;
> let (csum, size) = sha256(&mut tmpfile)?;
> verify_archive(archive_info, &csum, size)?;
> }
> @@ -330,33 +616,6 @@ async fn pull_single_archive(
> Ok(())
> }
>
> -// Note: The client.log.blob is uploaded after the backup, so it is
> -// not mentioned in the manifest.
> -async fn try_client_log_download(
> - worker: &WorkerTask,
> - reader: Arc<BackupReader>,
> - path: &std::path::Path,
> -) -> Result<(), Error> {
> - let mut tmp_path = path.to_owned();
> - tmp_path.set_extension("tmp");
> -
> - let tmpfile = std::fs::OpenOptions::new()
> - .write(true)
> - .create(true)
> - .read(true)
> - .open(&tmp_path)?;
> -
> - // Note: be silent if there is no log - only log successful download
> - if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> - if let Err(err) = std::fs::rename(&tmp_path, path) {
> - bail!("Atomic rename file {:?} failed - {}", path, err);
> - }
> - task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> - }
> -
> - Ok(())
> -}
> -
> /// Actual implementation of pulling a snapshot.
> ///
> /// Pulling a snapshot consists of the following steps:
> @@ -366,10 +625,10 @@ async fn try_client_log_download(
> /// -- if file already exists, verify contents
> /// -- if not, pull it from the remote
> /// - Download log if not already existing
> -async fn pull_snapshot(
> - worker: &WorkerTask,
> - reader: Arc<BackupReader>,
> - snapshot: &pbs_datastore::BackupDir,
> +async fn pull_snapshot<'a>(
> + worker: &'a WorkerTask,
> + reader: Arc<dyn PullReader + 'a>,
> + snapshot: &'a pbs_datastore::BackupDir,
> downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> ) -> Result<(), Error> {
> let mut manifest_name = snapshot.full_path();
> @@ -380,32 +639,15 @@ async fn pull_snapshot(
>
> let mut tmp_manifest_name = manifest_name.clone();
> tmp_manifest_name.set_extension("tmp");
> -
> - let download_res = download_manifest(&reader, &tmp_manifest_name).await;
> - let mut tmp_manifest_file = match download_res {
> - Ok(manifest_file) => manifest_file,
> - Err(err) => {
> - match err.downcast_ref::<HttpError>() {
> - Some(HttpError { code, message }) => match *code {
> - StatusCode::NOT_FOUND => {
> - task_log!(
> - worker,
> - "skipping snapshot {} - vanished since start of sync",
> - snapshot.dir(),
> - );
> - return Ok(());
> - }
> - _ => {
> - bail!("HTTP error {code} - {message}");
> - }
> - },
> - None => {
> - return Err(err);
> - }
> - };
> - }
> - };
> - let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
> + let tmp_manifest_blob;
> + if let Some(data) = reader
> + .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
> + .await?
> + {
> + tmp_manifest_blob = data;
> + } else {
> + return Ok(());
> + }
>
> if manifest_name.exists() {
> let manifest_blob = proxmox_lang::try_block!({
> @@ -422,8 +664,10 @@ async fn pull_snapshot(
>
> if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
> if !client_log_name.exists() {
> - try_client_log_download(worker, reader, &client_log_name).await?;
> - }
> + reader
> + .try_download_client_log(&client_log_name, worker)
> + .await?;
> + };
> task_log!(worker, "no data changes");
> let _ = std::fs::remove_file(&tmp_manifest_name);
> return Ok(()); // nothing changed
> @@ -471,17 +715,9 @@ async fn pull_snapshot(
> }
> }
>
> - let mut chunk_reader = RemoteChunkReader::new(
> - reader.clone(),
> - None,
> - item.chunk_crypt_mode(),
> - HashMap::new(),
> - );
> -
> pull_single_archive(
> worker,
> - &reader,
> - &mut chunk_reader,
> + reader.clone(),
> snapshot,
> item,
> downloaded_chunks.clone(),
> @@ -494,9 +730,10 @@ async fn pull_snapshot(
> }
>
> if !client_log_name.exists() {
> - try_client_log_download(worker, reader, &client_log_name).await?;
> - }
> -
> + reader
> + .try_download_client_log(&client_log_name, worker)
> + .await?;
> + };
> snapshot
> .cleanup_unreferenced_files(&manifest)
> .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
> @@ -506,12 +743,12 @@ async fn pull_snapshot(
>
> /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
> ///
> -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
> -/// pointing to the local datastore and target namespace.
> -async fn pull_snapshot_from(
> - worker: &WorkerTask,
> - reader: Arc<BackupReader>,
> - snapshot: &pbs_datastore::BackupDir,
> +/// The `reader` is configured to read from the source backup directory, while the
> +/// `snapshot` is pointing to the local datastore and target namespace.
> +async fn pull_snapshot_from<'a>(
> + worker: &'a WorkerTask,
> + reader: Arc<dyn PullReader + 'a>,
> + snapshot: &'a pbs_datastore::BackupDir,
> downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> ) -> Result<(), Error> {
> let (_path, is_new, _snap_lock) = snapshot
> @@ -626,11 +863,10 @@ impl std::fmt::Display for SkipInfo {
> /// - Sort by snapshot time
> /// - Get last snapshot timestamp on local datastore
> /// - Iterate over list of snapshots
> -/// -- Recreate client/BackupReader
> /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
> /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
> ///
> -/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
> +/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
> /// remote when querying snapshots. This allows us to interact with old remotes that don't have
> /// namespace support yet.
> ///
> @@ -639,117 +875,79 @@ impl std::fmt::Display for SkipInfo {
> /// - local group owner is already checked by pull_store
> async fn pull_group(
> worker: &WorkerTask,
> - client: &HttpClient,
> params: &PullParameters,
> - group: &pbs_api_types::BackupGroup,
> - remote_ns: BackupNamespace,
> + source_namespace: &BackupNamespace,
> + group: &BackupGroup,
> progress: &mut StoreProgress,
> ) -> Result<(), Error> {
> - task_log!(worker, "sync group {}", group);
> -
> - let path = format!(
> - "api2/json/admin/datastore/{}/snapshots",
> - params.source.store()
> - );
> -
> - let mut args = json!({
> - "backup-type": group.ty,
> - "backup-id": group.id,
> - });
> -
> - if !remote_ns.is_root() {
> - args["ns"] = serde_json::to_value(&remote_ns)?;
> - }
> -
> - let target_ns = remote_ns.map_prefix(¶ms.remote_ns, ¶ms.ns)?;
> -
> - let mut result = client.get(&path, Some(args)).await?;
> - let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> -
> - list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> -
> - client.login().await?; // make sure auth is complete
> -
> - let fingerprint = client.fingerprint();
> -
> - let last_sync = params.store.last_successful_backup(&target_ns, group)?;
> - let last_sync_time = last_sync.unwrap_or(i64::MIN);
> -
> - let mut remote_snapshots = std::collections::HashSet::new();
> -
> - // start with 65536 chunks (up to 256 GiB)
> - let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
> -
> - progress.group_snapshots = list.len() as u64;
> -
> let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
> let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
>
> - let total_amount = list.len();
> + let mut raw_list: Vec<BackupDir> = params
> + .source
> + .list_backup_dirs(source_namespace, group, worker)
> + .await?;
> + raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
> +
> + let total_amount = raw_list.len();
>
> let cutoff = params
> .transfer_last
> .map(|count| total_amount.saturating_sub(count))
> .unwrap_or_default();
>
> - for (pos, item) in list.into_iter().enumerate() {
> - let snapshot = item.backup;
> -
> - // in-progress backups can't be synced
> - if item.size.is_none() {
> - task_log!(
> - worker,
> - "skipping snapshot {} - in-progress backup",
> - snapshot
> - );
> - continue;
> - }
> -
> - remote_snapshots.insert(snapshot.time);
> + let target_ns = params.get_target_ns()?;
>
> - if last_sync_time > snapshot.time {
> - already_synced_skip_info.update(snapshot.time);
> - continue;
> - } else if already_synced_skip_info.count > 0 {
> - task_log!(worker, "{}", already_synced_skip_info);
> - already_synced_skip_info.reset();
> - }
> -
> - if pos < cutoff && last_sync_time != snapshot.time {
> - transfer_last_skip_info.update(snapshot.time);
> - continue;
> - } else if transfer_last_skip_info.count > 0 {
> - task_log!(worker, "{}", transfer_last_skip_info);
> - transfer_last_skip_info.reset();
> - }
> -
> - // get updated auth_info (new tickets)
> - let auth_info = client.login().await?;
> + let mut source_snapshots = HashSet::new();
> + let last_sync_time = params
> + .target
> + .store
> + .last_successful_backup(&target_ns, group)?
> + .unwrap_or(i64::MIN);
> +
> + let list: Vec<BackupDir> = raw_list
> + .into_iter()
> + .enumerate()
> + .filter(|&(pos, ref dir)| {
> + source_snapshots.insert(dir.time);
> + if last_sync_time > dir.time {
> + already_synced_skip_info.update(dir.time);
> + return false;
> + } else if already_synced_skip_info.count > 0 {
> + task_log!(worker, "{}", already_synced_skip_info);
> + already_synced_skip_info.reset();
> + return true;
> + }
>
> - let options =
> - HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
> - .rate_limit(params.limit.clone());
> + if pos < cutoff && last_sync_time != dir.time {
> + transfer_last_skip_info.update(dir.time);
> + return false;
> + } else if transfer_last_skip_info.count > 0 {
> + task_log!(worker, "{}", transfer_last_skip_info);
> + transfer_last_skip_info.reset();
> + }
> + true
> + })
> + .map(|(_, dir)| dir)
> + .collect();
>
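Just to spell out the cutoff arithmetic here: with e.g. transfer_last = Some(2)
and five listed snapshots, cutoff = 5 - 2 = 3, so the snapshots at positions 0,
1 and 2 are filtered out unless their time equals last_sync_time.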
> - let new_client = HttpClient::new(
> - params.source.host(),
> - params.source.port(),
> - params.source.auth_id(),
> - options,
> - )?;
> + // start with 65536 chunks (up to 256 GiB)
> + let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>
> - let reader = BackupReader::start(
> - &new_client,
> - None,
> - params.source.store(),
> - &remote_ns,
> - &snapshot,
> - true,
> - )
> - .await?;
> + progress.group_snapshots = list.len() as u64;
>
> - let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
> + for (pos, from_snapshot) in list.into_iter().enumerate() {
> + let to_snapshot = params
> + .target
> + .store
> + .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;
>
> - let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
> + let reader = params
> + .source
> + .reader(source_namespace, &from_snapshot)
> + .await?;
> + let result =
> + pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
>
> progress.done_snapshots = pos as u64 + 1;
> task_log!(worker, "percentage done: {}", progress);
> @@ -758,11 +956,14 @@ async fn pull_group(
> }
>
> if params.remove_vanished {
> - let group = params.store.backup_group(target_ns.clone(), group.clone());
> + let group = params
> + .target
> + .store
> + .backup_group(params.get_target_ns()?, group.clone());
> let local_list = group.list_backups()?;
> for info in local_list {
> let snapshot = info.backup_dir;
> - if remote_snapshots.contains(&snapshot.backup_time()) {
> + if source_snapshots.contains(&snapshot.backup_time()) {
> continue;
> }
> if snapshot.is_protected() {
> @@ -774,73 +975,23 @@ async fn pull_group(
> continue;
> }
> task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
> - params
> - .store
> - .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
> + params.target.store.remove_backup_dir(
> + ¶ms.get_target_ns()?,
> + snapshot.as_ref(),
> + false,
> + )?;
> }
> }
>
> Ok(())
> }
>
> -// will modify params if switching to backwards mode for lack of NS support on remote end
> -async fn query_namespaces(
> - worker: &WorkerTask,
> - client: &HttpClient,
> - params: &mut PullParameters,
> -) -> Result<Vec<BackupNamespace>, Error> {
> - let path = format!(
> - "api2/json/admin/datastore/{}/namespace",
> - params.source.store()
> - );
> - let mut data = json!({});
> - if let Some(max_depth) = params.max_depth {
> - data["max-depth"] = json!(max_depth);
> - }
> -
> - if !params.remote_ns.is_root() {
> - data["parent"] = json!(params.remote_ns);
> - }
> -
> - let mut result = match client.get(&path, Some(data)).await {
> - Ok(res) => res,
> - Err(err) => match err.downcast_ref::<HttpError>() {
> - Some(HttpError { code, message }) => match *code {
> - StatusCode::NOT_FOUND => {
> - if params.remote_ns.is_root() && params.max_depth.is_none() {
> - task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> - task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> - params.max_depth = Some(0);
> - } else {
> - bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> - }
> -
> - return Ok(vec![params.remote_ns.clone()]);
> - }
> - _ => {
> - bail!("Querying namespaces failed - HTTP error {code} - {message}");
> - }
> - },
> - None => {
> - bail!("Querying namespaces failed - {err}");
> - }
> - },
> - };
> -
> - let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
> -
> - // parents first
> - list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
> -
> - Ok(list.iter().map(|item| item.ns.clone()).collect())
> -}
> -
> fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
> let mut created = false;
> - let store_ns_str = print_store_and_ns(params.store.name(), ns);
> + let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
>
> - if !ns.is_root() && !params.store.namespace_path(ns).exists() {
> - check_ns_modification_privs(params.store.name(), ns, ¶ms.owner)
> + if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
> + check_ns_modification_privs(params.target.store.name(), ns, ¶ms.owner)
> .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
>
> let name = match ns.components().last() {
> @@ -850,14 +1001,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
> }
> };
>
> - if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
> + if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
> bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
> }
> created = true;
> }
>
> check_ns_privs(
> - params.store.name(),
> + params.target.store.name(),
> ns,
> ¶ms.owner,
> PRIV_DATASTORE_BACKUP,
> @@ -868,10 +1019,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
> }
>
> fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
> - check_ns_modification_privs(params.store.name(), local_ns, ¶ms.owner)
> + check_ns_modification_privs(params.target.store.name(), local_ns, ¶ms.owner)
> .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
>
> - params.store.remove_namespace_recursive(local_ns, true)
> + params
> + .target
> + .store
> + .remove_namespace_recursive(local_ns, true)
> }
>
> fn check_and_remove_vanished_ns(
> @@ -885,14 +1039,15 @@ fn check_and_remove_vanished_ns(
> // clamp like remote does so that we don't list more than we can ever have synced.
> let max_depth = params
> .max_depth
> - .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
> + .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
>
> let mut local_ns_list: Vec<BackupNamespace> = params
> + .target
> .store
> - .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
> + .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
> .filter(|ns| {
> let user_privs =
> - user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.store.name()));
> + user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.target.store.name()));
> user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
> })
> .collect();
> @@ -901,7 +1056,7 @@ fn check_and_remove_vanished_ns(
> local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
>
> for local_ns in local_ns_list {
> - if local_ns == params.ns {
> + if local_ns == params.target.ns {
> continue;
> }
>
> @@ -948,29 +1103,49 @@ fn check_and_remove_vanished_ns(
> /// - access to sub-NS checked here
> pub(crate) async fn pull_store(
> worker: &WorkerTask,
> - client: &HttpClient,
> mut params: PullParameters,
> ) -> Result<(), Error> {
> // explicit create shared lock to prevent GC on newly created chunks
> - let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
> + let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
> let mut errors = false;
>
> let old_max_depth = params.max_depth;
> - let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
> - vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
> + let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
> + vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
> } else {
> - query_namespaces(worker, client, &mut params).await?
> + params
> + .source
> + .list_namespaces(&mut params.max_depth, worker)
> + .await?
> };
> +
> + let ns_layers_to_be_pulled = namespaces
> + .iter()
> + .map(BackupNamespace::depth)
> + .max()
> + .map_or(0, |v| v - params.source.get_ns().depth());
> + let target_depth = params.target.ns.depth();
> +
> + if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
> + bail!(
> + "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
> + ns_layers_to_be_pulled,
> + target_depth,
> + MAX_NAMESPACE_DEPTH
> + );
> + }
> +
> errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
> + namespaces.sort_unstable_by_key(|a| a.name_len());
>
> let (mut groups, mut snapshots) = (0, 0);
> let mut synced_ns = HashSet::with_capacity(namespaces.len());
>
> for namespace in namespaces {
> - let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
> + let source_store_ns_str = params.source.print_store_and_ns();
>
> - let target_ns = namespace.map_prefix(¶ms.remote_ns, ¶ms.ns)?;
> - let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
> + let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
> + let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
>
> task_log!(worker, "----");
> task_log!(
> @@ -998,7 +1173,7 @@ pub(crate) async fn pull_store(
> }
> }
>
> - match pull_ns(worker, client, ¶ms, namespace.clone(), target_ns).await {
> + match pull_ns(worker, &namespace, &mut params).await {
> Ok((ns_progress, ns_errors)) => {
> errors |= ns_errors;
>
> @@ -1019,7 +1194,7 @@ pub(crate) async fn pull_store(
> task_log!(
> worker,
> "Encountered errors while syncing namespace {} - {}",
> - namespace,
> + &namespace,
> err,
> );
> }
> @@ -1051,48 +1226,28 @@ pub(crate) async fn pull_store(
> /// - owner check for vanished groups done here
> pub(crate) async fn pull_ns(
> worker: &WorkerTask,
> - client: &HttpClient,
> - params: &PullParameters,
> - source_ns: BackupNamespace,
> - target_ns: BackupNamespace,
> + namespace: &BackupNamespace,
> + params: &mut PullParameters,
> ) -> Result<(StoreProgress, bool), Error> {
> - let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
> -
> - let args = if !source_ns.is_root() {
> - Some(json!({
> - "ns": source_ns,
> - }))
> - } else {
> - None
> - };
> -
> - let mut result = client
> - .get(&path, args)
> - .await
> - .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
> -
> - let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> + let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, ¶ms.owner).await?;
>
> let total_count = list.len();
> list.sort_unstable_by(|a, b| {
> - let type_order = a.backup.ty.cmp(&b.backup.ty);
> + let type_order = a.ty.cmp(&b.ty);
> if type_order == std::cmp::Ordering::Equal {
> - a.backup.id.cmp(&b.backup.id)
> + a.id.cmp(&b.id)
> } else {
> type_order
> }
> });
>
> - let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
> + let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
> filters.iter().any(|filter| group.matches(filter))
> };
>
> - // Get groups with target NS set
> - let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
> -
> let list = if let Some(ref group_filter) = ¶ms.group_filter {
> let unfiltered_count = list.len();
> - let list: Vec<pbs_api_types::BackupGroup> = list
> + let list: Vec<BackupGroup> = list
> .into_iter()
> .filter(|group| apply_filters(group, group_filter))
> .collect();
> @@ -1110,13 +1265,14 @@ pub(crate) async fn pull_ns(
>
> let mut errors = false;
>
> - let mut new_groups = std::collections::HashSet::new();
> + let mut new_groups = HashSet::new();
> for group in list.iter() {
> new_groups.insert(group.clone());
> }
>
> let mut progress = StoreProgress::new(list.len() as u64);
>
> + let target_ns = params.get_target_ns()?;
> for (done, group) in list.into_iter().enumerate() {
> progress.done_groups = done as u64;
> progress.done_snapshots = 0;
> @@ -1124,6 +1280,7 @@ pub(crate) async fn pull_ns(
>
> let (owner, _lock_guard) =
> match params
> + .target
> .store
> .create_locked_backup_group(&target_ns, &group, ¶ms.owner)
> {
> @@ -1135,7 +1292,9 @@ pub(crate) async fn pull_ns(
> &group,
> err
> );
> - errors = true; // do not stop here, instead continue
> + errors = true;
> + // do not stop here, instead continue
> + task_log!(worker, "create_locked_backup_group failed");
> continue;
> }
> };
> @@ -1151,15 +1310,7 @@ pub(crate) async fn pull_ns(
> owner
> );
> errors = true; // do not stop here, instead continue
> - } else if let Err(err) = pull_group(
> - worker,
> - client,
> - params,
> - &group,
> - source_ns.clone(),
> - &mut progress,
> - )
> - .await
> + } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
> {
> task_log!(worker, "sync group {} failed - {}", &group, err,);
> errors = true; // do not stop here, instead continue
> @@ -1168,13 +1319,13 @@ pub(crate) async fn pull_ns(
>
> if params.remove_vanished {
> let result: Result<(), Error> = proxmox_lang::try_block!({
> - for local_group in params.store.iter_backup_groups(target_ns.clone())? {
> + for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
> let local_group = local_group?;
> let local_group = local_group.group();
> if new_groups.contains(local_group) {
> continue;
> }
> - let owner = params.store.get_owner(&target_ns, local_group)?;
> + let owner = params.target.store.get_owner(&target_ns, local_group)?;
> if check_backup_owner(&owner, ¶ms.owner).is_err() {
> continue;
> }
> @@ -1184,7 +1335,11 @@ pub(crate) async fn pull_ns(
> }
> }
> task_log!(worker, "delete vanished group '{local_group}'",);
> - match params.store.remove_backup_group(&target_ns, local_group) {
> + match params
> + .target
> + .store
> + .remove_backup_group(&target_ns, local_group)
> + {
> Ok(true) => {}
> Ok(false) => {
> task_log!(
--
- Lukas