[pbs-devel] [PATCH v3 proxmox-backup 03/33] server: sync: move reader trait to common sync module

Christian Ebner c.ebner at proxmox.com
Thu Sep 12 16:32:52 CEST 2024


Move the `PullReader` trait and the types implementing it to the
common sync module, so they can be reused for the push direction
variant of a sync job as well.

Make the naming more generic by renaming the `PullReader` trait to
`SyncSourceReader`, `LocalReader` to `LocalSourceReader` and
`RemoteReader` to `RemoteSourceReader`.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 2:
- no changes

 src/server/pull.rs | 167 +++++----------------------------------------
 src/server/sync.rs | 152 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 168 insertions(+), 151 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 4a97bfaa3..20b5d9af8 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,7 @@
 //! Sync datastore by pulling contents from remote server
 
-use std::collections::{HashMap, HashSet};
-use std::io::{Seek, Write};
-use std::path::{Path, PathBuf};
+use std::collections::HashSet;
+use std::io::Seek;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
@@ -15,11 +14,11 @@ use serde_json::json;
 use tracing::{info, warn};
 
 use pbs_api_types::{
-    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter,
     GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
     PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
 };
-use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_client::{BackupReader, BackupRepository, HttpClient};
 use pbs_config::CachedUserInfo;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -29,26 +28,15 @@ use pbs_datastore::manifest::{
     ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
 use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{
-    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
-};
+use pbs_datastore::{check_backup_owner, DataStore, ListNamespacesRecursive, StoreProgress};
 use pbs_tools::sha::sha256;
 
-use super::sync::{RemovedVanishedStats, SyncStats};
+use super::sync::{
+    LocalSourceReader, RemoteSourceReader, RemovedVanishedStats, SyncSourceReader, SyncStats,
+};
 use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
 use crate::tools::parallel_handler::ParallelHandler;
 
-struct RemoteReader {
-    backup_reader: Arc<BackupReader>,
-    dir: BackupDir,
-}
-
-struct LocalReader {
-    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
-    path: PathBuf,
-    datastore: Arc<DataStore>,
-}
-
 pub(crate) struct PullTarget {
     store: Arc<DataStore>,
     ns: BackupNamespace,
@@ -97,7 +85,7 @@ trait PullSource: Send + Sync {
         &self,
         ns: &BackupNamespace,
         dir: &BackupDir,
-    ) -> Result<Arc<dyn PullReader>, Error>;
+    ) -> Result<Arc<dyn SyncSourceReader>, Error>;
 }
 
 #[async_trait::async_trait]
@@ -230,7 +218,7 @@ impl PullSource for RemoteSource {
         &self,
         ns: &BackupNamespace,
         dir: &BackupDir,
-    ) -> Result<Arc<dyn PullReader>, Error> {
+    ) -> Result<Arc<dyn SyncSourceReader>, Error> {
         let backup_reader = BackupReader::start(
             &self.client,
             None,
@@ -240,7 +228,7 @@ impl PullSource for RemoteSource {
             tracing::enabled!(tracing::Level::DEBUG),
         )
         .await?;
-        Ok(Arc::new(RemoteReader {
+        Ok(Arc::new(RemoteSourceReader {
             backup_reader,
             dir: dir.clone(),
         }))
@@ -305,14 +293,14 @@ impl PullSource for LocalSource {
         &self,
         ns: &BackupNamespace,
         dir: &BackupDir,
-    ) -> Result<Arc<dyn PullReader>, Error> {
+    ) -> Result<Arc<dyn SyncSourceReader>, Error> {
         let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
         let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
             &dir.full_path(),
             "snapshot",
             "locked by another operation",
         )?;
-        Ok(Arc::new(LocalReader {
+        Ok(Arc::new(LocalSourceReader {
             _dir_lock: Arc::new(Mutex::new(dir_lock)),
             path: dir.full_path(),
             datastore: dir.datastore().clone(),
@@ -320,129 +308,6 @@ impl PullSource for LocalSource {
     }
 }
 
-#[async_trait::async_trait]
-/// `PullReader` is a trait that provides an interface for reading data from a source.
-/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
-trait PullReader: Send + Sync {
-    /// Returns a chunk reader with the specified encryption mode.
-    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
-
-    /// Asynchronously loads a file from the source into a local file.
-    /// `filename` is the name of the file to load from the source.
-    /// `into` is the path of the local file to load the source file into.
-    async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error>;
-
-    /// Tries to download the client log from the source and save it into a local file.
-    async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>;
-
-    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
-}
-
-#[async_trait::async_trait]
-impl PullReader for RemoteReader {
-    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
-        Arc::new(RemoteChunkReader::new(
-            self.backup_reader.clone(),
-            None,
-            crypt_mode,
-            HashMap::new(),
-        ))
-    }
-
-    async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
-        let mut tmp_file = std::fs::OpenOptions::new()
-            .write(true)
-            .create(true)
-            .truncate(true)
-            .read(true)
-            .open(into)?;
-        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
-        if let Err(err) = download_result {
-            match err.downcast_ref::<HttpError>() {
-                Some(HttpError { code, message }) => match *code {
-                    StatusCode::NOT_FOUND => {
-                        info!(
-                            "skipping snapshot {} - vanished since start of sync",
-                            &self.dir,
-                        );
-                        return Ok(None);
-                    }
-                    _ => {
-                        bail!("HTTP error {code} - {message}");
-                    }
-                },
-                None => {
-                    return Err(err);
-                }
-            };
-        };
-        tmp_file.rewind()?;
-        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
-    }
-
-    async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> {
-        let mut tmp_path = to_path.to_owned();
-        tmp_path.set_extension("tmp");
-
-        let tmpfile = std::fs::OpenOptions::new()
-            .write(true)
-            .create(true)
-            .read(true)
-            .open(&tmp_path)?;
-
-        // Note: be silent if there is no log - only log successful download
-        if let Ok(()) = self
-            .backup_reader
-            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
-            .await
-        {
-            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
-                bail!("Atomic rename file {:?} failed - {}", to_path, err);
-            }
-            info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
-        }
-
-        Ok(())
-    }
-
-    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
-        false
-    }
-}
-
-#[async_trait::async_trait]
-impl PullReader for LocalReader {
-    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
-        Arc::new(LocalChunkReader::new(
-            self.datastore.clone(),
-            None,
-            crypt_mode,
-        ))
-    }
-
-    async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
-        let mut tmp_file = std::fs::OpenOptions::new()
-            .write(true)
-            .create(true)
-            .truncate(true)
-            .read(true)
-            .open(into)?;
-        let mut from_path = self.path.clone();
-        from_path.push(filename);
-        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
-        tmp_file.rewind()?;
-        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
-    }
-
-    async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> {
-        Ok(())
-    }
-
-    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
-        self.datastore.name() == target_store_name
-    }
-}
-
 /// Parameters for a pull operation.
 pub(crate) struct PullParameters {
     /// Where data is pulled from
@@ -650,7 +515,7 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
 /// - if archive is an index, pull referenced chunks
 /// - Rename tmp file into real path
 async fn pull_single_archive<'a>(
-    reader: Arc<dyn PullReader + 'a>,
+    reader: Arc<dyn SyncSourceReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -733,7 +598,7 @@ async fn pull_single_archive<'a>(
 /// -- if not, pull it from the remote
 /// - Download log if not already existing
 async fn pull_snapshot<'a>(
-    reader: Arc<dyn PullReader + 'a>,
+    reader: Arc<dyn SyncSourceReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<SyncStats, Error> {
@@ -844,7 +709,7 @@ async fn pull_snapshot<'a>(
 /// The `reader` is configured to read from the source backup directory, while the
 /// `snapshot` is pointing to the local datastore and target namespace.
 async fn pull_snapshot_from<'a>(
-    reader: Arc<dyn PullReader + 'a>,
+    reader: Arc<dyn SyncSourceReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<SyncStats, Error> {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 5f143ef63..323bc1a27 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -1,7 +1,24 @@
 //! Sync datastore contents from source to target, either in push or pull direction
 
+use std::collections::HashMap;
+use std::io::{Seek, Write};
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
+use anyhow::{bail, Error};
+use http::StatusCode;
+use tracing::info;
+
+use proxmox_router::HttpError;
+
+use pbs_api_types::{BackupDir, CryptMode};
+use pbs_client::{BackupReader, RemoteChunkReader};
+use pbs_datastore::data_blob::DataBlob;
+use pbs_datastore::manifest::CLIENT_LOG_BLOB_NAME;
+use pbs_datastore::read_chunk::AsyncReadChunk;
+use pbs_datastore::{DataStore, LocalChunkReader};
+
 #[derive(Default)]
 pub(crate) struct RemovedVanishedStats {
     pub(crate) groups: usize,
@@ -49,3 +66,138 @@ impl SyncStats {
         }
     }
 }
+
+#[async_trait::async_trait]
+/// `SyncSourceReader` is a trait that provides an interface for reading data from a source.
+/// The trait includes methods for getting a chunk reader, loading a file, downloading client log,
+/// and checking whether chunk sync should be skipped.
+pub(crate) trait SyncSourceReader: Send + Sync {
+    /// Returns a chunk reader with the specified encryption mode.
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+
+    /// Asynchronously loads a file from the source into a local file.
+    /// `filename` is the name of the file to load from the source.
+    /// `into` is the path of the local file to load the source file into.
+    async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error>;
+
+    /// Tries to download the client log from the source and save it into a local file.
+    async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>;
+
+    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
+}
+
+pub(crate) struct RemoteSourceReader {
+    pub(crate) backup_reader: Arc<BackupReader>,
+    pub(crate) dir: BackupDir,
+}
+
+pub(crate) struct LocalSourceReader {
+    pub(crate) _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
+    pub(crate) path: PathBuf,
+    pub(crate) datastore: Arc<DataStore>,
+}
+
+#[async_trait::async_trait]
+impl SyncSourceReader for RemoteSourceReader {
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+        Arc::new(RemoteChunkReader::new(
+            self.backup_reader.clone(),
+            None,
+            crypt_mode,
+            HashMap::new(),
+        ))
+    }
+
+    async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
+        if let Err(err) = download_result {
+            match err.downcast_ref::<HttpError>() {
+                Some(HttpError { code, message }) => match *code {
+                    StatusCode::NOT_FOUND => {
+                        info!(
+                            "skipping snapshot {} - vanished since start of sync",
+                            &self.dir
+                        );
+                        return Ok(None);
+                    }
+                    _ => {
+                        bail!("HTTP error {code} - {message}");
+                    }
+                },
+                None => {
+                    return Err(err);
+                }
+            };
+        };
+        tmp_file.rewind()?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> {
+        let mut tmp_path = to_path.to_owned();
+        tmp_path.set_extension("tmp");
+
+        let tmpfile = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .read(true)
+            .open(&tmp_path)?;
+
+        // Note: be silent if there is no log - only log successful download
+        if let Ok(()) = self
+            .backup_reader
+            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
+            .await
+        {
+            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+                bail!("Atomic rename file {to_path:?} failed - {err}");
+            }
+            info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
+        }
+
+        Ok(())
+    }
+
+    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
+        false
+    }
+}
+
+#[async_trait::async_trait]
+impl SyncSourceReader for LocalSourceReader {
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+        Arc::new(LocalChunkReader::new(
+            self.datastore.clone(),
+            None,
+            crypt_mode,
+        ))
+    }
+
+    async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        let mut from_path = self.path.clone();
+        from_path.push(filename);
+        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+        tmp_file.rewind()?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> {
+        Ok(())
+    }
+
+    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
+        self.datastore.name() == target_store_name
+    }
+}
-- 
2.39.2





More information about the pbs-devel mailing list