[pbs-devel] [PATCH proxmox-backup v8 07/45] api: backup: store datastore backend in runtime environment

Lukas Wagner l.wagner at proxmox.com
Fri Jul 18 09:54:47 CEST 2025


Reviewed-by: Lukas Wagner <l.wagner at proxmox.com>


On  2025-07-15 14:52, Christian Ebner wrote:
> Get and store the datastore's backend during creation of the backup
> runtime environment and upload the chunks to the local filesystem or
> s3 object store based on the backend variant.
> 
> By storing the backend variant in the environment the s3 client is
> instantiated only once and reused for all api calls in the same
> backup http/2 connection.
> 
> Refactor the upgrade method by moving all logic into the async block,
> such that the now possible error on backup environment creation gets
> propagated to the thread spawn call side.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
> 
>  src/api2/backup/environment.rs |  11 +--
>  src/api2/backup/mod.rs         | 128 ++++++++++++++++-----------------
>  2 files changed, 71 insertions(+), 68 deletions(-)
> 
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index 1d8f64aa0..7bd86f39c 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -16,7 +16,7 @@ use pbs_api_types::Authid;
>  use pbs_datastore::backup_info::{BackupDir, BackupInfo};
>  use pbs_datastore::dynamic_index::DynamicIndexWriter;
>  use pbs_datastore::fixed_index::FixedIndexWriter;
> -use pbs_datastore::{DataBlob, DataStore};
> +use pbs_datastore::{DataBlob, DataStore, DatastoreBackend};
>  use proxmox_rest_server::{formatter::*, WorkerTask};
>  
>  use crate::backup::VerifyWorker;
> @@ -116,6 +116,7 @@ pub struct BackupEnvironment {
>      pub datastore: Arc<DataStore>,
>      pub backup_dir: BackupDir,
>      pub last_backup: Option<BackupInfo>,
> +    pub backend: DatastoreBackend,
>      state: Arc<Mutex<SharedBackupState>>,
>  }
>  
> @@ -126,7 +127,7 @@ impl BackupEnvironment {
>          worker: Arc<WorkerTask>,
>          datastore: Arc<DataStore>,
>          backup_dir: BackupDir,
> -    ) -> Self {
> +    ) -> Result<Self, Error> {
>          let state = SharedBackupState {
>              finished: false,
>              uid_counter: 0,
> @@ -138,7 +139,8 @@ impl BackupEnvironment {
>              backup_stat: UploadStatistic::new(),
>          };
>  
> -        Self {
> +        let backend = datastore.backend()?;
> +        Ok(Self {
>              result_attributes: json!({}),
>              env_type,
>              auth_id,
> @@ -148,8 +150,9 @@ impl BackupEnvironment {
>              formatter: JSON_FORMATTER,
>              backup_dir,
>              last_backup: None,
> +            backend,
>              state: Arc::new(Mutex::new(state)),
> -        }
> +        })
>      }
>  
>      /// Register a Chunk with associated length.
> diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
> index a723e7cb0..026f1f106 100644
> --- a/src/api2/backup/mod.rs
> +++ b/src/api2/backup/mod.rs
> @@ -187,7 +187,8 @@ fn upgrade_to_backup_protocol(
>              }
>  
>              // lock last snapshot to prevent forgetting/pruning it during backup
> -            let guard = last.backup_dir
> +            let guard = last
> +                .backup_dir
>                  .lock_shared()
>                  .with_context(|| format!("while locking last snapshot during backup '{last:?}'"))?;
>              Some(guard)
> @@ -206,14 +207,14 @@ fn upgrade_to_backup_protocol(
>              Some(worker_id),
>              auth_id.to_string(),
>              true,
> -            move |worker| {
> +            move |worker| async move {
>                  let mut env = BackupEnvironment::new(
>                      env_type,
>                      auth_id,
>                      worker.clone(),
>                      datastore,
>                      backup_dir,
> -                );
> +                )?;
>  
>                  env.debug = debug;
>                  env.last_backup = last_backup;
> @@ -247,74 +248,73 @@ fn upgrade_to_backup_protocol(
>                          http.max_frame_size(4 * 1024 * 1024);
>  
>                          let env3 = env2.clone();
> -                        http.serve_connection(conn, TowerToHyperService::new(service)).map(move |result| {
> -                            match result {
> -                                Err(err) => {
> -                                    // Avoid  Transport endpoint is not connected (os error 107)
> -                                    // fixme: find a better way to test for that error
> -                                    if err.to_string().starts_with("connection error")
> -                                        && env3.finished()
> -                                    {
> -                                        Ok(())
> -                                    } else {
> -                                        Err(Error::from(err))
> +                        http.serve_connection(conn, TowerToHyperService::new(service))
> +                            .map(move |result| {
> +                                match result {
> +                                    Err(err) => {
> +                                        // Avoid  Transport endpoint is not connected (os error 107)
> +                                        // fixme: find a better way to test for that error
> +                                        if err.to_string().starts_with("connection error")
> +                                            && env3.finished()
> +                                        {
> +                                            Ok(())
> +                                        } else {
> +                                            Err(Error::from(err))
> +                                        }
>                                      }
> +                                    Ok(()) => Ok(()),
>                                  }
> -                                Ok(()) => Ok(()),
> -                            }
> -                        })
> +                            })
>                      });
>                  let mut abort_future = abort_future.map(|_| Err(format_err!("task aborted")));
>  
> -                async move {
> -                    // keep flock until task ends
> -                    let _group_guard = _group_guard;
> -                    let snap_guard = snap_guard;
> -                    let _last_guard = _last_guard;
> -
> -                    let res = select! {
> -                        req = req_fut => req,
> -                        abrt = abort_future => abrt,
> -                    };
> -                    if benchmark {
> -                        env.log("benchmark finished successfully");
> -                        proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
> -                        return Ok(());
> +                // keep flock until task ends
> +                let _group_guard = _group_guard;
> +                let snap_guard = snap_guard;
> +                let _last_guard = _last_guard;
> +
> +                let res = select! {
> +                    req = req_fut => req,
> +                    abrt = abort_future => abrt,
> +                };
> +                if benchmark {
> +                    env.log("benchmark finished successfully");
> +                    proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
> +                    return Ok(());
> +                }
> +
> +                let verify = |env: BackupEnvironment| {
> +                    if let Err(err) = env.verify_after_complete(snap_guard) {
> +                        env.log(format!(
> +                            "backup finished, but starting the requested verify task failed: {}",
> +                            err
> +                        ));
>                      }
> +                };
>  
> -                    let verify = |env: BackupEnvironment| {
> -                        if let Err(err) = env.verify_after_complete(snap_guard) {
> -                            env.log(format!(
> -                                "backup finished, but starting the requested verify task failed: {}",
> -                                err
> -                            ));
> -                        }
> -                    };
> -
> -                    match (res, env.ensure_finished()) {
> -                        (Ok(_), Ok(())) => {
> -                            env.log("backup finished successfully");
> -                            verify(env);
> -                            Ok(())
> -                        }
> -                        (Err(err), Ok(())) => {
> -                            // ignore errors after finish
> -                            env.log(format!("backup had errors but finished: {}", err));
> -                            verify(env);
> -                            Ok(())
> -                        }
> -                        (Ok(_), Err(err)) => {
> -                            env.log(format!("backup ended and finish failed: {}", err));
> -                            env.log("removing unfinished backup");
> -                            proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
> -                            Err(err)
> -                        }
> -                        (Err(err), Err(_)) => {
> -                            env.log(format!("backup failed: {}", err));
> -                            env.log("removing failed backup");
> -                            proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
> -                            Err(err)
> -                        }
> +                match (res, env.ensure_finished()) {
> +                    (Ok(_), Ok(())) => {
> +                        env.log("backup finished successfully");
> +                        verify(env);
> +                        Ok(())
> +                    }
> +                    (Err(err), Ok(())) => {
> +                        // ignore errors after finish
> +                        env.log(format!("backup had errors but finished: {}", err));
> +                        verify(env);
> +                        Ok(())
> +                    }
> +                    (Ok(_), Err(err)) => {
> +                        env.log(format!("backup ended and finish failed: {}", err));
> +                        env.log("removing unfinished backup");
> +                        proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
> +                        Err(err)
> +                    }
> +                    (Err(err), Err(_)) => {
> +                        env.log(format!("backup failed: {}", err));
> +                        env.log("removing failed backup");
> +                        proxmox_async::runtime::block_in_place(|| env.remove_backup())?;
> +                        Err(err)
>                      }
>                  }
>              },

-- 
- Lukas





More information about the pbs-devel mailing list