[pbs-devel] [PATCH v2 proxmox-backup 04/31] server: sync: move source to common sync module
Christian Ebner
c.ebner at proxmox.com
Thu Aug 1 09:43:36 CEST 2024
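Rename the `PullSource` trait to `SyncSource` and move the trait and
the types implementing it (`RemoteSource`, `LocalSource`) to the common
sync module, making them reusable for both sync directions, push and
pull. As part of the move, the trait and the fields of both types become
`pub(crate)`; the method implementations are moved unchanged.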
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- no changes
src/server/pull.rs | 281 ++-------------------------------------------
src/server/sync.rs | 276 +++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 280 insertions(+), 277 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5efe2d5f7..c6932dcc5 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -7,18 +7,14 @@ use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
-use http::StatusCode;
use proxmox_human_byte::HumanByte;
-use proxmox_router::HttpError;
-use serde_json::json;
-use tracing::{info, warn};
+use tracing::info;
use pbs_api_types::{
- print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter,
- GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
- PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
+ print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter, Operation,
+ RateLimitConfig, Remote, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
};
-use pbs_client::{BackupReader, BackupRepository, HttpClient};
+use pbs_client::BackupRepository;
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -28,13 +24,13 @@ use pbs_datastore::manifest::{
ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, ListNamespacesRecursive, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
use pbs_tools::sha::sha256;
use super::sync::{
- LocalSourceReader, RemoteSourceReader, RemovedVanishedStats, SyncSourceReader, SyncStats,
+ LocalSource, RemoteSource, RemovedVanishedStats, SyncSource, SyncSourceReader, SyncStats,
};
-use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
+use crate::backup::{check_ns_modification_privs, check_ns_privs};
use crate::tools::parallel_handler::ParallelHandler;
pub(crate) struct PullTarget {
@@ -42,269 +38,10 @@ pub(crate) struct PullTarget {
ns: BackupNamespace,
}
-pub(crate) struct RemoteSource {
- repo: BackupRepository,
- ns: BackupNamespace,
- client: HttpClient,
-}
-
-pub(crate) struct LocalSource {
- store: Arc<DataStore>,
- ns: BackupNamespace,
-}
-
-#[async_trait::async_trait]
-/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
-/// The trait includes methods for listing namespaces, groups, and backup directories,
-/// as well as retrieving a reader for reading data from the source
-trait PullSource: Send + Sync {
- /// Lists namespaces from the source.
- async fn list_namespaces(
- &self,
- max_depth: &mut Option<usize>,
- ) -> Result<Vec<BackupNamespace>, Error>;
-
- /// Lists groups within a specific namespace from the source.
- async fn list_groups(
- &self,
- namespace: &BackupNamespace,
- owner: &Authid,
- ) -> Result<Vec<BackupGroup>, Error>;
-
- /// Lists backup directories for a specific group within a specific namespace from the source.
- async fn list_backup_dirs(
- &self,
- namespace: &BackupNamespace,
- group: &BackupGroup,
- ) -> Result<Vec<BackupDir>, Error>;
- fn get_ns(&self) -> BackupNamespace;
- fn get_store(&self) -> &str;
-
- /// Returns a reader for reading data from a specific backup directory.
- async fn reader(
- &self,
- ns: &BackupNamespace,
- dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error>;
-}
-
-#[async_trait::async_trait]
-impl PullSource for RemoteSource {
- async fn list_namespaces(
- &self,
- max_depth: &mut Option<usize>,
- ) -> Result<Vec<BackupNamespace>, Error> {
- if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
- return Ok(vec![self.ns.clone()]);
- }
-
- let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
- let mut data = json!({});
- if let Some(max_depth) = max_depth {
- data["max-depth"] = json!(max_depth);
- }
-
- if !self.ns.is_root() {
- data["parent"] = json!(self.ns);
- }
- self.client.login().await?;
-
- let mut result = match self.client.get(&path, Some(data)).await {
- Ok(res) => res,
- Err(err) => match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match code {
- &StatusCode::NOT_FOUND => {
- if self.ns.is_root() && max_depth.is_none() {
- warn!("Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
- warn!("Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
- max_depth.replace(0);
- } else {
- bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
- }
-
- return Ok(vec![self.ns.clone()]);
- }
- _ => {
- bail!("Querying namespaces failed - HTTP error {code} - {message}");
- }
- },
- None => {
- bail!("Querying namespaces failed - {err}");
- }
- },
- };
-
- let list: Vec<BackupNamespace> =
- serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
- .into_iter()
- .map(|list_item| list_item.ns)
- .collect();
-
- Ok(list)
- }
-
- async fn list_groups(
- &self,
- namespace: &BackupNamespace,
- _owner: &Authid,
- ) -> Result<Vec<BackupGroup>, Error> {
- let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
-
- let args = if !namespace.is_root() {
- Some(json!({ "ns": namespace.clone() }))
- } else {
- None
- };
-
- self.client.login().await?;
- let mut result =
- self.client.get(&path, args).await.map_err(|err| {
- format_err!("Failed to retrieve backup groups from remote - {}", err)
- })?;
-
- Ok(
- serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
- .map_err(Error::from)?
- .into_iter()
- .map(|item| item.backup)
- .collect::<Vec<BackupGroup>>(),
- )
- }
-
- async fn list_backup_dirs(
- &self,
- namespace: &BackupNamespace,
- group: &BackupGroup,
- ) -> Result<Vec<BackupDir>, Error> {
- let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
-
- let mut args = json!({
- "backup-type": group.ty,
- "backup-id": group.id,
- });
-
- if !namespace.is_root() {
- args["ns"] = serde_json::to_value(namespace)?;
- }
-
- self.client.login().await?;
-
- let mut result = self.client.get(&path, Some(args)).await?;
- let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
- Ok(snapshot_list
- .into_iter()
- .filter_map(|item: SnapshotListItem| {
- let snapshot = item.backup;
- // in-progress backups can't be synced
- if item.size.is_none() {
- info!("skipping snapshot {snapshot} - in-progress backup");
- return None;
- }
-
- Some(snapshot)
- })
- .collect::<Vec<BackupDir>>())
- }
-
- fn get_ns(&self) -> BackupNamespace {
- self.ns.clone()
- }
-
- fn get_store(&self) -> &str {
- self.repo.store()
- }
-
- async fn reader(
- &self,
- ns: &BackupNamespace,
- dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error> {
- let backup_reader =
- BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
- Ok(Arc::new(RemoteSourceReader {
- backup_reader,
- dir: dir.clone(),
- }))
- }
-}
-
-#[async_trait::async_trait]
-impl PullSource for LocalSource {
- async fn list_namespaces(
- &self,
- max_depth: &mut Option<usize>,
- ) -> Result<Vec<BackupNamespace>, Error> {
- ListNamespacesRecursive::new_max_depth(
- self.store.clone(),
- self.ns.clone(),
- max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
- )?
- .collect()
- }
-
- async fn list_groups(
- &self,
- namespace: &BackupNamespace,
- owner: &Authid,
- ) -> Result<Vec<BackupGroup>, Error> {
- Ok(ListAccessibleBackupGroups::new_with_privs(
- &self.store,
- namespace.clone(),
- 0,
- Some(PRIV_DATASTORE_READ),
- Some(PRIV_DATASTORE_BACKUP),
- Some(owner),
- )?
- .filter_map(Result::ok)
- .map(|backup_group| backup_group.group().clone())
- .collect::<Vec<pbs_api_types::BackupGroup>>())
- }
-
- async fn list_backup_dirs(
- &self,
- namespace: &BackupNamespace,
- group: &BackupGroup,
- ) -> Result<Vec<BackupDir>, Error> {
- Ok(self
- .store
- .backup_group(namespace.clone(), group.clone())
- .iter_snapshots()?
- .filter_map(Result::ok)
- .map(|snapshot| snapshot.dir().to_owned())
- .collect::<Vec<BackupDir>>())
- }
-
- fn get_ns(&self) -> BackupNamespace {
- self.ns.clone()
- }
-
- fn get_store(&self) -> &str {
- self.store.name()
- }
-
- async fn reader(
- &self,
- ns: &BackupNamespace,
- dir: &BackupDir,
- ) -> Result<Arc<dyn SyncSourceReader>, Error> {
- let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
- let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
- &dir.full_path(),
- "snapshot",
- "locked by another operation",
- )?;
- Ok(Arc::new(LocalSourceReader {
- _dir_lock: Arc::new(Mutex::new(dir_lock)),
- path: dir.full_path(),
- datastore: dir.datastore().clone(),
- }))
- }
-}
-
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
/// Where data is pulled from
- source: Arc<dyn PullSource>,
+ source: Arc<dyn SyncSource>,
/// Where data should be pulled into
target: PullTarget,
/// Owner of synced groups (needs to match local owner of pre-existing groups)
@@ -341,7 +78,7 @@ impl PullParameters {
};
let remove_vanished = remove_vanished.unwrap_or(false);
- let source: Arc<dyn PullSource> = if let Some(remote) = remote {
+ let source: Arc<dyn SyncSource> = if let Some(remote) = remote {
let (remote_config, _digest) = pbs_config::remote::config()?;
let remote: Remote = remote_config.lookup("remote", remote)?;
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 323bc1a27..f8a1e133d 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -6,18 +6,24 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use anyhow::{bail, Error};
+use anyhow::{bail, format_err, Error};
use http::StatusCode;
-use tracing::info;
+use serde_json::json;
+use tracing::{info, warn};
use proxmox_router::HttpError;
-use pbs_api_types::{BackupDir, CryptMode};
-use pbs_client::{BackupReader, RemoteChunkReader};
+use pbs_api_types::{
+ Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupListItem, SnapshotListItem,
+ MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
+};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::manifest::CLIENT_LOG_BLOB_NAME;
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{DataStore, LocalChunkReader};
+use pbs_datastore::{DataStore, ListNamespacesRecursive, LocalChunkReader};
+
+use crate::backup::ListAccessibleBackupGroups;
#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
@@ -201,3 +207,263 @@ impl SyncSourceReader for LocalSourceReader {
self.datastore.name() == target_store_name
}
}
+
+#[async_trait::async_trait]
+/// `SyncSource` is a trait that provides an interface for synchronizing data/information from a
+/// source.
+/// The trait includes methods for listing namespaces, groups, and backup directories,
+/// as well as retrieving a reader for reading data from the source.
+pub(crate) trait SyncSource: Send + Sync {
+ /// Lists namespaces from the source.
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ ) -> Result<Vec<BackupNamespace>, Error>;
+
+ /// Lists groups within a specific namespace from the source.
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error>;
+
+ /// Lists backup directories for a specific group within a specific namespace from the source.
+ async fn list_backup_dirs(
+ &self,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ ) -> Result<Vec<BackupDir>, Error>;
+ fn get_ns(&self) -> BackupNamespace;
+ fn get_store(&self) -> &str;
+
+ /// Returns a reader for reading data from a specific backup directory.
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn SyncSourceReader>, Error>;
+}
+
+pub(crate) struct RemoteSource {
+ pub(crate) repo: BackupRepository,
+ pub(crate) ns: BackupNamespace,
+ pub(crate) client: HttpClient,
+}
+
+pub(crate) struct LocalSource {
+ pub(crate) store: Arc<DataStore>,
+ pub(crate) ns: BackupNamespace,
+}
+
+#[async_trait::async_trait]
+impl SyncSource for RemoteSource {
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ ) -> Result<Vec<BackupNamespace>, Error> {
+ if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
+ return Ok(vec![self.ns.clone()]);
+ }
+
+ let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
+ let mut data = json!({});
+ if let Some(max_depth) = max_depth {
+ data["max-depth"] = json!(max_depth);
+ }
+
+ if !self.ns.is_root() {
+ data["parent"] = json!(self.ns);
+ }
+ self.client.login().await?;
+
+ let mut result = match self.client.get(&path, Some(data)).await {
+ Ok(res) => res,
+ Err(err) => match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match code {
+ &StatusCode::NOT_FOUND => {
+ if self.ns.is_root() && max_depth.is_none() {
+ warn!("Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+ warn!("Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+ max_depth.replace(0);
+ } else {
+ bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+ }
+
+ return Ok(vec![self.ns.clone()]);
+ }
+ _ => {
+ bail!("Querying namespaces failed - HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ bail!("Querying namespaces failed - {err}");
+ }
+ },
+ };
+
+ let list: Vec<BackupNamespace> =
+ serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+ .into_iter()
+ .map(|list_item| list_item.ns)
+ .collect();
+
+ Ok(list)
+ }
+
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ _owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error> {
+ let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
+
+ let args = if !namespace.is_root() {
+ Some(json!({ "ns": namespace.clone() }))
+ } else {
+ None
+ };
+
+ self.client.login().await?;
+ let mut result =
+ self.client.get(&path, args).await.map_err(|err| {
+ format_err!("Failed to retrieve backup groups from remote - {}", err)
+ })?;
+
+ Ok(
+ serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
+ .map_err(Error::from)?
+ .into_iter()
+ .map(|item| item.backup)
+ .collect::<Vec<BackupGroup>>(),
+ )
+ }
+
+ async fn list_backup_dirs(
+ &self,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ ) -> Result<Vec<BackupDir>, Error> {
+ let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
+
+ let mut args = json!({
+ "backup-type": group.ty,
+ "backup-id": group.id,
+ });
+
+ if !namespace.is_root() {
+ args["ns"] = serde_json::to_value(namespace)?;
+ }
+
+ self.client.login().await?;
+
+ let mut result = self.client.get(&path, Some(args)).await?;
+ let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+ Ok(snapshot_list
+ .into_iter()
+ .filter_map(|item: SnapshotListItem| {
+ let snapshot = item.backup;
+ // in-progress backups can't be synced
+ if item.size.is_none() {
+ info!("skipping snapshot {snapshot} - in-progress backup");
+ return None;
+ }
+
+ Some(snapshot)
+ })
+ .collect::<Vec<BackupDir>>())
+ }
+
+ fn get_ns(&self) -> BackupNamespace {
+ self.ns.clone()
+ }
+
+ fn get_store(&self) -> &str {
+ self.repo.store()
+ }
+
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+ let backup_reader =
+ BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
+ Ok(Arc::new(RemoteSourceReader {
+ backup_reader,
+ dir: dir.clone(),
+ }))
+ }
+}
+
+#[async_trait::async_trait]
+impl SyncSource for LocalSource {
+ async fn list_namespaces(
+ &self,
+ max_depth: &mut Option<usize>,
+ ) -> Result<Vec<BackupNamespace>, Error> {
+ ListNamespacesRecursive::new_max_depth(
+ self.store.clone(),
+ self.ns.clone(),
+ max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+ )?
+ .collect()
+ }
+
+ async fn list_groups(
+ &self,
+ namespace: &BackupNamespace,
+ owner: &Authid,
+ ) -> Result<Vec<BackupGroup>, Error> {
+ Ok(ListAccessibleBackupGroups::new_with_privs(
+ &self.store,
+ namespace.clone(),
+ 0,
+ Some(PRIV_DATASTORE_READ),
+ Some(PRIV_DATASTORE_BACKUP),
+ Some(owner),
+ )?
+ .filter_map(Result::ok)
+ .map(|backup_group| backup_group.group().clone())
+ .collect::<Vec<pbs_api_types::BackupGroup>>())
+ }
+
+ async fn list_backup_dirs(
+ &self,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ ) -> Result<Vec<BackupDir>, Error> {
+ Ok(self
+ .store
+ .backup_group(namespace.clone(), group.clone())
+ .iter_snapshots()?
+ .filter_map(Result::ok)
+ .map(|snapshot| snapshot.dir().to_owned())
+ .collect::<Vec<BackupDir>>())
+ }
+
+ fn get_ns(&self) -> BackupNamespace {
+ self.ns.clone()
+ }
+
+ fn get_store(&self) -> &str {
+ self.store.name()
+ }
+
+ async fn reader(
+ &self,
+ ns: &BackupNamespace,
+ dir: &BackupDir,
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+ let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
+ let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
+ &dir.full_path(),
+ "snapshot",
+ "locked by another operation",
+ )?;
+ Ok(Arc::new(LocalSourceReader {
+ _dir_lock: Arc::new(Mutex::new(dir_lock)),
+ path: dir.full_path(),
+ datastore: dir.datastore().clone(),
+ }))
+ }
+}
--
2.39.2
More information about the pbs-devel
mailing list