[pbs-devel] [RFC proxmox-backup 08/24] server: sync: move source to common sync module

Christian Ebner c.ebner at proxmox.com
Mon Jul 15 12:15:46 CEST 2024


Rename the `PullSource` trait to `SyncSource` and move it, together
with the types implementing it, to the common sync module, making
them reusable for both sync directions, push and pull.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 src/server/pull.rs | 281 ++-------------------------------------------
 src/server/sync.rs | 276 +++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 280 insertions(+), 277 deletions(-)
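
As an aside (toy illustration, not the real pbs-* APIs): the point of
hoisting the trait is that a trait object such as `Arc<dyn SyncSource>`
lets the existing pull job and a future push job drive any source
through the same listing/reader interface. A simplified, synchronous,
standalone sketch of that pattern:

    use std::sync::Arc;

    /// Direction-agnostic source interface (heavily reduced toy version).
    trait SyncSource: Send + Sync {
        fn get_store(&self) -> &str;
        fn list_groups(&self) -> Vec<String>;
    }

    struct LocalSource {
        store: String,
    }

    struct RemoteSource {
        repo: String,
    }

    impl SyncSource for LocalSource {
        fn get_store(&self) -> &str {
            &self.store
        }
        fn list_groups(&self) -> Vec<String> {
            vec!["vm/100".to_string()]
        }
    }

    impl SyncSource for RemoteSource {
        fn get_store(&self) -> &str {
            &self.repo
        }
        fn list_groups(&self) -> Vec<String> {
            vec!["ct/101".to_string()]
        }
    }

    /// Shared by both sync directions: only the SyncSource trait is needed.
    fn enumerate_groups(source: Arc<dyn SyncSource>) {
        for group in source.list_groups() {
            println!("{}: {}", source.get_store(), group);
        }
    }

    fn main() {
        enumerate_groups(Arc::new(LocalSource { store: "local".to_string() }));
        enumerate_groups(Arc::new(RemoteSource { repo: "remote".to_string() }));
    }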

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5efe2d5f7..c6932dcc5 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -7,18 +7,14 @@ use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
-use http::StatusCode;
 use proxmox_human_byte::HumanByte;
-use proxmox_router::HttpError;
-use serde_json::json;
-use tracing::{info, warn};
+use tracing::info;
 
 use pbs_api_types::{
-    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter,
-    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
-    PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
+    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter, Operation,
+    RateLimitConfig, Remote, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
 };
-use pbs_client::{BackupReader, BackupRepository, HttpClient};
+use pbs_client::BackupRepository;
 use pbs_config::CachedUserInfo;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -28,13 +24,13 @@ use pbs_datastore::manifest::{
     ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
 use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, ListNamespacesRecursive, StoreProgress};
+use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
 use pbs_tools::sha::sha256;
 
 use super::sync::{
-    LocalSourceReader, RemoteSourceReader, RemovedVanishedStats, SyncSourceReader, SyncStats,
+    LocalSource, RemoteSource, RemovedVanishedStats, SyncSource, SyncSourceReader, SyncStats,
 };
-use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
+use crate::backup::{check_ns_modification_privs, check_ns_privs};
 use crate::tools::parallel_handler::ParallelHandler;
 
 pub(crate) struct PullTarget {
@@ -42,269 +38,10 @@ pub(crate) struct PullTarget {
     ns: BackupNamespace,
 }
 
-pub(crate) struct RemoteSource {
-    repo: BackupRepository,
-    ns: BackupNamespace,
-    client: HttpClient,
-}
-
-pub(crate) struct LocalSource {
-    store: Arc<DataStore>,
-    ns: BackupNamespace,
-}
-
-#[async_trait::async_trait]
-/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
-/// The trait includes methods for listing namespaces, groups, and backup directories,
-/// as well as retrieving a reader for reading data from the source
-trait PullSource: Send + Sync {
-    /// Lists namespaces from the source.
-    async fn list_namespaces(
-        &self,
-        max_depth: &mut Option<usize>,
-    ) -> Result<Vec<BackupNamespace>, Error>;
-
-    /// Lists groups within a specific namespace from the source.
-    async fn list_groups(
-        &self,
-        namespace: &BackupNamespace,
-        owner: &Authid,
-    ) -> Result<Vec<BackupGroup>, Error>;
-
-    /// Lists backup directories for a specific group within a specific namespace from the source.
-    async fn list_backup_dirs(
-        &self,
-        namespace: &BackupNamespace,
-        group: &BackupGroup,
-    ) -> Result<Vec<BackupDir>, Error>;
-    fn get_ns(&self) -> BackupNamespace;
-    fn get_store(&self) -> &str;
-
-    /// Returns a reader for reading data from a specific backup directory.
-    async fn reader(
-        &self,
-        ns: &BackupNamespace,
-        dir: &BackupDir,
-    ) -> Result<Arc<dyn SyncSourceReader>, Error>;
-}
-
-#[async_trait::async_trait]
-impl PullSource for RemoteSource {
-    async fn list_namespaces(
-        &self,
-        max_depth: &mut Option<usize>,
-    ) -> Result<Vec<BackupNamespace>, Error> {
-        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
-            return Ok(vec![self.ns.clone()]);
-        }
-
-        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
-        let mut data = json!({});
-        if let Some(max_depth) = max_depth {
-            data["max-depth"] = json!(max_depth);
-        }
-
-        if !self.ns.is_root() {
-            data["parent"] = json!(self.ns);
-        }
-        self.client.login().await?;
-
-        let mut result = match self.client.get(&path, Some(data)).await {
-            Ok(res) => res,
-            Err(err) => match err.downcast_ref::<HttpError>() {
-                Some(HttpError { code, message }) => match code {
-                    &StatusCode::NOT_FOUND => {
-                        if self.ns.is_root() && max_depth.is_none() {
-                            warn!("Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
-                            warn!("Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
-                            max_depth.replace(0);
-                        } else {
-                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
-                        }
-
-                        return Ok(vec![self.ns.clone()]);
-                    }
-                    _ => {
-                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
-                    }
-                },
-                None => {
-                    bail!("Querying namespaces failed - {err}");
-                }
-            },
-        };
-
-        let list: Vec<BackupNamespace> =
-            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
-                .into_iter()
-                .map(|list_item| list_item.ns)
-                .collect();
-
-        Ok(list)
-    }
-
-    async fn list_groups(
-        &self,
-        namespace: &BackupNamespace,
-        _owner: &Authid,
-    ) -> Result<Vec<BackupGroup>, Error> {
-        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
-
-        let args = if !namespace.is_root() {
-            Some(json!({ "ns": namespace.clone() }))
-        } else {
-            None
-        };
-
-        self.client.login().await?;
-        let mut result =
-            self.client.get(&path, args).await.map_err(|err| {
-                format_err!("Failed to retrieve backup groups from remote - {}", err)
-            })?;
-
-        Ok(
-            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
-                .map_err(Error::from)?
-                .into_iter()
-                .map(|item| item.backup)
-                .collect::<Vec<BackupGroup>>(),
-        )
-    }
-
-    async fn list_backup_dirs(
-        &self,
-        namespace: &BackupNamespace,
-        group: &BackupGroup,
-    ) -> Result<Vec<BackupDir>, Error> {
-        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
-
-        let mut args = json!({
-            "backup-type": group.ty,
-            "backup-id": group.id,
-        });
-
-        if !namespace.is_root() {
-            args["ns"] = serde_json::to_value(namespace)?;
-        }
-
-        self.client.login().await?;
-
-        let mut result = self.client.get(&path, Some(args)).await?;
-        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
-        Ok(snapshot_list
-            .into_iter()
-            .filter_map(|item: SnapshotListItem| {
-                let snapshot = item.backup;
-                // in-progress backups can't be synced
-                if item.size.is_none() {
-                    info!("skipping snapshot {snapshot} - in-progress backup");
-                    return None;
-                }
-
-                Some(snapshot)
-            })
-            .collect::<Vec<BackupDir>>())
-    }
-
-    fn get_ns(&self) -> BackupNamespace {
-        self.ns.clone()
-    }
-
-    fn get_store(&self) -> &str {
-        self.repo.store()
-    }
-
-    async fn reader(
-        &self,
-        ns: &BackupNamespace,
-        dir: &BackupDir,
-    ) -> Result<Arc<dyn SyncSourceReader>, Error> {
-        let backup_reader =
-            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
-        Ok(Arc::new(RemoteSourceReader {
-            backup_reader,
-            dir: dir.clone(),
-        }))
-    }
-}
-
-#[async_trait::async_trait]
-impl PullSource for LocalSource {
-    async fn list_namespaces(
-        &self,
-        max_depth: &mut Option<usize>,
-    ) -> Result<Vec<BackupNamespace>, Error> {
-        ListNamespacesRecursive::new_max_depth(
-            self.store.clone(),
-            self.ns.clone(),
-            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
-        )?
-        .collect()
-    }
-
-    async fn list_groups(
-        &self,
-        namespace: &BackupNamespace,
-        owner: &Authid,
-    ) -> Result<Vec<BackupGroup>, Error> {
-        Ok(ListAccessibleBackupGroups::new_with_privs(
-            &self.store,
-            namespace.clone(),
-            0,
-            Some(PRIV_DATASTORE_READ),
-            Some(PRIV_DATASTORE_BACKUP),
-            Some(owner),
-        )?
-        .filter_map(Result::ok)
-        .map(|backup_group| backup_group.group().clone())
-        .collect::<Vec<pbs_api_types::BackupGroup>>())
-    }
-
-    async fn list_backup_dirs(
-        &self,
-        namespace: &BackupNamespace,
-        group: &BackupGroup,
-    ) -> Result<Vec<BackupDir>, Error> {
-        Ok(self
-            .store
-            .backup_group(namespace.clone(), group.clone())
-            .iter_snapshots()?
-            .filter_map(Result::ok)
-            .map(|snapshot| snapshot.dir().to_owned())
-            .collect::<Vec<BackupDir>>())
-    }
-
-    fn get_ns(&self) -> BackupNamespace {
-        self.ns.clone()
-    }
-
-    fn get_store(&self) -> &str {
-        self.store.name()
-    }
-
-    async fn reader(
-        &self,
-        ns: &BackupNamespace,
-        dir: &BackupDir,
-    ) -> Result<Arc<dyn SyncSourceReader>, Error> {
-        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
-        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
-            &dir.full_path(),
-            "snapshot",
-            "locked by another operation",
-        )?;
-        Ok(Arc::new(LocalSourceReader {
-            _dir_lock: Arc::new(Mutex::new(dir_lock)),
-            path: dir.full_path(),
-            datastore: dir.datastore().clone(),
-        }))
-    }
-}
-
 /// Parameters for a pull operation.
 pub(crate) struct PullParameters {
     /// Where data is pulled from
-    source: Arc<dyn PullSource>,
+    source: Arc<dyn SyncSource>,
     /// Where data should be pulled into
     target: PullTarget,
     /// Owner of synced groups (needs to match local owner of pre-existing groups)
@@ -341,7 +78,7 @@ impl PullParameters {
         };
         let remove_vanished = remove_vanished.unwrap_or(false);
 
-        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
+        let source: Arc<dyn SyncSource> = if let Some(remote) = remote {
             let (remote_config, _digest) = pbs_config::remote::config()?;
             let remote: Remote = remote_config.lookup("remote", remote)?;
 
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 323bc1a27..f8a1e133d 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -6,18 +6,24 @@ use std::path::{Path, PathBuf};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
-use anyhow::{bail, Error};
+use anyhow::{bail, format_err, Error};
 use http::StatusCode;
-use tracing::info;
+use serde_json::json;
+use tracing::{info, warn};
 
 use proxmox_router::HttpError;
 
-use pbs_api_types::{BackupDir, CryptMode};
-use pbs_client::{BackupReader, RemoteChunkReader};
+use pbs_api_types::{
+    Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupListItem, SnapshotListItem,
+    MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
+};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::manifest::CLIENT_LOG_BLOB_NAME;
 use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{DataStore, LocalChunkReader};
+use pbs_datastore::{DataStore, ListNamespacesRecursive, LocalChunkReader};
+
+use crate::backup::ListAccessibleBackupGroups;
 
 #[derive(Default)]
 pub(crate) struct RemovedVanishedStats {
@@ -201,3 +207,263 @@ impl SyncSourceReader for LocalSourceReader {
         self.datastore.name() == target_store_name
     }
 }
+
+#[async_trait::async_trait]
+/// `SyncSource` is a trait that provides an interface for synchronizing data/information from a
+/// source.
+/// The trait includes methods for listing namespaces, groups, and backup directories,
+/// as well as retrieving a reader for reading data from the source.
+pub(crate) trait SyncSource: Send + Sync {
+    /// Lists namespaces from the source.
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+    ) -> Result<Vec<BackupNamespace>, Error>;
+
+    /// Lists groups within a specific namespace from the source.
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error>;
+
+    /// Lists backup directories for a specific group within a specific namespace from the source.
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &BackupGroup,
+    ) -> Result<Vec<BackupDir>, Error>;
+    fn get_ns(&self) -> BackupNamespace;
+    fn get_store(&self) -> &str;
+
+    /// Returns a reader for reading data from a specific backup directory.
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn SyncSourceReader>, Error>;
+}
+
+pub(crate) struct RemoteSource {
+    pub(crate) repo: BackupRepository,
+    pub(crate) ns: BackupNamespace,
+    pub(crate) client: HttpClient,
+}
+
+pub(crate) struct LocalSource {
+    pub(crate) store: Arc<DataStore>,
+    pub(crate) ns: BackupNamespace,
+}
+
+#[async_trait::async_trait]
+impl SyncSource for RemoteSource {
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
+            return Ok(vec![self.ns.clone()]);
+        }
+
+        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
+        let mut data = json!({});
+        if let Some(max_depth) = max_depth {
+            data["max-depth"] = json!(max_depth);
+        }
+
+        if !self.ns.is_root() {
+            data["parent"] = json!(self.ns);
+        }
+        self.client.login().await?;
+
+        let mut result = match self.client.get(&path, Some(data)).await {
+            Ok(res) => res,
+            Err(err) => match err.downcast_ref::<HttpError>() {
+                Some(HttpError { code, message }) => match code {
+                    &StatusCode::NOT_FOUND => {
+                        if self.ns.is_root() && max_depth.is_none() {
+                            warn!("Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+                            warn!("Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+                            max_depth.replace(0);
+                        } else {
+                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+                        }
+
+                        return Ok(vec![self.ns.clone()]);
+                    }
+                    _ => {
+                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
+                    }
+                },
+                None => {
+                    bail!("Querying namespaces failed - {err}");
+                }
+            },
+        };
+
+        let list: Vec<BackupNamespace> =
+            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+                .into_iter()
+                .map(|list_item| list_item.ns)
+                .collect();
+
+        Ok(list)
+    }
+
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        _owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error> {
+        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
+
+        let args = if !namespace.is_root() {
+            Some(json!({ "ns": namespace.clone() }))
+        } else {
+            None
+        };
+
+        self.client.login().await?;
+        let mut result =
+            self.client.get(&path, args).await.map_err(|err| {
+                format_err!("Failed to retrieve backup groups from remote - {}", err)
+            })?;
+
+        Ok(
+            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
+                .map_err(Error::from)?
+                .into_iter()
+                .map(|item| item.backup)
+                .collect::<Vec<BackupGroup>>(),
+        )
+    }
+
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &BackupGroup,
+    ) -> Result<Vec<BackupDir>, Error> {
+        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
+
+        let mut args = json!({
+            "backup-type": group.ty,
+            "backup-id": group.id,
+        });
+
+        if !namespace.is_root() {
+            args["ns"] = serde_json::to_value(namespace)?;
+        }
+
+        self.client.login().await?;
+
+        let mut result = self.client.get(&path, Some(args)).await?;
+        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+        Ok(snapshot_list
+            .into_iter()
+            .filter_map(|item: SnapshotListItem| {
+                let snapshot = item.backup;
+                // in-progress backups can't be synced
+                if item.size.is_none() {
+                    info!("skipping snapshot {snapshot} - in-progress backup");
+                    return None;
+                }
+
+                Some(snapshot)
+            })
+            .collect::<Vec<BackupDir>>())
+    }
+
+    fn get_ns(&self) -> BackupNamespace {
+        self.ns.clone()
+    }
+
+    fn get_store(&self) -> &str {
+        self.repo.store()
+    }
+
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+        let backup_reader =
+            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
+        Ok(Arc::new(RemoteSourceReader {
+            backup_reader,
+            dir: dir.clone(),
+        }))
+    }
+}
+
+#[async_trait::async_trait]
+impl SyncSource for LocalSource {
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        ListNamespacesRecursive::new_max_depth(
+            self.store.clone(),
+            self.ns.clone(),
+            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+        )?
+        .collect()
+    }
+
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error> {
+        Ok(ListAccessibleBackupGroups::new_with_privs(
+            &self.store,
+            namespace.clone(),
+            0,
+            Some(PRIV_DATASTORE_READ),
+            Some(PRIV_DATASTORE_BACKUP),
+            Some(owner),
+        )?
+        .filter_map(Result::ok)
+        .map(|backup_group| backup_group.group().clone())
+        .collect::<Vec<pbs_api_types::BackupGroup>>())
+    }
+
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &BackupGroup,
+    ) -> Result<Vec<BackupDir>, Error> {
+        Ok(self
+            .store
+            .backup_group(namespace.clone(), group.clone())
+            .iter_snapshots()?
+            .filter_map(Result::ok)
+            .map(|snapshot| snapshot.dir().to_owned())
+            .collect::<Vec<BackupDir>>())
+    }
+
+    fn get_ns(&self) -> BackupNamespace {
+        self.ns.clone()
+    }
+
+    fn get_store(&self) -> &str {
+        self.store.name()
+    }
+
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn SyncSourceReader>, Error> {
+        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
+        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
+            &dir.full_path(),
+            "snapshot",
+            "locked by another operation",
+        )?;
+        Ok(Arc::new(LocalSourceReader {
+            _dir_lock: Arc::new(Mutex::new(dir_lock)),
+            path: dir.full_path(),
+            datastore: dir.datastore().clone(),
+        }))
+    }
+}
-- 
2.39.2
