[pbs-devel] [PATCH proxmox-backup v2 07/12] api: backup: env: add generics and separate functions into impl block
Hannes Laimer
h.laimer at proxmox.com
Mon May 26 16:14:40 CEST 2025
... based on whether they read or write.
Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
src/api2/backup/environment.rs | 337 +++++++++++++++++----------------
1 file changed, 174 insertions(+), 163 deletions(-)
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 3d541b46..a1620fb9 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -13,6 +13,7 @@ use proxmox_sys::fs::{replace_file, CreateOptions};
use pbs_api_types::Authid;
use pbs_datastore::backup_info::{BackupDir, BackupInfo};
+use pbs_datastore::chunk_store::CanWrite;
use pbs_datastore::dynamic_index::DynamicIndexWriter;
use pbs_datastore::fixed_index::FixedIndexWriter;
use pbs_datastore::{DataBlob, DataStore};
@@ -54,17 +55,17 @@ impl std::ops::Add for UploadStatistic {
}
}
-struct DynamicWriterState {
+struct DynamicWriterState<T> {
name: String,
- index: DynamicIndexWriter,
+ index: DynamicIndexWriter<T>,
offset: u64,
chunk_count: u64,
upload_stat: UploadStatistic,
}
-struct FixedWriterState {
+struct FixedWriterState<T> {
name: String,
- index: FixedIndexWriter,
+ index: FixedIndexWriter<T>,
size: usize,
chunk_size: u32,
chunk_count: u64,
@@ -76,18 +77,18 @@ struct FixedWriterState {
// key=digest, value=length
type KnownChunksMap = HashMap<[u8; 32], u32>;
-struct SharedBackupState {
+struct SharedBackupState<T> {
finished: bool,
uid_counter: usize,
file_counter: usize, // successfully uploaded files
- dynamic_writers: HashMap<usize, DynamicWriterState>,
- fixed_writers: HashMap<usize, FixedWriterState>,
+ dynamic_writers: HashMap<usize, DynamicWriterState<T>>,
+ fixed_writers: HashMap<usize, FixedWriterState<T>>,
known_chunks: KnownChunksMap,
backup_size: u64, // sums up size of all files
backup_stat: UploadStatistic,
}
-impl SharedBackupState {
+impl<T> SharedBackupState<T> {
// Raise error if finished flag is set
fn ensure_unfinished(&self) -> Result<(), Error> {
if self.finished {
@@ -105,26 +106,32 @@ impl SharedBackupState {
/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
-pub struct BackupEnvironment {
+pub struct BackupEnvironment<T> {
env_type: RpcEnvironmentType,
result_attributes: Value,
auth_id: Authid,
pub debug: bool,
pub formatter: &'static dyn OutputFormatter,
pub worker: Arc<WorkerTask>,
- pub datastore: Arc<DataStore>,
- pub backup_dir: BackupDir,
- pub last_backup: Option<BackupInfo>,
- state: Arc<Mutex<SharedBackupState>>,
+ pub datastore: Arc<DataStore<T>>,
+ pub backup_dir: BackupDir<T>,
+ pub last_backup: Option<BackupInfo<T>>,
+ state: Arc<Mutex<SharedBackupState<T>>>,
}
-impl BackupEnvironment {
+impl<T: Send + Sync + 'static> BackupEnvironment<T> {
+ pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
+ self.formatter.format_result(result, self)
+ }
+}
+
+impl<T> BackupEnvironment<T> {
pub fn new(
env_type: RpcEnvironmentType,
auth_id: Authid,
worker: Arc<WorkerTask>,
- datastore: Arc<DataStore>,
- backup_dir: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ backup_dir: BackupDir<T>,
) -> Self {
let state = SharedBackupState {
finished: false,
@@ -260,10 +267,148 @@ impl BackupEnvironment {
state.known_chunks.get(digest).copied()
}
+ fn log_upload_stat(
+ &self,
+ archive_name: &str,
+ csum: &[u8; 32],
+ uuid: &[u8; 16],
+ size: u64,
+ chunk_count: u64,
+ upload_stat: &UploadStatistic,
+ ) {
+ self.log(format!("Upload statistics for '{}'", archive_name));
+ self.log(format!("UUID: {}", hex::encode(uuid)));
+ self.log(format!("Checksum: {}", hex::encode(csum)));
+ self.log(format!("Size: {}", size));
+ self.log(format!("Chunk count: {}", chunk_count));
+
+ if size == 0 || chunk_count == 0 {
+ return;
+ }
+
+ self.log(format!(
+ "Upload size: {} ({}%)",
+ upload_stat.size,
+ (upload_stat.size * 100) / size
+ ));
+
+ // account for zero chunk, which might be uploaded but never used
+ let client_side_duplicates = if chunk_count < upload_stat.count {
+ 0
+ } else {
+ chunk_count - upload_stat.count
+ };
+
+ let server_side_duplicates = upload_stat.duplicates;
+
+ if (client_side_duplicates + server_side_duplicates) > 0 {
+ let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
+ self.log(format!(
+ "Duplicates: {}+{} ({}%)",
+ client_side_duplicates, server_side_duplicates, per
+ ));
+ }
+
+ if upload_stat.size > 0 {
+ self.log(format!(
+ "Compression: {}%",
+ (upload_stat.compressed_size * 100) / upload_stat.size
+ ));
+ }
+ }
+
+ pub fn log<S: AsRef<str>>(&self, msg: S) {
+ info!("{}", msg.as_ref());
+ }
+
+ pub fn debug<S: AsRef<str>>(&self, msg: S) {
+ if self.debug {
+ // This is kinda weird, we would like to use tracing::debug! here and automatically
+ // filter it, but self.debug is set from the client-side and the logs are printed on
+ // client and server side. This means that if the client sets the log level to debug,
+ // both server and client need to have 'debug' logs printed.
+ self.log(msg);
+ }
+ }
+
+ /// Raise error if finished flag is not set
+ pub fn ensure_finished(&self) -> Result<(), Error> {
+ let state = self.state.lock().unwrap();
+ if !state.finished {
+ bail!("backup ended but finished flag is not set.");
+ }
+ Ok(())
+ }
+
+ /// Return true if the finished flag is set
+ pub fn finished(&self) -> bool {
+ let state = self.state.lock().unwrap();
+ state.finished
+ }
+}
+
+impl<T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static> BackupEnvironment<T> {
+ /// If verify-new is set on the datastore, this will run a new verify task
+ /// for the backup. If not, this will return and also drop the passed lock
+ /// immediately.
+ pub fn verify_after_complete(&self, excl_snap_lock: BackupLockGuard) -> Result<(), Error> {
+ self.ensure_finished()?;
+
+ if !self.datastore.verify_new() {
+ // no verify requested, do nothing
+ return Ok(());
+ }
+
+ // Downgrade to shared lock, the backup itself is finished
+ drop(excl_snap_lock);
+ let snap_lock = self.backup_dir.lock_shared().with_context(|| {
+ format!(
+ "while trying to verify snapshot '{:?}' after completion",
+ self.backup_dir
+ )
+ })?;
+ let worker_id = format!(
+ "{}:{}/{}/{:08X}",
+ self.datastore.name(),
+ self.backup_dir.backup_type(),
+ self.backup_dir.backup_id(),
+ self.backup_dir.backup_time()
+ );
+
+ let datastore = self.datastore.clone();
+ let backup_dir = self.backup_dir.clone();
+
+ WorkerTask::new_thread(
+ "verify",
+ Some(worker_id),
+ self.auth_id.to_string(),
+ false,
+ move |worker| {
+ worker.log_message("Automatically verifying newly added snapshot");
+
+ let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+ if !verify_backup_dir_with_lock(
+ &verify_worker,
+ &backup_dir,
+ worker.upid().clone(),
+ None,
+ snap_lock,
+ )? {
+ bail!("verification failed - please check the log for details");
+ }
+
+ Ok(())
+ },
+ )
+ .map(|_| ())
+ }
+}
+
+impl<T: CanWrite> BackupEnvironment<T> {
/// Store the writer with an unique ID
pub fn register_dynamic_writer(
&self,
- index: DynamicIndexWriter,
+ index: DynamicIndexWriter<T>,
name: String,
) -> Result<usize, Error> {
let mut state = self.state.lock().unwrap();
@@ -289,7 +434,7 @@ impl BackupEnvironment {
/// Store the writer with an unique ID
pub fn register_fixed_writer(
&self,
- index: FixedIndexWriter,
+ index: FixedIndexWriter<T>,
name: String,
size: usize,
chunk_size: u32,
@@ -379,56 +524,6 @@ impl BackupEnvironment {
Ok(())
}
- fn log_upload_stat(
- &self,
- archive_name: &str,
- csum: &[u8; 32],
- uuid: &[u8; 16],
- size: u64,
- chunk_count: u64,
- upload_stat: &UploadStatistic,
- ) {
- self.log(format!("Upload statistics for '{}'", archive_name));
- self.log(format!("UUID: {}", hex::encode(uuid)));
- self.log(format!("Checksum: {}", hex::encode(csum)));
- self.log(format!("Size: {}", size));
- self.log(format!("Chunk count: {}", chunk_count));
-
- if size == 0 || chunk_count == 0 {
- return;
- }
-
- self.log(format!(
- "Upload size: {} ({}%)",
- upload_stat.size,
- (upload_stat.size * 100) / size
- ));
-
- // account for zero chunk, which might be uploaded but never used
- let client_side_duplicates = if chunk_count < upload_stat.count {
- 0
- } else {
- chunk_count - upload_stat.count
- };
-
- let server_side_duplicates = upload_stat.duplicates;
-
- if (client_side_duplicates + server_side_duplicates) > 0 {
- let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
- self.log(format!(
- "Duplicates: {}+{} ({}%)",
- client_side_duplicates, server_side_duplicates, per
- ));
- }
-
- if upload_stat.size > 0 {
- self.log(format!(
- "Compression: {}%",
- (upload_stat.compressed_size * 100) / upload_stat.size
- ));
- }
- }
-
/// Close dynamic writer
pub fn dynamic_writer_close(
&self,
@@ -633,94 +728,6 @@ impl BackupEnvironment {
Ok(())
}
- /// If verify-new is set on the datastore, this will run a new verify task
- /// for the backup. If not, this will return and also drop the passed lock
- /// immediately.
- pub fn verify_after_complete(&self, excl_snap_lock: BackupLockGuard) -> Result<(), Error> {
- self.ensure_finished()?;
-
- if !self.datastore.verify_new() {
- // no verify requested, do nothing
- return Ok(());
- }
-
- // Downgrade to shared lock, the backup itself is finished
- drop(excl_snap_lock);
- let snap_lock = self.backup_dir.lock_shared().with_context(|| {
- format!(
- "while trying to verify snapshot '{:?}' after completion",
- self.backup_dir
- )
- })?;
- let worker_id = format!(
- "{}:{}/{}/{:08X}",
- self.datastore.name(),
- self.backup_dir.backup_type(),
- self.backup_dir.backup_id(),
- self.backup_dir.backup_time()
- );
-
- let datastore = self.datastore.clone();
- let backup_dir = self.backup_dir.clone();
-
- WorkerTask::new_thread(
- "verify",
- Some(worker_id),
- self.auth_id.to_string(),
- false,
- move |worker| {
- worker.log_message("Automatically verifying newly added snapshot");
-
- let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
- if !verify_backup_dir_with_lock(
- &verify_worker,
- &backup_dir,
- worker.upid().clone(),
- None,
- snap_lock,
- )? {
- bail!("verification failed - please check the log for details");
- }
-
- Ok(())
- },
- )
- .map(|_| ())
- }
-
- pub fn log<S: AsRef<str>>(&self, msg: S) {
- info!("{}", msg.as_ref());
- }
-
- pub fn debug<S: AsRef<str>>(&self, msg: S) {
- if self.debug {
- // This is kinda weird, we would like to use tracing::debug! here and automatically
- // filter it, but self.debug is set from the client-side and the logs are printed on
- // client and server side. This means that if the client sets the log level to debug,
- // both server and client need to have 'debug' logs printed.
- self.log(msg);
- }
- }
-
- pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
- self.formatter.format_result(result, self)
- }
-
- /// Raise error if finished flag is not set
- pub fn ensure_finished(&self) -> Result<(), Error> {
- let state = self.state.lock().unwrap();
- if !state.finished {
- bail!("backup ended but finished flag is not set.");
- }
- Ok(())
- }
-
- /// Return true if the finished flag is set
- pub fn finished(&self) -> bool {
- let state = self.state.lock().unwrap();
- state.finished
- }
-
/// Remove complete backup
pub fn remove_backup(&self) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
@@ -736,7 +743,7 @@ impl BackupEnvironment {
}
}
-impl RpcEnvironment for BackupEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for BackupEnvironment<T> {
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
@@ -758,14 +765,18 @@ impl RpcEnvironment for BackupEnvironment {
}
}
-impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
- fn as_ref(&self) -> &BackupEnvironment {
- self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for dyn RpcEnvironment {
+ fn as_ref(&self) -> &BackupEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<BackupEnvironment<T>>()
+ .unwrap()
}
}
-impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
- fn as_ref(&self) -> &BackupEnvironment {
- self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for Box<dyn RpcEnvironment> {
+ fn as_ref(&self) -> &BackupEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<BackupEnvironment<T>>()
+ .unwrap()
}
}
--
2.39.5
More information about the pbs-devel
mailing list