[pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore
Wolfgang Bumiller
w.bumiller at proxmox.com
Thu Aug 24 15:09:58 CEST 2023
On Tue, Aug 08, 2023 at 02:13:43PM +0200, Hannes Laimer wrote:
> ... making the pull logic independent from the actual source
> using two traits.
>
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
> Cargo.toml | 2 +
> pbs-datastore/src/read_chunk.rs | 2 +-
> src/api2/config/remote.rs | 14 +-
> src/api2/pull.rs | 31 +-
> src/server/pull.rs | 943 +++++++++++++++++++-------------
> 5 files changed, 570 insertions(+), 422 deletions(-)
>
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index e55452d1..e1a27a8c 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
> //! Sync datastore from remote server
>
> use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::Seek;
> +use std::path::Path;
> use std::sync::atomic::{AtomicUsize, Ordering};
> use std::sync::{Arc, Mutex};
> use std::time::SystemTime;
>
> use anyhow::{bail, format_err, Error};
> use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
> use proxmox_router::HttpError;
> -use proxmox_sys::task_log;
> +use proxmox_sys::{task_log, task_warn};
> +use serde_json::json;
>
> use pbs_api_types::{
> - print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> - Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
> + print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
> + GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
> PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
> };
> -
> -use pbs_client::{
> - BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
> use pbs_datastore::data_blob::DataBlob;
> use pbs_datastore::dynamic_index::DynamicIndexReader;
> use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
> use pbs_datastore::manifest::{
> archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
> };
> +use pbs_datastore::read_chunk::AsyncReadChunk;
> use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>
> use crate::backup::{check_ns_modification_privs, check_ns_privs};
> use crate::tools::parallel_handler::ParallelHandler;
>
> -/// Parameters for a pull operation.
> -pub(crate) struct PullParameters {
> - /// Remote that is pulled from
> - remote: Remote,
> - /// Full specification of remote datastore
> - source: BackupRepository,
> - /// Local store that is pulled into
> +struct RemoteReader {
> + backup_reader: Arc<BackupReader>,
> + dir: BackupDir,
> +}
> +
> +pub(crate) struct PullTarget {
> store: Arc<DataStore>,
> - /// Remote namespace
> - remote_ns: BackupNamespace,
> - /// Local namespace (anchor)
> ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> + repo: BackupRepository,
> + ns: BackupNamespace,
> + client: HttpClient,
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
> +/// The trait includes methods for listing namespaces, groups, and backup directories,
> +/// as well as retrieving a reader for reading data from the source
> +trait PullSource: Send + Sync {
> + /// Lists namespaces from the source.
> + async fn list_namespaces(
> + &self,
> + max_depth: &mut Option<usize>,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupNamespace>, Error>;
> +
> + /// Lists groups within a specific namespace from the source.
> + async fn list_groups(
> + &self,
> + namespace: &BackupNamespace,
> + owner: &Authid,
> + ) -> Result<Vec<BackupGroup>, Error>;
> +
> + /// Lists backup directories for a specific group within a specific namespace from the source.
> + async fn list_backup_dirs(
> + &self,
> + namespace: &BackupNamespace,
> + group: &BackupGroup,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupDir>, Error>;
> + fn get_ns(&self) -> BackupNamespace;
> + fn print_store_and_ns(&self) -> String;
> +
> + /// Returns a reader for reading data from a specific backup directory.
> + async fn reader(
> + &self,
> + ns: &BackupNamespace,
> + dir: &BackupDir,
> + ) -> Result<Arc<dyn PullReader>, Error>;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullSource for RemoteSource {
> + async fn list_namespaces(
> + &self,
> + max_depth: &mut Option<usize>,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupNamespace>, Error> {
> + if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> + vec![self.ns.clone()];
This (still) does nothing, as mentioned in v2 ;-) - the Vec is built and
immediately dropped since there's no `return` here.
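Presumably an early return was intended, something like this (untested sketch):

    if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
        return Ok(vec![self.ns.clone()]);
    }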
> + }
> +
> + let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
> + let mut data = json!({});
> + if let Some(max_depth) = max_depth {
> + data["max-depth"] = json!(max_depth);
> + }
> +
> + if !self.ns.is_root() {
> + data["parent"] = json!(self.ns);
> + }
> + self.client.login().await?;
> +
> + let mut result = match self.client.get(&path, Some(data)).await {
> + Ok(res) => res,
> + Err(err) => match err.downcast_ref::<HttpError>() {
> + Some(HttpError { code, message }) => match code {
> + &StatusCode::NOT_FOUND => {
> + if self.ns.is_root() && max_depth.is_none() {
> + task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> + task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> + max_depth.replace(0);
> + } else {
> + bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> + }
> +
> + return Ok(vec![self.ns.clone()]);
> + }
> + _ => {
> + bail!("Querying namespaces failed - HTTP error {code} - {message}");
> + }
> + },
> + None => {
> + bail!("Querying namespaces failed - {err}");
> + }
> + },
> + };
> +
> + let list: Vec<BackupNamespace> =
> + serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> + .iter()
> + .map(|list_item| list_item.ns.clone())
If you're already modifying this, use
.into_iter()
.map(|list_item| list_item.ns)
since we don't really need to clone() here
> + .collect();
> +
> + Ok(list)
> + }
> +
> + async fn list_groups(
> + &self,
> + namespace: &BackupNamespace,
> + _owner: &Authid,
> + ) -> Result<Vec<BackupGroup>, Error> {
> + let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
> +
> + let args = if !namespace.is_root() {
> + Some(json!({ "ns": namespace.clone() }))
> + } else {
> + None
> + };
> +
> + self.client.login().await?;
> + let mut result =
> + self.client.get(&path, args).await.map_err(|err| {
> + format_err!("Failed to retrieve backup groups from remote - {}", err)
> + })?;
> +
> + Ok(
> + serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> + .map_err(Error::from)?
> + .into_iter()
> + .map(|item| item.backup)
> + .collect::<Vec<BackupGroup>>(),
> + )
> + }
> +
> + async fn list_backup_dirs(
> + &self,
> + _namespace: &BackupNamespace,
> + group: &BackupGroup,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupDir>, Error> {
> + let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
> +
> + let mut args = json!({
> + "backup-type": group.ty,
> + "backup-id": group.id,
> + });
> +
> + if !self.ns.is_root() {
> + args["ns"] = serde_json::to_value(&self.ns)?;
> + }
> +
> + self.client.login().await?;
> +
> + let mut result = self.client.get(&path, Some(args)).await?;
> + let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> + Ok(snapshot_list
> + .into_iter()
> + .filter_map(|item: SnapshotListItem| {
> + let snapshot = item.backup;
> + // in-progress backups can't be synced
> + if item.size.is_none() {
> + task_log!(
> + worker,
> + "skipping snapshot {} - in-progress backup",
> + snapshot
> + );
> + return None;
> + }
> +
> + Some(snapshot)
> + })
> + .collect::<Vec<BackupDir>>())
> + }
> +
> + fn get_ns(&self) -> BackupNamespace {
> + self.ns.clone()
> + }
> +
> + fn print_store_and_ns(&self) -> String {
> + print_store_and_ns(self.repo.store(), &self.ns)
> + }
> +
> + async fn reader(
> + &self,
> + ns: &BackupNamespace,
> + dir: &BackupDir,
> + ) -> Result<Arc<dyn PullReader>, Error> {
> + let backup_reader =
> + BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
> + Ok(Arc::new(RemoteReader {
> + backup_reader,
> + dir: dir.clone(),
> + }))
> + }
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullReader` is a trait that provides an interface for reading data from a source.
> +/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
> +trait PullReader: Send + Sync {
> + /// Returns a chunk reader with the specified encryption mode.
> + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
> +
> + /// Asynchronously loads a file from the source into a local file.
> + /// `filename` is the name of the file to load from the source.
> + /// `into` is the path of the local file to load the source file into.
> + async fn load_file_into(
> + &self,
> + filename: &str,
> + into: &Path,
> + worker: &WorkerTask,
> + ) -> Result<Option<DataBlob>, Error>;
> +
> + /// Tries to download the client log from the source and save it into a local file.
> + async fn try_download_client_log(
> + &self,
> + to_path: &Path,
> + worker: &WorkerTask,
> + ) -> Result<(), Error>;
> +
> + fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullReader for RemoteReader {
> + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
> + Arc::new(RemoteChunkReader::new(
> + self.backup_reader.clone(),
> + None,
> + crypt_mode,
> + HashMap::new(),
> + ))
> + }
> +
> + async fn load_file_into(
> + &self,
> + filename: &str,
> + into: &Path,
> + worker: &WorkerTask,
> + ) -> Result<Option<DataBlob>, Error> {
> + let mut tmp_file = std::fs::OpenOptions::new()
> + .write(true)
> + .create(true)
> + .truncate(true)
> + .read(true)
> + .open(into)?;
> + let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
> + if let Err(err) = download_result {
> + match err.downcast_ref::<HttpError>() {
> + Some(HttpError { code, message }) => match *code {
> + StatusCode::NOT_FOUND => {
> + task_log!(
> + worker,
> + "skipping snapshot {} - vanished since start of sync",
> + &self.dir,
> + );
> + return Ok(None);
> + }
> + _ => {
> + bail!("HTTP error {code} - {message}");
> + }
> + },
> + None => {
> + return Err(err);
> + }
> + };
> + };
> + tmp_file.rewind()?;
> + Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
> + }
> +
> + async fn try_download_client_log(
> + &self,
> + to_path: &Path,
> + worker: &WorkerTask,
> + ) -> Result<(), Error> {
> + let mut tmp_path = to_path.to_owned();
> + tmp_path.set_extension("tmp");
> +
> + let tmpfile = std::fs::OpenOptions::new()
> + .write(true)
> + .create(true)
> + .read(true)
> + .open(&tmp_path)?;
> +
> + // Note: be silent if there is no log - only log successful download
> + if let Ok(()) = self
> + .backup_reader
> + .download(CLIENT_LOG_BLOB_NAME, tmpfile)
> + .await
> + {
> + if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> + bail!("Atomic rename file {:?} failed - {}", to_path, err);
> + }
> + task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> + }
> +
> + Ok(())
> + }
> +
> + fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
> + false
> + }
> +}
> +
> +/// Parameters for a pull operation.
> +pub(crate) struct PullParameters {
> + /// Where data is pulled from
> + source: Arc<dyn PullSource>,
> + /// Where data should be pulled into
> + target: PullTarget,
> /// Owner of synced groups (needs to match local owner of pre-existing groups)
> owner: Authid,
> /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
> max_depth: Option<usize>,
> /// Filters for reducing the pull scope
> group_filter: Option<Vec<GroupFilter>>,
> - /// Rate limits for all transfers from `remote`
> - limit: RateLimitConfig,
> /// How many snapshots should be transferred at most (taking the newest N snapshots)
> transfer_last: Option<usize>,
> }
>
> impl PullParameters {
> /// Creates a new instance of `PullParameters`.
> - ///
> - /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> - /// [BackupRepository] with `remote_store`.
> - #[allow(clippy::too_many_arguments)]
> pub(crate) fn new(
> store: &str,
> ns: BackupNamespace,
> - remote: &str,
> + remote: Option<&str>,
> remote_store: &str,
> remote_ns: BackupNamespace,
> owner: Authid,
> @@ -82,49 +376,56 @@ impl PullParameters {
> limit: RateLimitConfig,
> transfer_last: Option<usize>,
> ) -> Result<Self, Error> {
> - let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
> if let Some(max_depth) = max_depth {
> ns.check_max_depth(max_depth)?;
> remote_ns.check_max_depth(max_depth)?;
> - }
> -
> - let (remote_config, _digest) = pbs_config::remote::config()?;
> - let remote: Remote = remote_config.lookup("remote", remote)?;
> -
> + };
> let remove_vanished = remove_vanished.unwrap_or(false);
>
> - let source = BackupRepository::new(
> - Some(remote.config.auth_id.clone()),
> - Some(remote.config.host.clone()),
> - remote.config.port,
> - remote_store.to_string(),
> - );
> + let source: Arc<dyn PullSource> = if let Some(remote) = remote {
> + let (remote_config, _digest) = pbs_config::remote::config()?;
> + let remote: Remote = remote_config.lookup("remote", remote)?;
>
> - Ok(Self {
> - remote,
> - remote_ns,
> + let repo = BackupRepository::new(
> + Some(remote.config.auth_id.clone()),
> + Some(remote.config.host.clone()),
> + remote.config.port,
> + remote_store.to_string(),
> + );
> + let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
> + Arc::new(RemoteSource {
> + repo,
> + ns: remote_ns,
> + client,
> + })
> + } else {
> + bail!("local sync not implemented yet")
> + };
> + let target = PullTarget {
> + store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
> ns,
> + };
> +
> + Ok(Self {
> source,
> - store,
> + target,
> owner,
> remove_vanished,
> max_depth,
> group_filter,
> - limit,
> transfer_last,
> })
> }
>
> - /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> - pub async fn client(&self) -> Result<HttpClient, Error> {
> - crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> + pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> + let source_ns = self.source.get_ns();
> + source_ns.map_prefix(&source_ns, &self.target.ns)
^ This code is still weird, again, as already mentioned in v2 - AFAICT mapping
`source_ns` onto its own prefix just yields `self.target.ns`, so the
`map_prefix` call is a roundabout way of cloning the target namespace.
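If that reading is right, this could simply be (untested):

    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
        Ok(self.target.ns.clone())
    }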
> }
> }
>
> async fn pull_index_chunks<I: IndexFile>(
> worker: &WorkerTask,
> - chunk_reader: RemoteChunkReader,
> + chunk_reader: Arc<dyn AsyncReadChunk>,
> target: Arc<DataStore>,
> index: I,
> downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -215,26 +516,6 @@ async fn pull_index_chunks<I: IndexFile>(
> Ok(())
> }
>
> -async fn download_manifest(
> - reader: &BackupReader,
> - filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> - let mut tmp_manifest_file = std::fs::OpenOptions::new()
> - .write(true)
> - .create(true)
> - .truncate(true)
> - .read(true)
> - .open(filename)?;
> -
> - reader
> - .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> - .await?;
> -
> - tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> - Ok(tmp_manifest_file)
> -}
> -
> fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
> if size != info.size {
> bail!(
> @@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
> /// Pulls a single file referenced by a manifest.
> ///
> /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> -/// - Verify tmp file checksum
> +/// - Load archive file into tmp file
> +/// -- Load file into tmp file
> +/// -- Verify tmp file checksum
> /// - if archive is an index, pull referenced chunks
> /// - Rename tmp file into real path
> -async fn pull_single_archive(
> - worker: &WorkerTask,
> - reader: &BackupReader,
> - chunk_reader: &mut RemoteChunkReader,
> - snapshot: &pbs_datastore::BackupDir,
> - archive_info: &FileInfo,
> +async fn pull_single_archive<'a>(
> + worker: &'a WorkerTask,
> + reader: Arc<dyn PullReader + 'a>,
> + snapshot: &'a pbs_datastore::BackupDir,
> + archive_info: &'a FileInfo,
> downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> ) -> Result<(), Error> {
> let archive_name = &archive_info.filename;
> @@ -277,13 +557,11 @@ async fn pull_single_archive(
>
> task_log!(worker, "sync archive {}", archive_name);
>
> - let mut tmpfile = std::fs::OpenOptions::new()
> - .write(true)
> - .create(true)
> - .read(true)
> - .open(&tmp_path)?;
> + reader
> + .load_file_into(archive_name, &tmp_path, worker)
> + .await?;
>
> - reader.download(archive_name, &mut tmpfile).await?;
> + let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>
> match archive_type(archive_name)? {
> ArchiveType::DynamicIndex => {
> @@ -293,14 +571,18 @@ async fn pull_single_archive(
> let (csum, size) = index.compute_csum();
> verify_archive(archive_info, &csum, size)?;
>
> - pull_index_chunks(
> - worker,
> - chunk_reader.clone(),
> - snapshot.datastore().clone(),
> - index,
> - downloaded_chunks,
> - )
> - .await?;
> + if reader.skip_chunk_sync(snapshot.datastore().name()) {
> + task_log!(worker, "skipping chunk sync for same datatsore");
The t<->s typo is still there, as mentioned in v2.
> + } else {
> + pull_index_chunks(
> + worker,
> + reader.chunk_reader(archive_info.crypt_mode),
> + snapshot.datastore().clone(),
> + index,
> + downloaded_chunks,
> + )
> + .await?;
> + }
> }
> ArchiveType::FixedIndex => {
> let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -309,17 +591,21 @@ async fn pull_single_archive(
> let (csum, size) = index.compute_csum();
> verify_archive(archive_info, &csum, size)?;
>
> - pull_index_chunks(
> - worker,
> - chunk_reader.clone(),
> - snapshot.datastore().clone(),
> - index,
> - downloaded_chunks,
> - )
> - .await?;
> + if reader.skip_chunk_sync(snapshot.datastore().name()) {
> + task_log!(worker, "skipping chunk sync for same datatsore");
The t<->s typo is still there, as mentioned in v2.
> + } else {
> + pull_index_chunks(
> + worker,
> + reader.chunk_reader(archive_info.crypt_mode),
> + snapshot.datastore().clone(),
> + index,
> + downloaded_chunks,
> + )
> + .await?;
> + }
> }
> ArchiveType::Blob => {
> - tmpfile.seek(SeekFrom::Start(0))?;
> + tmpfile.rewind()?;
> let (csum, size) = sha256(&mut tmpfile)?;
> verify_archive(archive_info, &csum, size)?;
> }