[pbs-devel] [PATCH v3 proxmox-backup 05/11] pull/sync: extract passed along vars into struct
Fabian Grünbichler
f.gruenbichler at proxmox.com
Thu Oct 28 15:00:52 CEST 2021
This is basically the sync job config without the ID and with some
fields already converted, plus a convenient helper to generate the
HTTP client from it.
Suggested-by: Dominik Csapak <d.csapak at proxmox.com>
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
src/api2/config/remote.rs | 4 +-
src/api2/pull.rs | 61 +++++++++++++-------------
src/server/pull.rs | 91 ++++++++++++++++++++++++++-------------
3 files changed, 92 insertions(+), 64 deletions(-)
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 29e638d7..4dffe6bb 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -277,7 +277,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
}
/// Helper to get client for remote.cfg entry
-pub async fn remote_client(remote: Remote) -> Result<HttpClient, Error> {
+pub async fn remote_client(remote: &Remote) -> Result<HttpClient, Error> {
let options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone());
let client = HttpClient::new(
@@ -322,7 +322,7 @@ pub async fn scan_remote_datastores(name: String) -> Result<Vec<DataStoreListIte
api_err)
};
- let client = remote_client(remote)
+ let client = remote_client(&remote)
.await
.map_err(map_remote_err)?;
let api_res = client
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 4280d922..5ae916ed 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -1,5 +1,5 @@
//! Sync datastore from remote server
-use std::sync::{Arc};
+use std::convert::TryFrom;
use anyhow::{format_err, Error};
use futures::{select, future::FutureExt};
@@ -7,18 +7,18 @@ use futures::{select, future::FutureExt};
use proxmox_schema::api;
use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
-use pbs_client::{HttpClient, BackupRepository};
use pbs_api_types::{
- Remote, Authid, SyncJobConfig,
+ Authid, SyncJobConfig,
DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
};
use pbs_tools::task_log;
use proxmox_rest_server::WorkerTask;
use pbs_config::CachedUserInfo;
-use pbs_datastore::DataStore;
-use crate::server::{jobstate::Job, pull::pull_store};
+use crate::server::pull::{PullParameters, pull_store};
+use crate::server::jobstate::Job;
+
pub fn check_pull_privs(
auth_id: &Authid,
@@ -40,27 +40,18 @@ pub fn check_pull_privs(
Ok(())
}
-pub async fn get_pull_parameters(
- store: &str,
- remote: &str,
- remote_store: &str,
-) -> Result<(HttpClient, BackupRepository, Arc<DataStore>), Error> {
-
- let tgt_store = DataStore::lookup_datastore(store)?;
-
- let (remote_config, _digest) = pbs_config::remote::config()?;
- let remote: Remote = remote_config.lookup("remote", remote)?;
-
- let src_repo = BackupRepository::new(
- Some(remote.config.auth_id.clone()),
- Some(remote.config.host.clone()),
- remote.config.port,
- remote_store.to_string(),
- );
-
- let client = crate::api2::config::remote::remote_client(remote).await?;
-
- Ok((client, src_repo, tgt_store))
+impl TryFrom<&SyncJobConfig> for PullParameters {
+ type Error = Error;
+
+ fn try_from(sync_job: &SyncJobConfig) -> Result<Self, Self::Error> {
+ PullParameters::new(
+ &sync_job.store,
+ &sync_job.remote,
+ &sync_job.remote_store,
+ sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(),
+ sync_job.remove_vanished,
+ )
+ }
}
pub fn do_sync_job(
@@ -94,9 +85,8 @@ pub fn do_sync_job(
let worker_future = async move {
- let delete = sync_job.remove_vanished.unwrap_or(true);
- let sync_owner = sync_job.owner.unwrap_or_else(|| Authid::root_auth_id().clone());
- let (client, src_repo, tgt_store) = get_pull_parameters(&sync_job.store, &sync_job.remote, &sync_job.remote_store).await?;
+ let pull_params = PullParameters::try_from(&sync_job)?;
+ let client = pull_params.client().await?;
task_log!(worker, "Starting datastore sync job '{}'", job_id);
if let Some(event_str) = schedule {
@@ -110,7 +100,7 @@ pub fn do_sync_job(
sync_job.remote_store,
);
- pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, sync_owner).await?;
+ pull_store(&worker, &client, &pull_params).await?;
task_log!(worker, "sync job '{}' end", &job_id);
@@ -187,14 +177,21 @@ async fn pull (
check_pull_privs(&auth_id, &store, &remote, &remote_store, delete)?;
- let (client, src_repo, tgt_store) = get_pull_parameters(&store, &remote, &remote_store).await?;
+ let pull_params = PullParameters::new(
+ &store,
+ &remote,
+ &remote_store,
+ auth_id.clone(),
+ remove_vanished,
+ )?;
+ let client = pull_params.client().await?;
// fixme: set to_stdout to false?
let upid_str = WorkerTask::spawn("sync", Some(store.clone()), auth_id.to_string(), true, move |worker| async move {
task_log!(worker, "sync datastore '{}' start", store);
- let pull_future = pull_store(&worker, &client, &src_repo, tgt_store.clone(), delete, auth_id);
+ let pull_future = pull_store(&worker, &client, &pull_params);
let future = select!{
success = pull_future.fuse() => success,
abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5c3f9a18..2c454e2d 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -13,7 +13,7 @@ use http::StatusCode;
use proxmox_router::HttpError;
-use pbs_api_types::{Authid, SnapshotListItem, GroupListItem};
+use pbs_api_types::{Authid, GroupListItem, Remote, SnapshotListItem};
use pbs_datastore::{DataStore, BackupInfo, BackupDir, BackupGroup, StoreProgress};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
@@ -33,6 +33,44 @@ use crate::tools::ParallelHandler;
// fixme: delete vanished groups
// Todo: correctly lock backup groups
+pub struct PullParameters {
+ remote: Remote,
+ source: BackupRepository,
+ store: Arc<DataStore>,
+ owner: Authid,
+ remove_vanished: bool,
+}
+
+impl PullParameters {
+ pub fn new(
+ store: &str,
+ remote: &str,
+ remote_store: &str,
+ owner: Authid,
+ remove_vanished: Option<bool>,
+ ) -> Result<Self, Error> {
+ let store = DataStore::lookup_datastore(store)?;
+
+ let (remote_config, _digest) = pbs_config::remote::config()?;
+ let remote: Remote = remote_config.lookup("remote", remote)?;
+
+ let remove_vanished = remove_vanished.unwrap_or(true);
+
+ let source = BackupRepository::new(
+ Some(remote.config.auth_id.clone()),
+ Some(remote.config.host.clone()),
+ remote.config.port,
+ remote_store.to_string(),
+ );
+
+ Ok(Self { remote, source, store, owner, remove_vanished })
+ }
+
+ pub async fn client(&self) -> Result<HttpClient, Error> {
+ crate::api2::config::remote::remote_client(&self.remote).await
+ }
+}
+
async fn pull_index_chunks<I: IndexFile>(
worker: &WorkerTask,
chunk_reader: RemoteChunkReader,
@@ -503,13 +541,11 @@ impl std::fmt::Display for SkipInfo {
pub async fn pull_group(
worker: &WorkerTask,
client: &HttpClient,
- src_repo: &BackupRepository,
- tgt_store: Arc<DataStore>,
+ params: &PullParameters,
group: &BackupGroup,
- delete: bool,
progress: &mut StoreProgress,
) -> Result<(), Error> {
- let path = format!("api2/json/admin/datastore/{}/snapshots", src_repo.store());
+ let path = format!("api2/json/admin/datastore/{}/snapshots", params.source.store());
let args = json!({
"backup-type": group.backup_type(),
@@ -525,7 +561,7 @@ pub async fn pull_group(
let fingerprint = client.fingerprint();
- let last_sync = tgt_store.last_successful_backup(group)?;
+ let last_sync = params.store.last_successful_backup(group)?;
let mut remote_snapshots = std::collections::HashSet::new();
@@ -566,16 +602,16 @@ pub async fn pull_group(
let options = HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone());
let new_client = HttpClient::new(
- src_repo.host(),
- src_repo.port(),
- src_repo.auth_id(),
+ params.source.host(),
+ params.source.port(),
+ params.source.auth_id(),
options,
)?;
let reader = BackupReader::start(
new_client,
None,
- src_repo.store(),
+ params.source.store(),
snapshot.group().backup_type(),
snapshot.group().backup_id(),
backup_time,
@@ -586,7 +622,7 @@ pub async fn pull_group(
let result = pull_snapshot_from(
worker,
reader,
- tgt_store.clone(),
+ params.store.clone(),
&snapshot,
downloaded_chunks.clone(),
)
@@ -598,14 +634,14 @@ pub async fn pull_group(
result?; // stop on error
}
- if delete {
- let local_list = group.list_backups(&tgt_store.base_path())?;
+ if params.remove_vanished {
+ let local_list = group.list_backups(&params.store.base_path())?;
for info in local_list {
let backup_time = info.backup_dir.backup_time();
if remote_snapshots.contains(&backup_time) {
continue;
}
- if info.backup_dir.is_protected(tgt_store.base_path()) {
+ if info.backup_dir.is_protected(params.store.base_path()) {
task_log!(
worker,
"don't delete vanished snapshot {:?} (protected)",
@@ -614,7 +650,7 @@ pub async fn pull_group(
continue;
}
task_log!(worker, "delete vanished snapshot {:?}", info.backup_dir.relative_path());
- tgt_store.remove_backup_dir(&info.backup_dir, false)?;
+ params.store.remove_backup_dir(&info.backup_dir, false)?;
}
}
@@ -628,15 +664,12 @@ pub async fn pull_group(
pub async fn pull_store(
worker: &WorkerTask,
client: &HttpClient,
- src_repo: &BackupRepository,
- tgt_store: Arc<DataStore>,
- delete: bool,
- auth_id: Authid,
+ params: &PullParameters,
) -> Result<(), Error> {
// explicit create shared lock to prevent GC on newly created chunks
- let _shared_store_lock = tgt_store.try_shared_chunk_store_lock()?;
+ let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
- let path = format!("api2/json/admin/datastore/{}/groups", src_repo.store());
+ let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
let mut result = client
.get(&path, None)
@@ -675,7 +708,7 @@ pub async fn pull_store(
progress.done_snapshots = 0;
progress.group_snapshots = 0;
- let (owner, _lock_guard) = match tgt_store.create_locked_backup_group(&group, &auth_id) {
+ let (owner, _lock_guard) = match params.store.create_locked_backup_group(&group, &params.owner) {
Ok(result) => result,
Err(err) => {
task_log!(
@@ -689,21 +722,19 @@ pub async fn pull_store(
};
// permission check
- if auth_id != owner {
+ if params.owner != owner {
// only the owner is allowed to create additional snapshots
task_log!(
worker,
"sync group {} failed - owner check failed ({} != {})",
- &group, auth_id, owner
+ &group, params.owner, owner
);
errors = true; // do not stop here, instead continue
} else if let Err(err) = pull_group(
worker,
client,
- src_repo,
- tgt_store.clone(),
+ params,
&group,
- delete,
&mut progress,
)
.await
@@ -717,9 +748,9 @@ pub async fn pull_store(
}
}
- if delete {
+ if params.remove_vanished {
let result: Result<(), Error> = proxmox_lang::try_block!({
- let local_groups = BackupInfo::list_backup_groups(&tgt_store.base_path())?;
+ let local_groups = BackupInfo::list_backup_groups(&params.store.base_path())?;
for local_group in local_groups {
if new_groups.contains(&local_group) {
continue;
@@ -730,7 +761,7 @@ pub async fn pull_store(
local_group.backup_type(),
local_group.backup_id()
);
- match tgt_store.remove_backup_group(&local_group) {
+ match params.store.remove_backup_group(&local_group) {
Ok(true) => {},
Ok(false) => {
task_log!(worker, "kept some protected snapshots of group '{}'", local_group);
--
2.30.2
More information about the pbs-devel
mailing list