[pbs-devel] [PATCH v3 proxmox-backup 15/33] fix #3044: server: implement push support for sync operations

Christian Ebner c.ebner at proxmox.com
Mon Oct 14 11:32:39 CEST 2024


On 10/10/24 16:48, Fabian Grünbichler wrote:
> left some higher level comments on the cover letter as well that are
> relevant for this patch!

Okay, will be addressed there.

> On September 12, 2024 4:33 pm, Christian Ebner wrote:
>> Adds the functionality required to push datastore contents from a
>> source to a remote target.
>> This includes syncing of the namespaces, backup groups and snapshots
>> based on the provided filters as well as removing vanished contents
>> from the target when requested.
>>
>> While trying to mimic the pull direction of sync jobs, the
>> implementation is different as access to the remote must be performed
>> via the REST API, not needed for the pull job which can access the
>> local datastore via the filesystem directly.
>>
>> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>> ---
>> changes since version 2:
>> - Implement additional permission checks limiting possible remote
>>    datastore operations.
>> - Rename `owner` to `local_user`, this is the user whose view of the
>>    local datastore is used for the push to the remote target. It can be
>>    different from the job user, executing the sync job and requiring the
>>    permissions to access the remote.
>>
>>   src/server/mod.rs  |   1 +
>>   src/server/push.rs | 892 +++++++++++++++++++++++++++++++++++++++++++++
>>   2 files changed, 893 insertions(+)
>>   create mode 100644 src/server/push.rs
>>
>> diff --git a/src/server/mod.rs b/src/server/mod.rs
>> index 468847c2e..882c5cc10 100644
>> --- a/src/server/mod.rs
>> +++ b/src/server/mod.rs
>> @@ -34,6 +34,7 @@ pub use report::*;
>>   pub mod auth;
>>   
>>   pub(crate) mod pull;
>> +pub(crate) mod push;
>>   pub(crate) mod sync;
>>   
>>   pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
>> diff --git a/src/server/push.rs b/src/server/push.rs
>> new file mode 100644
>> index 000000000..cfbb88728
>> --- /dev/null
>> +++ b/src/server/push.rs
>> @@ -0,0 +1,892 @@
>> +//! Sync datastore by pushing contents to remote server
>> +
>> +use std::cmp::Ordering;
>> +use std::collections::HashSet;
>> +use std::sync::{Arc, Mutex};
>> +
>> +use anyhow::{bail, format_err, Error};
>> +use futures::stream::{self, StreamExt, TryStreamExt};
>> +use tokio::sync::mpsc;
>> +use tokio_stream::wrappers::ReceiverStream;
>> +use tracing::info;
>> +
>> +use pbs_api_types::{
>> +    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
>> +    GroupListItem, NamespaceListItem, Operation, RateLimitConfig, Remote, SnapshotListItem,
>> +    PRIV_REMOTE_DATASTORE_BACKUP, PRIV_REMOTE_DATASTORE_MODIFY, PRIV_REMOTE_DATASTORE_PRUNE,
>> +};
>> +use pbs_client::{BackupRepository, BackupWriter, HttpClient, UploadOptions};
>> +use pbs_config::CachedUserInfo;
>> +use pbs_datastore::data_blob::ChunkInfo;
>> +use pbs_datastore::dynamic_index::DynamicIndexReader;
>> +use pbs_datastore::fixed_index::FixedIndexReader;
>> +use pbs_datastore::index::IndexFile;
>> +use pbs_datastore::manifest::{ArchiveType, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME};
>> +use pbs_datastore::read_chunk::AsyncReadChunk;
>> +use pbs_datastore::{BackupManifest, DataStore, StoreProgress};
>> +
>> +use super::sync::{
>> +    check_namespace_depth_limit, LocalSource, RemovedVanishedStats, SkipInfo, SkipReason,
>> +    SyncSource, SyncStats,
>> +};
>> +use crate::api2::config::remote;
>> +
>> +/// Target for backups to be pushed to
>> +pub(crate) struct PushTarget {
>> +    // Name of the remote as found in remote.cfg
>> +    remote: String,
>> +    // Target repository on remote
>> +    repo: BackupRepository,
>> +    // Target namespace on remote
>> +    ns: BackupNamespace,
>> +    // Http client to connect to remote
>> +    client: HttpClient,
>> +}
>> +
>> +/// Parameters for a push operation
>> +pub(crate) struct PushParameters {
>> +    /// Source of backups to be pushed to remote
>> +    source: Arc<LocalSource>,
>> +    /// Target for backups to be pushed to
>> +    target: PushTarget,
>> +    /// Local user limiting the accessible source contents, makes sure that the sync job sees the
>> +    /// same source content when executed by different users with different privileges
>> +    local_user: Authid,
>> +    /// User as which the job gets executed, requires the permissions on the remote
>> +    pub(crate) job_user: Option<Authid>,
>> +    /// Whether to remove groups which exist locally, but not on the remote end
>> +    remove_vanished: bool,
>> +    /// How many levels of sub-namespaces to push (0 == no recursion, None == maximum recursion)
>> +    max_depth: Option<usize>,
>> +    /// Filters for reducing the push scope
>> +    group_filter: Vec<GroupFilter>,
>> +    /// How many snapshots should be transferred at most (taking the newest N snapshots)
>> +    transfer_last: Option<usize>,
>> +}
>> +
>> +impl PushParameters {
>> +    /// Creates a new instance of `PushParameters`.
>> +    #[allow(clippy::too_many_arguments)]
>> +    pub(crate) fn new(
>> +        store: &str,
>> +        ns: BackupNamespace,
>> +        remote_id: &str,
>> +        remote_store: &str,
>> +        remote_ns: BackupNamespace,
>> +        local_user: Authid,
>> +        remove_vanished: Option<bool>,
>> +        max_depth: Option<usize>,
>> +        group_filter: Option<Vec<GroupFilter>>,
>> +        limit: RateLimitConfig,
>> +        transfer_last: Option<usize>,
>> +    ) -> Result<Self, Error> {
>> +        if let Some(max_depth) = max_depth {
>> +            ns.check_max_depth(max_depth)?;
>> +            remote_ns.check_max_depth(max_depth)?;
>> +        };
>> +        let remove_vanished = remove_vanished.unwrap_or(false);
>> +
>> +        let source = Arc::new(LocalSource {
>> +            store: DataStore::lookup_datastore(store, Some(Operation::Read))?,
>> +            ns,
>> +        });
>> +
>> +        let (remote_config, _digest) = pbs_config::remote::config()?;
>> +        let remote: Remote = remote_config.lookup("remote", remote_id)?;
>> +
>> +        let repo = BackupRepository::new(
>> +            Some(remote.config.auth_id.clone()),
>> +            Some(remote.config.host.clone()),
>> +            remote.config.port,
>> +            remote_store.to_string(),
>> +        );
>> +
>> +        let client = remote::remote_client_config(&remote, Some(limit))?;
>> +        let target = PushTarget {
>> +            remote: remote_id.to_string(),
>> +            repo,
>> +            ns: remote_ns,
>> +            client,
>> +        };
>> +        let group_filter = group_filter.unwrap_or_default();
>> +
>> +        Ok(Self {
>> +            source,
>> +            target,
>> +            local_user,
>> +            job_user: None,
>> +            remove_vanished,
>> +            max_depth,
>> +            group_filter,
>> +            transfer_last,
>> +        })
>> +    }
>> +}
>> +
>> +fn check_ns_remote_datastore_privs(
>> +    params: &PushParameters,
>> +    namespace: &BackupNamespace,
>> +    privs: u64,
>> +) -> Result<(), Error> {
>> +    let auth_id = params
>> +        .job_user
>> +        .as_ref()
>> +        .ok_or_else(|| format_err!("missing job authid"))?;
>> +    let user_info = CachedUserInfo::new()?;
>> +    let mut acl_path: Vec<&str> = vec!["remote", &params.target.remote, params.target.repo.store()];
>> +
>> +    if !namespace.is_root() {
>> +        let ns_components: Vec<&str> = namespace.components().collect();
>> +        acl_path.extend(ns_components);
>> +    }
>> +
>> +    user_info.check_privs(auth_id, &acl_path, privs, false)?;
>> +
>> +    Ok(())
>> +}
>> +
>> +// Fetch the list of namespaces found on target
>> +async fn fetch_target_namespaces(params: &PushParameters) -> Result<Vec<BackupNamespace>, Error> {
>> +    let api_path = format!(
>> +        "api2/json/admin/datastore/{store}/namespace",
>> +        store = params.target.repo.store(),
>> +    );
>> +    let mut result = params.target.client.get(&api_path, None).await?;
>> +    let namespaces: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
>> +    let mut namespaces: Vec<BackupNamespace> = namespaces
>> +        .into_iter()
>> +        .map(|namespace| namespace.ns)
>> +        .collect();
>> +    namespaces.sort_unstable_by_key(|a| a.name_len());
>> +
>> +    Ok(namespaces)
>> +}
>> +
>> +// Remove the provided namespace from the target
>> +async fn remove_target_namespace(
>> +    params: &PushParameters,
>> +    namespace: &BackupNamespace,
>> +) -> Result<(), Error> {
>> +    if namespace.is_root() {
>> +        bail!("cannot remove root namespace from target");
>> +    }
>> +
>> +    check_ns_remote_datastore_privs(params, namespace, PRIV_REMOTE_DATASTORE_PRUNE)
>> +        .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
> 
> this should be MODIFY, not PRUNE to mimic pull-based syncing, see cover letter

Ack, changed to be `PRIV_REMOTE_DATASTORE_MODIFY` for the upcoming 
version of the patches.
> 
>> +
>> +    let api_path = format!(
>> +        "api2/json/admin/datastore/{store}/namespace",
>> +        store = params.target.repo.store(),
>> +    );
>> +
>> +    let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
> 
> would it make sense to make this less verbose *and more readable* by
> implementing a `map_namespace` on params? 7 call sites ;)

Yes, that indeed makes it more readable. However, I decided to call it 
`map_to_target`, which also leaves the option to make this generic over a 
(currently not required) type, if ever needed in the future.
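
To illustrate the idea, a minimal self-contained sketch follows. The
`Namespace` and `Params` types are simplified, hypothetical stand-ins that
only model the path components; they are not the actual `BackupNamespace`
or `PushParameters` code, and the real helper may look different.

```rust
/// Simplified stand-in for a backup namespace: just the path components.
#[derive(Clone, Debug, PartialEq, Eq)]
struct Namespace(Vec<String>);

impl Namespace {
    /// Build a namespace from a `/`-separated path, e.g. "a/b".
    fn new(path: &str) -> Self {
        Namespace(
            path.split('/')
                .filter(|component| !component.is_empty())
                .map(String::from)
                .collect(),
        )
    }

    /// Replace `source_prefix` at the start of `self` with `target_prefix`.
    fn map_prefix(
        &self,
        source_prefix: &Namespace,
        target_prefix: &Namespace,
    ) -> Result<Namespace, String> {
        if !self.0.starts_with(&source_prefix.0) {
            return Err(format!(
                "{:?} is not below {:?}",
                self.0, source_prefix.0
            ));
        }
        let suffix = &self.0[source_prefix.0.len()..];
        let mut mapped = target_prefix.0.clone();
        mapped.extend_from_slice(suffix);
        Ok(Namespace(mapped))
    }
}

/// Hypothetical, reduced version of the push parameters.
struct Params {
    source_ns: Namespace,
    target_ns: Namespace,
}

impl Params {
    /// Single place for the source -> target mapping, so call sites do not
    /// have to repeat both prefixes every time.
    fn map_to_target(&self, namespace: &Namespace) -> Result<Namespace, String> {
        namespace.map_prefix(&self.source_ns, &self.target_ns)
    }
}
```

With a source prefix of `a` and a target prefix of `x/y`, the source
namespace `a/b` maps to `x/y/b`, while a namespace outside of `a` yields an
error.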

> 
>> +    let args = serde_json::json!({
>> +        "ns": target_ns.name(),
>> +        "delete-groups": true,
>> +    });
>> +
>> +    params.target.client.delete(&api_path, Some(args)).await?;
>> +
>> +    Ok(())
>> +}
>> +
>> +// Fetch the list of groups found on target in given namespace
>> +async fn fetch_target_groups(
>> +    params: &PushParameters,
>> +    namespace: &BackupNamespace,
>> +) -> Result<Vec<BackupGroup>, Error> {
>> +    let api_path = format!(
>> +        "api2/json/admin/datastore/{store}/groups",
>> +        store = params.target.repo.store(),
>> +    );
>> +
>> +    let args = if !namespace.is_root() {
>> +        let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
>> +        Some(serde_json::json!({ "ns": target_ns.name() }))
>> +    } else {
>> +        None
>> +    };
>> +
>> +    let mut result = params.target.client.get(&api_path, args).await?;
>> +    let groups: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
>> +    let mut groups: Vec<BackupGroup> = groups.into_iter().map(|group| group.backup).collect();
>> +
>> +    groups.sort_unstable_by(|a, b| {
>> +        let type_order = a.ty.cmp(&b.ty);
>> +        if type_order == Ordering::Equal {
>> +            a.id.cmp(&b.id)
>> +        } else {
>> +            type_order
>> +        }
>> +    });
>> +
>> +    Ok(groups)
>> +}
>> +
>> +// Remove the provided backup group in given namespace from the target
>> +async fn remove_target_group(
>> +    params: &PushParameters,
>> +    namespace: &BackupNamespace,
>> +    backup_group: &BackupGroup,
>> +) -> Result<(), Error> {
>> +    check_ns_remote_datastore_privs(params, namespace, PRIV_REMOTE_DATASTORE_PRUNE)
>> +        .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
>> +
>> +    let api_path = format!(
>> +        "api2/json/admin/datastore/{store}/groups",
>> +        store = params.target.repo.store(),
>> +    );
>> +
>> +    let mut args = serde_json::json!({
>> +        "backup-id": backup_group.id,
>> +        "backup-type": backup_group.ty,
>> +    });
>> +    if !namespace.is_root() {
>> +        let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
>> +        args["ns"] = serde_json::to_value(target_ns.name())?;
>> +    }
>> +
>> +    params.target.client.delete(&api_path, Some(args)).await?;
>> +
>> +    Ok(())
>> +}
>> +
>> +// Check if the namespace is already present on the target, create it otherwise
>> +async fn check_or_create_target_namespace(
>> +    params: &PushParameters,
>> +    target_namespaces: &[BackupNamespace],
>> +    namespace: &BackupNamespace,
>> +) -> Result<bool, Error> {
>> +    let mut created = false;
>> +
>> +    if !namespace.is_root() && !target_namespaces.contains(namespace) {
>> +        // Namespace not present on target, create namespace.
>> +        // Sub-namespaces have to be created by creating parent components first.
>> +
>> +        check_ns_remote_datastore_privs(&params, namespace, PRIV_REMOTE_DATASTORE_MODIFY)
>> +            .map_err(|err| format_err!("Creating namespace not allowed - {err}"))?;
>> +
>> +        let mut parent = BackupNamespace::root();
>> +        for namespace_component in namespace.components() {
>> +            let namespace = BackupNamespace::new(namespace_component)?;
>> +            let api_path = format!(
>> +                "api2/json/admin/datastore/{store}/namespace",
>> +                store = params.target.repo.store(),
>> +            );
>> +            let mut args = serde_json::json!({ "name": namespace.name() });
>> +            if !parent.is_root() {
>> +                args["parent"] = serde_json::to_value(parent.clone())?;
>> +            }
>> +            if let Err(err) = params.target.client.post(&api_path, Some(args)).await {
>> +                let target_store_and_ns =
>> +                    print_store_and_ns(params.target.repo.store(), &namespace);
>> +                bail!("sync into {target_store_and_ns} failed - namespace creation failed: {err}");
>> +            }
>> +            parent.push(namespace.name())?;
> 
> this tries to create every prefix of the missing namespace, instead of
> just the missing lower end of the hierarchy.. which is currently fine,
> since the create_namespace API endpoint doesn't fail if the namespace
> already exists, but since we already have a list of existing namespaces
> here, we could make it more future proof (and a bit faster ;)) by
> skipping those..

Added checks to skip the API calls for namespace components that 
already exist on the target.
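
Roughly, the check boils down to the following sketch. It models namespaces
as plain component lists and uses a hypothetical helper name
(`missing_prefixes`); the actual code operates on `BackupNamespace` and
performs the API call for each missing prefix instead of collecting them.

```rust
/// Return all prefixes of `namespace` (shortest first) which are not yet
/// present in `existing`. Only these would need a create call on the target.
fn missing_prefixes(namespace: &[&str], existing: &[Vec<String>]) -> Vec<Vec<String>> {
    let mut missing = Vec::new();
    let mut parent: Vec<String> = Vec::new();

    for component in namespace {
        // Extend the current prefix by one component.
        parent.push((*component).to_owned());
        // Skip prefixes the target already knows about.
        if !existing.contains(&parent) {
            missing.push(parent.clone());
        }
    }

    missing
}
```

For a target that already has `a` and `a/b`, pushing into `a/b/c/d` would
then only create `a/b/c` and `a/b/c/d`.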

>> +        }
>> +
>> +        created = true;
>> +    }
>> +
>> +    Ok(created)
>> +}
>> +
>> +/// Push contents of source datastore matched by given push parameters to target.
>> +pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats, Error> {
>> +    let mut errors = false;
>> +
>> +    // Generate list of source namespaces to push to target, limited by max-depth
>> +    let mut namespaces = params.source.list_namespaces(&mut params.max_depth).await?;
>> +
>> +    check_namespace_depth_limit(&params.source.get_ns(), &params.target.ns, &namespaces)?;
>> +
>> +    namespaces.sort_unstable_by_key(|a| a.name_len());
>> +
>> +    // Fetch all accessible namespaces already present on the target
>> +    let target_namespaces = fetch_target_namespaces(&params).await?;
>> +    // Remember synced namespaces, removing non-synced ones when remove vanished flag is set
>> +    let mut synced_namespaces = HashSet::with_capacity(namespaces.len());
>> +
>> +    let (mut groups, mut snapshots) = (0, 0);
>> +    let mut stats = SyncStats::default();
>> +    for namespace in namespaces {
>> +        let source_store_and_ns = print_store_and_ns(params.source.store.name(), &namespace);
>> +        let target_namespace = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
>> +        let target_store_and_ns = print_store_and_ns(params.target.repo.store(), &target_namespace);
>> +
>> +        info!("----");
>> +        info!("Syncing {source_store_and_ns} into {target_store_and_ns}");
>> +
>> +        synced_namespaces.insert(target_namespace.clone());
>> +
>> +        match check_or_create_target_namespace(&params, &target_namespaces, &target_namespace).await
>> +        {
>> +            Ok(true) => info!("Created namespace {target_namespace}"),
>> +            Ok(false) => {}
>> +            Err(err) => {
>> +                info!("Cannot sync {source_store_and_ns} into {target_store_and_ns} - {err}");
>> +                errors = true;
>> +                continue;
>> +            }
>> +        }
>> +
>> +        match push_namespace(&namespace, &params).await {
>> +            Ok((sync_progress, sync_stats, sync_errors)) => {
>> +                errors |= sync_errors;
>> +                stats.add(sync_stats);
>> +
>> +                if params.max_depth != Some(0) {
>> +                    groups += sync_progress.done_groups;
>> +                    snapshots += sync_progress.done_snapshots;
>> +
>> +                    let ns = if namespace.is_root() {
>> +                        "root namespace".into()
>> +                    } else {
>> +                        format!("namespace {namespace}")
>> +                    };
>> +                    info!(
>> +                        "Finished syncing {ns}, current progress: {groups} groups, {snapshots} snapshots"
>> +                    );
>> +                }
>> +            }
>> +            Err(err) => {
>> +                errors = true;
>> +                info!("Encountered errors while syncing namespace {namespace} - {err}");
>> +            }
>> +        }
>> +    }
>> +
>> +    if params.remove_vanished {
>> +        for target_namespace in target_namespaces {
>> +            if synced_namespaces.contains(&target_namespace) {
>> +                continue;
>> +            }
>> +            if let Err(err) = remove_target_namespace(&params, &target_namespace).await {
>> +                info!("failed to remove vanished namespace {target_namespace} - {err}");
>> +                continue;
>> +            }
>> +            info!("removed vanished namespace {target_namespace}");
>> +        }
>> +    }
>> +
>> +    if errors {
>> +        bail!("sync failed with some errors.");
>> +    }
>> +
>> +    Ok(stats)
>> +}
>> +
>> +/// Push namespace including all backup groups to target
>> +///
>> +/// Iterate over all backup groups in the namespace and push them to the target.
>> +pub(crate) async fn push_namespace(
>> +    namespace: &BackupNamespace,
>> +    params: &PushParameters,
>> +) -> Result<(StoreProgress, SyncStats, bool), Error> {
>> +    // Check if user is allowed to perform backups on remote datastore
>> +    check_ns_remote_datastore_privs(params, namespace, PRIV_REMOTE_DATASTORE_BACKUP)
>> +        .map_err(|err| format_err!("Pushing to remote not allowed - {err}"))?;
>> +
>> +    let mut list: Vec<BackupGroup> = params
>> +        .source
>> +        .list_groups(namespace, &params.local_user)
>> +        .await?;
>> +
>> +    list.sort_unstable_by(|a, b| {
>> +        let type_order = a.ty.cmp(&b.ty);
>> +        if type_order == Ordering::Equal {
>> +            a.id.cmp(&b.id)
>> +        } else {
>> +            type_order
>> +        }
>> +    });
>> +
>> +    let total = list.len();
>> +    let list: Vec<BackupGroup> = list
>> +        .into_iter()
>> +        .filter(|group| group.apply_filters(&params.group_filter))
>> +        .collect();
>> +
>> +    info!(
>> +        "found {filtered} groups to sync (out of {total} total)",
>> +        filtered = list.len()
>> +    );
>> +
>> +    let target_groups = if params.remove_vanished {
>> +        fetch_target_groups(params, namespace).await?
>> +    } else {
>> +        // avoid fetching of groups, not required if remove vanished not set
>> +        Vec::new()
>> +    };
> 
> should we then fetch them below in the if remove_vanished branch, like
> we do when handling snapshots?

Yes, I do not recall exactly why I fetched the groups here already; it 
might be a leftover from a previous iteration during implementation.

As the list is only used in the remove vanished case, I moved the call 
there.

>> +
>> +    let mut errors = false;
>> +    // Remember synced groups, remove others when the remove vanished flag is set
>> +    let mut synced_groups = HashSet::new();
>> +    let mut progress = StoreProgress::new(list.len() as u64);
>> +    let mut stats = SyncStats::default();
>> +
>> +    for (done, group) in list.into_iter().enumerate() {
>> +        progress.done_groups = done as u64;
>> +        progress.done_snapshots = 0;
>> +        progress.group_snapshots = 0;
>> +        synced_groups.insert(group.clone());
>> +
>> +        match push_group(params, namespace, &group, &mut progress).await {
>> +            Ok(sync_stats) => stats.add(sync_stats),
>> +            Err(err) => {
>> +                info!("sync group '{group}' failed  - {err}");
>> +                errors = true;
>> +            }
>> +        }
>> +    }
>> +
>> +    if params.remove_vanished {
>> +        for target_group in target_groups {
>> +            if synced_groups.contains(&target_group) {
>> +                continue;
>> +            }
>> +            if !target_group.apply_filters(&params.group_filter) {
>> +                continue;
>> +            }
>> +
>> +            info!("delete vanished group '{target_group}'");
>> +
>> +            let count_before = match fetch_target_snapshots(params, namespace, &target_group).await {
>> +                Ok(snapshots) => snapshots.len(),
>> +                Err(_err) => 0, // ignore errors
>> +            };
>> +
>> +            if let Err(err) = remove_target_group(params, namespace, &target_group).await {
>> +                info!("{err}");
>> +                errors = true;
>> +                continue;
>> +            }
>> +
>> +            let mut count_after = match fetch_target_snapshots(params, namespace, &target_group).await {
>> +                Ok(snapshots) => snapshots.len(),
>> +                Err(_err) => 0, // ignore errors
>> +            };
>> +
>> +            let deleted_groups = if count_after > 0 {
>> +                info!("kept some protected snapshots of group '{target_group}'");
>> +                0
>> +            } else {
>> +                1
>> +            };
>> +
>> +            if count_after > count_before {
>> +                count_after = count_before;
>> +            }
>> +
>> +            stats.add(SyncStats::from(RemovedVanishedStats {
>> +                snapshots: count_before - count_after,
>> +                groups: deleted_groups,
>> +                namespaces: 0,
>> +            }));
>> +        }
>> +    }
>> +
>> +    Ok((progress, stats, errors))
>> +}
>> +
>> +async fn fetch_target_snapshots(
>> +    params: &PushParameters,
>> +    namespace: &BackupNamespace,
>> +    group: &BackupGroup,
>> +) -> Result<Vec<SnapshotListItem>, Error> {
>> +    let api_path = format!(
>> +        "api2/json/admin/datastore/{store}/snapshots",
>> +        store = params.target.repo.store(),
>> +    );
>> +    let mut args = serde_json::to_value(group)?;
>> +    if !namespace.is_root() {
>> +        let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
>> +        args["ns"] = serde_json::to_value(target_ns)?;
>> +    }
>> +    let mut result = params.target.client.get(&api_path, Some(args)).await?;
>> +    let snapshots: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
>> +
>> +    Ok(snapshots)
>> +}
>> +
>> +async fn fetch_previous_backup_time(
>> +    params: &PushParameters,
>> +    namespace: &BackupNamespace,
>> +    group: &BackupGroup,
>> +) -> Result<Option<i64>, Error> {
>> +    let mut snapshots = fetch_target_snapshots(params, namespace, group).await?;
>> +    snapshots.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
>> +    Ok(snapshots.last().map(|snapshot| snapshot.backup.time))
>> +}
>> +
>> +async fn forget_target_snapshot(
>> +    params: &PushParameters,
>> +    namespace: &BackupNamespace,
>> +    snapshot: &BackupDir,
>> +) -> Result<(), Error> {
>> +    check_ns_remote_datastore_privs(params, namespace, PRIV_REMOTE_DATASTORE_PRUNE)
>> +        .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
>> +
>> +    let api_path = format!(
>> +        "api2/json/admin/datastore/{store}/snapshots",
>> +        store = params.target.repo.store(),
>> +    );
>> +    let mut args = serde_json::to_value(snapshot)?;
>> +    if !namespace.is_root() {
>> +        let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
>> +        args["ns"] = serde_json::to_value(target_ns)?;
>> +    }
>> +    params.target.client.delete(&api_path, Some(args)).await?;
>> +
>> +    Ok(())
>> +}
>> +
>> +/// Push group including all snapshots to target
>> +///
>> +/// Iterate over all snapshots in the group and push them to the target.
>> +/// The group sync operation consists of the following steps:
>> +/// - Query snapshots of given group from the source
>> +/// - Sort snapshots by time
>> +/// - Apply transfer last cutoff and filters to list
>> +/// - Iterate the snapshot list and push each snapshot individually
>> +/// - (Optional): Remove vanished snapshots if `remove_vanished` flag is set
>> +pub(crate) async fn push_group(
>> +    params: &PushParameters,
>> +    namespace: &BackupNamespace,
>> +    group: &BackupGroup,
>> +    progress: &mut StoreProgress,
>> +) -> Result<SyncStats, Error> {
>> +    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
>> +    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
>> +
>> +    let mut snapshots: Vec<BackupDir> = params.source.list_backup_dirs(namespace, group).await?;
>> +    snapshots.sort_unstable_by(|a, b| a.time.cmp(&b.time));
>> +
>> +    let total_snapshots = snapshots.len();
>> +    let cutoff = params
>> +        .transfer_last
>> +        .map(|count| total_snapshots.saturating_sub(count))
>> +        .unwrap_or_default();
>> +
>> +    let last_snapshot_time = fetch_previous_backup_time(params, namespace, group)
>> +        .await?
>> +        .unwrap_or(i64::MIN);
>> +
>> +    let mut source_snapshots = HashSet::new();
>> +    let snapshots: Vec<BackupDir> = snapshots
>> +        .into_iter()
>> +        .enumerate()
>> +        .filter(|&(pos, ref snapshot)| {
>> +            source_snapshots.insert(snapshot.time);
>> +            if last_snapshot_time > snapshot.time {
>> +                already_synced_skip_info.update(snapshot.time);
>> +                return false;
>> +            } else if already_synced_skip_info.count > 0 {
>> +                info!("{already_synced_skip_info}");
>> +                already_synced_skip_info.reset();
>> +                return true;
>> +            }
>> +
>> +            if pos < cutoff && last_snapshot_time != snapshot.time {
>> +                transfer_last_skip_info.update(snapshot.time);
>> +                return false;
>> +            } else if transfer_last_skip_info.count > 0 {
>> +                info!("{transfer_last_skip_info}");
>> +                transfer_last_skip_info.reset();
>> +            }
>> +            true
>> +        })
>> +        .map(|(_, dir)| dir)
>> +        .collect();
>> +
>> +    progress.group_snapshots = snapshots.len() as u64;
>> +
>> +    let target_snapshots = fetch_target_snapshots(params, namespace, group).await?;
>> +    let target_snapshots: Vec<BackupDir> = target_snapshots
>> +        .into_iter()
>> +        .map(|snapshot| snapshot.backup)
>> +        .collect();
>> +
>> +    let mut stats = SyncStats::default();
>> +    for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
>> +        if target_snapshots.contains(&source_snapshot) {
>> +            progress.done_snapshots = pos as u64 + 1;
>> +            info!("percentage done: {progress}");
>> +            continue;
>> +        }
>> +        let result = push_snapshot(params, namespace, &source_snapshot).await;
>> +
>> +        progress.done_snapshots = pos as u64 + 1;
>> +        info!("percentage done: {progress}");
>> +
>> +        // stop on error
>> +        let sync_stats = result?;
>> +        stats.add(sync_stats);
>> +    }
>> +
>> +    if params.remove_vanished {
>> +        let target_snapshots = fetch_target_snapshots(params, namespace, group).await?;
>> +        for snapshot in target_snapshots {
>> +            if source_snapshots.contains(&snapshot.backup.time) {
>> +                continue;
>> +            }
>> +            if snapshot.protected {
>> +                info!(
>> +                    "don't delete vanished snapshot {name} (protected)",
>> +                    name = snapshot.backup
>> +                );
>> +                continue;
>> +            }
>> +            if let Err(err) = forget_target_snapshot(params, namespace, &snapshot.backup).await {
>> +                info!(
>> +                    "could not delete vanished snapshot {name} - {err}",
>> +                    name = snapshot.backup
>> +                );
>> +            }
>> +            info!("delete vanished snapshot {name}", name = snapshot.backup);
>> +            stats.add(SyncStats::from(RemovedVanishedStats {
>> +                snapshots: 1,
>> +                groups: 0,
>> +                namespaces: 0,
>> +            }));
>> +        }
>> +    }
>> +
>> +    Ok(stats)
>> +}
>> +
>> +/// Push snapshot to target
>> +///
>> +/// Creates a new snapshot on the target and pushes the content of the source snapshot to the
>> +/// target by creating a new manifest file and connecting to the remote as backup writer client.
>> +/// Chunks are written by recreating the index by uploading the chunk stream as read from the
>> +/// source. Data blobs are uploaded as such.
>> +pub(crate) async fn push_snapshot(
>> +    params: &PushParameters,
>> +    namespace: &BackupNamespace,
>> +    snapshot: &BackupDir,
>> +) -> Result<SyncStats, Error> {
>> +    let mut stats = SyncStats::default();
>> +    let target_ns = namespace.map_prefix(&params.source.ns, &params.target.ns)?;
>> +    let backup_dir = params
>> +        .source
>> +        .store
>> +        .backup_dir(params.source.ns.clone(), snapshot.clone())?;
>> +
>> +    let reader = params.source.reader(namespace, snapshot).await?;
>> +
>> +    // Load the source manifest, needed to find crypt mode for files
>> +    let mut tmp_source_manifest_path = backup_dir.full_path();
>> +    tmp_source_manifest_path.push(MANIFEST_BLOB_NAME);
>> +    tmp_source_manifest_path.set_extension("tmp");
>> +    let source_manifest = if let Some(manifest_blob) = reader
>> +        .load_file_into(MANIFEST_BLOB_NAME, &tmp_source_manifest_path)
>> +        .await?
>> +    {
>> +        BackupManifest::try_from(manifest_blob)?
> 
> why do we copy the manifest into a .tmp file path here instead of just
> reading it? and if we need the copy, who's cleaning it up?

Switched this over to reading the manifest directly, as you are right 
and the temp file is not required at all. I was primed by the pull 
implementation, where it is required since the manifest is read from the 
remote. This makes this part much cleaner and more concise.

> 
>> +    } else {
>> +        // no manifest in snapshot, skip
>> +        return Ok(stats);
>> +    };
>> +
>> +    // Manifest to be created on target, referencing all the source archives after upload.
>> +    let mut manifest = BackupManifest::new(snapshot.clone());
>> +
>> +    // writer instance locks the snapshot on the remote side
>> +    let backup_writer = BackupWriter::start(
>> +        &params.target.client,
>> +        None,
>> +        params.target.repo.store(),
>> +        &target_ns,
>> +        snapshot,
>> +        false,
>> +        false,
>> +    )
>> +    .await?;
>> +
>> +    // Use manifest of previous snapshots in group on target for chunk upload deduplication
>> +    let previous_manifest = match backup_writer.download_previous_manifest().await {
>> +        Ok(manifest) => Some(Arc::new(manifest)),
>> +        Err(err) => {
>> +            log::info!("Could not download previous manifest - {err}");
>> +            None
>> +        }
>> +    };
> 
> this should not be attempted for the first snapshot in a group, else it
> does requests we already know will fail and spams the log as a result as
> well..

Agreed, added a check to make sure to only try to fetch the previous 
manifest if there are snapshots present for this backup group on the target.
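
A minimal sketch of that guard, not the actual patch code: the helper
name `previous_manifest_if_any`, its parameters, and the generic `fetch`
closure are hypothetical stand-ins for the real (async) call to
`download_previous_manifest()`, and the sketch is synchronous for brevity.

```rust
use std::fmt::Display;

/// Hypothetical helper: only attempt to fetch the previous manifest when the
/// target group already contains at least one snapshot.
///
/// `target_snapshot_count` is assumed to come from a prior listing of the
/// group on the target; `fetch` stands in for the real download call.
fn previous_manifest_if_any<M, E, F>(target_snapshot_count: usize, fetch: F) -> Option<M>
where
    F: FnOnce() -> Result<M, E>,
    E: Display,
{
    if target_snapshot_count == 0 {
        // First snapshot in the group: the request is known to fail,
        // so skip it and avoid the log noise.
        return None;
    }
    match fetch() {
        Ok(manifest) => Some(manifest),
        Err(err) => {
            // Still tolerate failures, as in the quoted patch.
            eprintln!("Could not download previous manifest - {err}");
            None
        }
    }
}
```

With a count of zero the closure is never invoked, so no request is made
and nothing is logged.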

>> +
>> +    let upload_options = UploadOptions {
>> +        compress: true,
>> +        encrypt: false,
> 
> this might warrant a comment why it's okay to do that (I hope it is? ;))

Ack, added some details on why the upload options are set up this way.
Basically this is possible because the backup writer stream code
performing compression and encryption is bypassed by using the
`upload_index_chunk_info` method.
Therefore, even for encrypted and compressed backups, the chunks can be
uploaded as read from the source. The relevant parts need to be
included in the manifest, however, so that the snapshot is restorable.

Compression is set to true, so that the blob upload of e.g. the backup 
log file (if present) will be compressed.

>> +        previous_manifest,
>> +        ..UploadOptions::default()
>> +    };
>> +
>> +    // Avoid double upload penalty by remembering already seen chunks
>> +    let known_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 1024)));
>> +
>> +    for entry in source_manifest.files() {
>> +        let mut path = backup_dir.full_path();
>> +        path.push(&entry.filename);
>> +        if path.try_exists()? {
>> +            match ArchiveType::from_path(&entry.filename)? {
>> +                ArchiveType::Blob => {
>> +                    let file = std::fs::File::open(path.clone())?;
>> +                    let backup_stats = backup_writer.upload_blob(file, &entry.filename).await?;
>> +                    manifest.add_file(
>> +                        entry.filename.to_string(),
>> +                        backup_stats.size,
>> +                        backup_stats.csum,
>> +                        entry.chunk_crypt_mode(),
>> +                    )?;
>> +                    stats.add(SyncStats {
>> +                        chunk_count: backup_stats.chunk_count as usize,
>> +                        bytes: backup_stats.size as usize,
>> +                        elapsed: backup_stats.duration,
>> +                        removed: None,
>> +                    });
>> +                }
>> +                ArchiveType::DynamicIndex => {
>> +                    let index = DynamicIndexReader::open(&path)?;
>> +                    let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
>> +                    let sync_stats = push_index(
>> +                        &entry.filename,
>> +                        index,
>> +                        chunk_reader,
>> +                        &backup_writer,
>> +                        &mut manifest,
>> +                        entry.chunk_crypt_mode(),
>> +                        None,
>> +                        &known_chunks,
>> +                    )
>> +                    .await?;
>> +                    stats.add(sync_stats);
>> +                }
>> +                ArchiveType::FixedIndex => {
>> +                    let index = FixedIndexReader::open(&path)?;
>> +                    let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
>> +                    let size = index.index_bytes();
>> +                    let sync_stats = push_index(
>> +                        &entry.filename,
>> +                        index,
>> +                        chunk_reader,
>> +                        &backup_writer,
>> +                        &mut manifest,
>> +                        entry.chunk_crypt_mode(),
>> +                        Some(size),
>> +                        &known_chunks,
>> +                    )
>> +                    .await?;
>> +                    stats.add(sync_stats);
>> +                }
>> +            }
>> +        } else {
>> +            info!("{path:?} does not exist, skipped.");
>> +        }
>> +    }
>> +
>> +    // Fetch client log from source and push to target
>> +    // this has to be handled individually since the log is never part of the manifest
>> +    let mut client_log_path = backup_dir.full_path();
>> +    client_log_path.push(CLIENT_LOG_BLOB_NAME);
>> +    if client_log_path.is_file() {
>> +        backup_writer
>> +            .upload_blob_from_file(
>> +                &client_log_path,
>> +                CLIENT_LOG_BLOB_NAME,
>> +                upload_options.clone(),
>> +            )
>> +            .await?;
>> +    } else {
>> +        info!("Client log at {client_log_path:?} does not exist or is not a file, skipped.");
>> +    }
> 
> I am not sure this warrants a log line.. the client log is optional
> after all, so this can happen quite a lot in practice (e.g., if you do
> host backups without bothering to upload logs..)
> 
> I think we should follow the logic of pull based syncing here - add a
> log to the last previously synced snapshot if it exists and is missing
> on the other end, otherwise only attempt to upload a log if it exists
> without logging its absence.

Dropped the log line for now, as fetching the info from the previous 
snapshot on the target just to display this log line seemed rather 
inefficient.
>> +
>> +    // Rewrite manifest for pushed snapshot, re-adding the existing fingerprint and signature
>> +    let mut manifest_json = serde_json::to_value(manifest)?;
>> +    manifest_json["unprotected"] = source_manifest.unprotected;
>> +    if let Some(signature) = source_manifest.signature {
>> +        manifest_json["signature"] = serde_json::to_value(signature)?;
>> +    }
>> +    let manifest_string = serde_json::to_string_pretty(&manifest_json).unwrap();
> 
> couldn't we just upload the original manifest here?

Yes, this will be done instead. This works even if the backup finish 
call (over-)writes some values (`chunk_upload_stats`), which is why I 
implemented this differently at first.

>> +    let backup_stats = backup_writer
>> +        .upload_blob_from_data(
>> +            manifest_string.into_bytes(),
>> +            MANIFEST_BLOB_NAME,
>> +            upload_options,
>> +        )
>> +        .await?;
>> +    backup_writer.finish().await?;
>> +
>> +    stats.add(SyncStats {
>> +        chunk_count: backup_stats.chunk_count as usize,
>> +        bytes: backup_stats.size as usize,
>> +        elapsed: backup_stats.duration,
>> +        removed: None,
>> +    });
>> +
>> +    Ok(stats)
>> +}
>> +
>> +// Read fixed or dynamic index and push to target by uploading via the backup writer instance
>> +//
>> +// For fixed indexes, the size must be provided as given by the index reader.
>> +#[allow(clippy::too_many_arguments)]
>> +async fn push_index<'a>(
>> +    filename: &'a str,
>> +    index: impl IndexFile + Send + 'static,
>> +    chunk_reader: Arc<dyn AsyncReadChunk>,
>> +    backup_writer: &BackupWriter,
>> +    manifest: &mut BackupManifest,
>> +    crypt_mode: CryptMode,
>> +    size: Option<u64>,
>> +    known_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
>> +) -> Result<SyncStats, Error> {
>> +    let (upload_channel_tx, upload_channel_rx) = mpsc::channel(20);
>> +    let mut chunk_infos =
>> +        stream::iter(0..index.index_count()).map(move |pos| index.chunk_info(pos).unwrap());
> 
> so this iterates over all the chunks in the index..
> 
>> +
>> +    tokio::spawn(async move {
>> +        while let Some(chunk_info) = chunk_infos.next().await {
>> +            let chunk_info = chunk_reader
>> +                .read_raw_chunk(&chunk_info.digest)
> 
> and this reads them
> 
>> +                .await
>> +                .map(|chunk| ChunkInfo {
>> +                    chunk,
>> +                    digest: chunk_info.digest,
>> +                    chunk_len: chunk_info.size(),
>> +                    offset: chunk_info.range.start,
>> +                });
>> +            let _ = upload_channel_tx.send(chunk_info).await;
> 
> and sends them further along to the upload code.. which will then (in
> many cases) throw away all that data we just read because it's already
> on the target and we know that because of the previous manifest..
> 
> wouldn't it be better to deduplicate here already, and instead of
> reading known chunks over and over again, just tell the server to
> re-register them? or am I missing something here? :)

Good catch, this is indeed a possible huge performance bottleneck!

Fixed this by moving the known-chunks check here (as suggested) and
streaming a `MergedChunkInfo` instead of a `ChunkInfo`, which allows
sending only the chunk's digest and size over to the backup writer. This
way, known chunks are never read.
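
Roughly, the idea can be sketched as follows. This is a simplified,
synchronous illustration under my own assumptions, not the code from the
next version: `IndexEntry`, `ChunkMessage` and `plan_uploads` are
hypothetical names, and `ChunkMessage` only mimics the role of
`MergedChunkInfo`.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for one index entry: chunk digest plus size.
#[derive(Debug)]
struct IndexEntry {
    digest: [u8; 32],
    size: u64,
}

/// Hypothetical message type passed on to the uploader.
#[derive(Debug, PartialEq)]
enum ChunkMessage {
    /// Chunk already known: only digest and size are forwarded.
    Known { digest: [u8; 32], size: u64 },
    /// Chunk not seen before: its raw data was read and is forwarded.
    New { digest: [u8; 32], data: Vec<u8> },
}

/// Check the known-chunks set *before* reading, so that chunk data is only
/// read from the source for digests that have not been seen yet.
fn plan_uploads<F>(
    entries: &[IndexEntry],
    known_chunks: &mut HashSet<[u8; 32]>,
    mut read_chunk: F,
) -> Vec<ChunkMessage>
where
    F: FnMut(&[u8; 32]) -> Vec<u8>,
{
    let mut messages = Vec::with_capacity(entries.len());
    for entry in entries {
        if known_chunks.contains(&entry.digest) {
            // No read at all for known chunks.
            messages.push(ChunkMessage::Known {
                digest: entry.digest,
                size: entry.size,
            });
        } else {
            let data = read_chunk(&entry.digest);
            // Remember the digest so later occurrences skip the read.
            known_chunks.insert(entry.digest);
            messages.push(ChunkMessage::New {
                digest: entry.digest,
                data,
            });
        }
    }
    messages
}
```

In this sketch a digest that occurs several times in an index is read at
most once, and digests already in the set are not read at all.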

> 
>> +        }
>> +    });
>> +
>> +    let chunk_info_stream = ReceiverStream::new(upload_channel_rx).map_err(Error::from);
>> +
>> +    let upload_options = UploadOptions {
>> +        compress: true,
>> +        encrypt: false,
>> +        fixed_size: size,
>> +        ..UploadOptions::default()
>> +    };
>> +
>> +    let upload_stats = backup_writer
>> +        .upload_index_chunk_info(
>> +            filename,
>> +            chunk_info_stream,
>> +            upload_options,
>> +            known_chunks.clone(),
>> +        )
>> +        .await?;
>> +
>> +    manifest.add_file(
>> +        filename.to_string(),
>> +        upload_stats.size,
>> +        upload_stats.csum,
>> +        crypt_mode,
>> +    )?;
>> +
>> +    Ok(SyncStats {
>> +        chunk_count: upload_stats.chunk_count as usize,
>> +        bytes: upload_stats.size as usize,
>> +        elapsed: upload_stats.duration,
>> +        removed: None,
>> +    })
>> +}
>> -- 
>> 2.39.2
>>
>>
>>
>> _______________________________________________
>> pbs-devel mailing list
>> pbs-devel at lists.proxmox.com
>> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>>
>>
>>