[pbs-devel] [PATCH v2 proxmox-backup 03/31] server: sync: move reader trait to common sync module
Christian Ebner
c.ebner at proxmox.com
Thu Aug 1 09:43:35 CEST 2024
Move the `PullReader` trait and the types implementing it to the
common sync module, so they can be reused for the push direction
variant of a sync job as well.
Adapt the naming to be more generic by renaming the `PullReader` trait
to `SyncSourceReader`, `LocalReader` to `LocalSourceReader` and
`RemoteReader` to `RemoteSourceReader`.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
changes since version 1:
- no changes
src/server/pull.rs | 167 +++++----------------------------------------
src/server/sync.rs | 152 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 168 insertions(+), 151 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 24422ef41..5efe2d5f7 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,7 @@
//! Sync datastore by pulling contents from remote server
-use std::collections::{HashMap, HashSet};
-use std::io::{Seek, Write};
-use std::path::{Path, PathBuf};
+use std::collections::HashSet;
+use std::io::Seek;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
@@ -15,11 +14,11 @@ use serde_json::json;
use tracing::{info, warn};
use pbs_api_types::{
- print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+ print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter,
GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
-use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_client::{BackupReader, BackupRepository, HttpClient};
use pbs_config::CachedUserInfo;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -29,26 +28,15 @@ use pbs_datastore::manifest::{
ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
};
use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{
- check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
-};
+use pbs_datastore::{check_backup_owner, DataStore, ListNamespacesRecursive, StoreProgress};
use pbs_tools::sha::sha256;
-use super::sync::{RemovedVanishedStats, SyncStats};
+use super::sync::{
+ LocalSourceReader, RemoteSourceReader, RemovedVanishedStats, SyncSourceReader, SyncStats,
+};
use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
use crate::tools::parallel_handler::ParallelHandler;
-struct RemoteReader {
- backup_reader: Arc<BackupReader>,
- dir: BackupDir,
-}
-
-struct LocalReader {
- _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
- path: PathBuf,
- datastore: Arc<DataStore>,
-}
-
pub(crate) struct PullTarget {
store: Arc<DataStore>,
ns: BackupNamespace,
@@ -97,7 +85,7 @@ trait PullSource: Send + Sync {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error>;
+ ) -> Result<Arc<dyn SyncSourceReader>, Error>;
}
#[async_trait::async_trait]
@@ -230,10 +218,10 @@ impl PullSource for RemoteSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error> {
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
let backup_reader =
BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
- Ok(Arc::new(RemoteReader {
+ Ok(Arc::new(RemoteSourceReader {
backup_reader,
dir: dir.clone(),
}))
@@ -298,14 +286,14 @@ impl PullSource for LocalSource {
&self,
ns: &BackupNamespace,
dir: &BackupDir,
- ) -> Result<Arc<dyn PullReader>, Error> {
+ ) -> Result<Arc<dyn SyncSourceReader>, Error> {
let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
&dir.full_path(),
"snapshot",
"locked by another operation",
)?;
- Ok(Arc::new(LocalReader {
+ Ok(Arc::new(LocalSourceReader {
_dir_lock: Arc::new(Mutex::new(dir_lock)),
path: dir.full_path(),
datastore: dir.datastore().clone(),
@@ -313,129 +301,6 @@ impl PullSource for LocalSource {
}
}
-#[async_trait::async_trait]
-/// `PullReader` is a trait that provides an interface for reading data from a source.
-/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
-trait PullReader: Send + Sync {
- /// Returns a chunk reader with the specified encryption mode.
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
-
- /// Asynchronously loads a file from the source into a local file.
- /// `filename` is the name of the file to load from the source.
- /// `into` is the path of the local file to load the source file into.
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error>;
-
- /// Tries to download the client log from the source and save it into a local file.
- async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>;
-
- fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
-}
-
-#[async_trait::async_trait]
-impl PullReader for RemoteReader {
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
- Arc::new(RemoteChunkReader::new(
- self.backup_reader.clone(),
- None,
- crypt_mode,
- HashMap::new(),
- ))
- }
-
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
- let mut tmp_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(into)?;
- let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
- if let Err(err) = download_result {
- match err.downcast_ref::<HttpError>() {
- Some(HttpError { code, message }) => match *code {
- StatusCode::NOT_FOUND => {
- info!(
- "skipping snapshot {} - vanished since start of sync",
- &self.dir,
- );
- return Ok(None);
- }
- _ => {
- bail!("HTTP error {code} - {message}");
- }
- },
- None => {
- return Err(err);
- }
- };
- };
- tmp_file.rewind()?;
- Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
- }
-
- async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> {
- let mut tmp_path = to_path.to_owned();
- tmp_path.set_extension("tmp");
-
- let tmpfile = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .read(true)
- .open(&tmp_path)?;
-
- // Note: be silent if there is no log - only log successful download
- if let Ok(()) = self
- .backup_reader
- .download(CLIENT_LOG_BLOB_NAME, tmpfile)
- .await
- {
- if let Err(err) = std::fs::rename(&tmp_path, to_path) {
- bail!("Atomic rename file {:?} failed - {}", to_path, err);
- }
- info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
- }
-
- Ok(())
- }
-
- fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
- false
- }
-}
-
-#[async_trait::async_trait]
-impl PullReader for LocalReader {
- fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
- Arc::new(LocalChunkReader::new(
- self.datastore.clone(),
- None,
- crypt_mode,
- ))
- }
-
- async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
- let mut tmp_file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .truncate(true)
- .read(true)
- .open(into)?;
- let mut from_path = self.path.clone();
- from_path.push(filename);
- tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
- tmp_file.rewind()?;
- Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
- }
-
- async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> {
- Ok(())
- }
-
- fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
- self.datastore.name() == target_store_name
- }
-}
-
/// Parameters for a pull operation.
pub(crate) struct PullParameters {
/// Where data is pulled from
@@ -643,7 +508,7 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
async fn pull_single_archive<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -726,7 +591,7 @@ async fn pull_single_archive<'a>(
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
@@ -837,7 +702,7 @@ async fn pull_snapshot<'a>(
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
- reader: Arc<dyn PullReader + 'a>,
+ reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 5f143ef63..323bc1a27 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -1,7 +1,24 @@
//! Sync datastore contents from source to target, either in push or pull direction
+use std::collections::HashMap;
+use std::io::{Seek, Write};
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
use std::time::Duration;
+use anyhow::{bail, Error};
+use http::StatusCode;
+use tracing::info;
+
+use proxmox_router::HttpError;
+
+use pbs_api_types::{BackupDir, CryptMode};
+use pbs_client::{BackupReader, RemoteChunkReader};
+use pbs_datastore::data_blob::DataBlob;
+use pbs_datastore::manifest::CLIENT_LOG_BLOB_NAME;
+use pbs_datastore::read_chunk::AsyncReadChunk;
+use pbs_datastore::{DataStore, LocalChunkReader};
+
#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
pub(crate) groups: usize,
@@ -49,3 +66,138 @@ impl SyncStats {
}
}
}
+
+#[async_trait::async_trait]
+/// `SyncSourceReader` is a trait that provides an interface for reading data from a source.
+/// The trait includes methods for getting a chunk reader, loading a file, downloading client log,
+/// and checking whether chunk sync should be skipped.
+pub(crate) trait SyncSourceReader: Send + Sync {
+ /// Returns a chunk reader with the specified encryption mode.
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+
+ /// Asynchronously loads a file from the source into a local file.
+ /// `filename` is the name of the file to load from the source.
+ /// `into` is the path of the local file to load the source file into.
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error>;
+
+ /// Tries to download the client log from the source and save it into a local file.
+ async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>;
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
+}
+
+pub(crate) struct RemoteSourceReader {
+ pub(crate) backup_reader: Arc<BackupReader>,
+ pub(crate) dir: BackupDir,
+}
+
+pub(crate) struct LocalSourceReader {
+ pub(crate) _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
+ pub(crate) path: PathBuf,
+ pub(crate) datastore: Arc<DataStore>,
+}
+
+#[async_trait::async_trait]
+impl SyncSourceReader for RemoteSourceReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(RemoteChunkReader::new(
+ self.backup_reader.clone(),
+ None,
+ crypt_mode,
+ HashMap::new(),
+ ))
+ }
+
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
+ if let Err(err) = download_result {
+ match err.downcast_ref::<HttpError>() {
+ Some(HttpError { code, message }) => match *code {
+ StatusCode::NOT_FOUND => {
+ info!(
+ "skipping snapshot {} - vanished since start of sync",
+ &self.dir
+ );
+ return Ok(None);
+ }
+ _ => {
+ bail!("HTTP error {code} - {message}");
+ }
+ },
+ None => {
+ return Err(err);
+ }
+ };
+ };
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error> {
+ let mut tmp_path = to_path.to_owned();
+ tmp_path.set_extension("tmp");
+
+ let tmpfile = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .read(true)
+ .open(&tmp_path)?;
+
+ // Note: be silent if there is no log - only log successful download
+ if let Ok(()) = self
+ .backup_reader
+ .download(CLIENT_LOG_BLOB_NAME, tmpfile)
+ .await
+ {
+ if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+ bail!("Atomic rename file {to_path:?} failed - {err}");
+ }
+ info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
+ }
+
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
+ false
+ }
+}
+
+#[async_trait::async_trait]
+impl SyncSourceReader for LocalSourceReader {
+ fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+ Arc::new(LocalChunkReader::new(
+ self.datastore.clone(),
+ None,
+ crypt_mode,
+ ))
+ }
+
+ async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
+ let mut tmp_file = std::fs::OpenOptions::new()
+ .write(true)
+ .create(true)
+ .truncate(true)
+ .read(true)
+ .open(into)?;
+ let mut from_path = self.path.clone();
+ from_path.push(filename);
+ tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+ tmp_file.rewind()?;
+ Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+ }
+
+ async fn try_download_client_log(&self, _to_path: &Path) -> Result<(), Error> {
+ Ok(())
+ }
+
+ fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
+ self.datastore.name() == target_store_name
+ }
+}
--
2.39.2
More information about the pbs-devel
mailing list