[pbs-devel] [PATCH v6 proxmox-backup 13/29] fix #3044: server: implement push support for sync operations

Fabian Grünbichler f.gruenbichler at proxmox.com
Wed Nov 6 12:57:45 CET 2024


Quoting Christian Ebner (2024-10-31 13:15:03)
> Adds the functionality required to push datastore contents from a
> source to a remote target.
> This includes syncing of the namespaces, backup groups and snapshots
> based on the provided filters as well as removing vanished contents
> from the target when requested.
> 
> While trying to mimic the pull direction of sync jobs, the
> implementation is different as access to the remote must be performed
> via the REST API, not needed for the pull job which can access the
> local datastore via the filesystem directly.
> 
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 5:
> - fetch backup groups split by owned and not owned, only allow to push
>   to owned groups, don't allow to prune not owned groups.
> - store remote api version unconditionally
> - check for supported feature instead of api version to include conditional
>   parameters for api calls
> - directly use delete stats from api calls, since these are not followup
>   patches anymore
> 
>  src/server/mod.rs  |   1 +
>  src/server/push.rs | 980 +++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 981 insertions(+)
>  create mode 100644 src/server/push.rs
> 
> diff --git a/src/server/mod.rs b/src/server/mod.rs
> index 2e40bde3c..7c14ed4b8 100644
> --- a/src/server/mod.rs
> +++ b/src/server/mod.rs
> @@ -36,6 +36,7 @@ pub mod auth;
>  pub mod metric_collection;
>  
>  pub(crate) mod pull;
> +pub(crate) mod push;
>  pub(crate) mod sync;
>  
>  pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
> diff --git a/src/server/push.rs b/src/server/push.rs
> new file mode 100644
> index 000000000..c38e9c96b
> --- /dev/null
> +++ b/src/server/push.rs
> @@ -0,0 +1,980 @@
> +//! Sync datastore by pushing contents to remote server
> +
> +use std::cmp::Ordering;
> +use std::collections::HashSet;
> +use std::sync::{Arc, Mutex};
> +
> +use anyhow::{bail, format_err, Error};
> +use futures::stream::{self, StreamExt, TryStreamExt};
> +use tokio::sync::mpsc;
> +use tokio_stream::wrappers::ReceiverStream;
> +use tracing::{info, warn};
> +
> +use pbs_api_types::{
> +    print_store_and_ns, ApiVersion, ApiVersionInfo, Authid, BackupDir, BackupGroup,
> +    BackupGroupDeleteStats, BackupNamespace, CryptMode, GroupFilter, GroupListItem,
> +    NamespaceListItem, Operation, RateLimitConfig, Remote, SnapshotListItem,
> +    PRIV_REMOTE_DATASTORE_BACKUP, PRIV_REMOTE_DATASTORE_MODIFY, PRIV_REMOTE_DATASTORE_PRUNE,
> +};
> +use pbs_client::{BackupRepository, BackupWriter, HttpClient, MergedChunkInfo, UploadOptions};
> +use pbs_config::CachedUserInfo;
> +use pbs_datastore::data_blob::ChunkInfo;
> +use pbs_datastore::dynamic_index::DynamicIndexReader;
> +use pbs_datastore::fixed_index::FixedIndexReader;
> +use pbs_datastore::index::IndexFile;
> +use pbs_datastore::manifest::{ArchiveType, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME};
> +use pbs_datastore::read_chunk::AsyncReadChunk;
> +use pbs_datastore::{BackupManifest, DataStore, StoreProgress};
> +
> +use super::sync::{
> +    check_namespace_depth_limit, LocalSource, RemovedVanishedStats, SkipInfo, SkipReason,
> +    SyncSource, SyncStats,
> +};
> +use crate::api2::config::remote;
> +
> +/// Target for backups to be pushed to
> +pub(crate) struct PushTarget {
> +    // Remote as found in remote.cfg
> +    remote: Remote,
> +    // Target repository on remote
> +    repo: BackupRepository,
> +    // Target namespace on remote
> +    ns: BackupNamespace,
> +    // Http client to connect to remote
> +    client: HttpClient,
> +    // Api version reported by the target
> +    api_version: ApiVersion,
> +}
> +
> +impl PushTarget {
> +    fn remote_user(&self) -> Authid {
> +        self.remote.config.auth_id.clone()
> +    }
> +}
> +
> +/// Parameters for a push operation
> +pub(crate) struct PushParameters {
> +    /// Source of backups to be pushed to remote
> +    source: Arc<LocalSource>,
> +    /// Target for backups to be pushed to
> +    target: PushTarget,
> +    /// Local user limiting the accessible source contents, makes sure that the sync job sees the
> +    /// same source content when executed by different users with different privileges
> +    /// User as which the job gets executed, requires the permissions on the remote

this now has two sentences which looks kinda weird. maybe they could be
combined into a single one, with the longer explanation of the semantics and
caveats in the documentation?

e.g.,

User used for permission checks on the source side, including potentially
filtering visible namespaces and backup groups.

> +    local_user: Authid,
> +    /// Whether to remove groups which exist locally, but not on the remote end
> +    remove_vanished: bool,

groups and namespaces?

> +    /// How many levels of sub-namespaces to push (0 == no recursion, None == maximum recursion)
> +    max_depth: Option<usize>,
> +    /// Filters for reducing the push scope
> +    group_filter: Vec<GroupFilter>,
> +    /// How many snapshots should be transferred at most (taking the newest N snapshots)
> +    transfer_last: Option<usize>,
> +}
> +
> +impl PushParameters {
> +    /// Creates a new instance of `PushParameters`.
> +    #[allow(clippy::too_many_arguments)]
> +    pub(crate) async fn new(
> +        store: &str,
> +        ns: BackupNamespace,
> +        remote_id: &str,
> +        remote_store: &str,
> +        remote_ns: BackupNamespace,
> +        local_user: Authid,
> +        remove_vanished: Option<bool>,
> +        max_depth: Option<usize>,
> +        group_filter: Option<Vec<GroupFilter>>,
> +        limit: RateLimitConfig,
> +        transfer_last: Option<usize>,
> +    ) -> Result<Self, Error> {
> +        if let Some(max_depth) = max_depth {
> +            ns.check_max_depth(max_depth)?;
> +            remote_ns.check_max_depth(max_depth)?;
> +        };
> +        let remove_vanished = remove_vanished.unwrap_or(false);
> +
> +        let source = Arc::new(LocalSource {
> +            store: DataStore::lookup_datastore(store, Some(Operation::Read))?,
> +            ns,
> +        });
> +
> +        let (remote_config, _digest) = pbs_config::remote::config()?;
> +        let remote: Remote = remote_config.lookup("remote", remote_id)?;
> +
> +        let repo = BackupRepository::new(
> +            Some(remote.config.auth_id.clone()),
> +            Some(remote.config.host.clone()),
> +            remote.config.port,
> +            remote_store.to_string(),
> +        );
> +
> +        let client = remote::remote_client_config(&remote, Some(limit))?;
> +
> +        let mut result = client.get("api2/json/version", None).await?;
> +        let data = result["data"].take();
> +        let version_info: ApiVersionInfo = serde_json::from_value(data)?;
> +        let api_version = ApiVersion::try_from(version_info)?;
> +        let target = PushTarget {
> +            remote,
> +            repo,
> +            ns: remote_ns,
> +            client,
> +            api_version,
> +        };
> +        let group_filter = group_filter.unwrap_or_default();
> +
> +        Ok(Self {
> +            source,
> +            target,
> +            local_user,
> +            remove_vanished,
> +            max_depth,
> +            group_filter,
> +            transfer_last,
> +        })
> +    }
> +
> +    // Map the given namespace from source to target by adapting the prefix
> +    fn map_to_target(&self, namespace: &BackupNamespace) -> Result<BackupNamespace, Error> {
> +        namespace.map_prefix(&self.source.ns, &self.target.ns)
> +    }
> +}
> +
> +// Check if the job user given in the push parameters has the provided privs on the remote
> +// datastore namespace
> +fn check_ns_remote_datastore_privs(
> +    params: &PushParameters,
> +    namespace: &BackupNamespace,

what is this namespace referring to? shouldn't it be the actual namespace on
the target side? if so, I'd rename it to `target_namespace` and adapt the call
sites accordingly to pass in already mapped namespaces..

i.e., if I push a local namespace foo/bar into a remote namespace bar/baz, it
doesn't really make sense to make the remote ACL path about the local
namespace?

> +    privs: u64,
> +) -> Result<(), Error> {
> +    let user_info = CachedUserInfo::new()?;
> +    let mut acl_path: Vec<&str> = vec![
> +        "remote",
> +        &params.target.remote.name,
> +        params.target.repo.store(),
> +    ];
> +
> +    if !namespace.is_root() {
> +        let ns_components: Vec<&str> = namespace.components().collect();
> +        acl_path.extend(ns_components);
> +    }

let acl_path = namespace.remote_acl_path(..) ;)

> +    user_info.check_privs(&params.local_user, &acl_path, privs, false)?;
> +
> +    Ok(())
> +}
> +
> +// Fetch the list of namespaces found on target
> +async fn fetch_target_namespaces(params: &PushParameters) -> Result<Vec<BackupNamespace>, Error> {
> +    let api_path = format!(
> +        "api2/json/admin/datastore/{store}/namespace",
> +        store = params.target.repo.store(),
> +    );
> +    let mut result = params.target.client.get(&api_path, None).await?;
> +    let namespaces: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
> +    let mut namespaces: Vec<BackupNamespace> = namespaces
> +        .into_iter()
> +        .map(|namespace| namespace.ns)
> +        .collect();
> +    namespaces.sort_unstable_by_key(|a| a.name_len());
> +
> +    Ok(namespaces)

so this fails for servers not yet supporting namespaces.. which is fine I guess (those versions are EOL in the meantime), but we might want to
- call that out in the docs
- check that up front based on the ApiVersion? (this one would actually need to go with the version as base, not a new extra feature, since we can't add that retroactively to all versions supporting namespaces..)

> +}
> +
> +// Remove the provided namespace from the target
> +async fn remove_target_namespace(
> +    params: &PushParameters,
> +    namespace: &BackupNamespace,

should be `target_namespace` as well, based on the other comments above, since
all we do with it is check the "remote ACL" and map it for the actual API
call..

> +) -> Result<BackupGroupDeleteStats, Error> {
> +    if namespace.is_root() {
> +        bail!("cannot remove root namespace from target");
> +    }
> +
> +    check_ns_remote_datastore_privs(params, namespace, PRIV_REMOTE_DATASTORE_MODIFY)
> +        .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
> +
> +    let api_path = format!(
> +        "api2/json/admin/datastore/{store}/namespace",
> +        store = params.target.repo.store(),
> +    );
> +
> +    let target_ns = params.map_to_target(namespace)?;

and this mapping should happen at the call site..

> +    let mut args = serde_json::json!({
> +        "ns": target_ns.name(),
> +        "delete-groups": true,
> +    });
> +
> +    let api_feature_supported = params.target.api_version.supports_feature("prune-delete-stats");

this is done a few times, would it make sense to do it once when initializing
the parameters/target and just have a boolean? or even a "PushFeatures"
struct/bitmap/.. ? then it could just be logged once at the start of the sync
job as well..

> +    if api_feature_supported {
> +        args["error-on-protected"] = serde_json::to_value(false)?;
> +    }
> +
> +    let mut result = params.target.client.delete(&api_path, Some(args)).await?;
> +    let data = result["data"].take();
> +    let delete_stats: BackupGroupDeleteStats = if api_feature_supported {
> +        serde_json::from_value(data)?

should we add context to a deserialization error here?

> +    } else {
> +        serde_json::from_value(data).unwrap_or_else(|_| BackupGroupDeleteStats::default())

isn't this wrong? if the other end doesn't support DeleteStats, how could it
return one? this should just return empty stats..

> +    };
> +
> +    Ok(delete_stats)

this return could just be part of the if, dropping the corresponding let..

> +}
> +
> +// Fetch the list of groups found on target in given namespace
> +// Returns sorted list of owned groups and a hashset containing not owned backup groups on target.
> +async fn fetch_target_groups(
> +    params: &PushParameters,
> +    namespace: &BackupNamespace,

should be `target_namespace` as well

> +) -> Result<(Vec<BackupGroup>, HashSet<BackupGroup>), Error> {
> +    let api_path = format!(
> +        "api2/json/admin/datastore/{store}/groups",
> +        store = params.target.repo.store(),
> +    );
> +
> +    let args = if !namespace.is_root() {

otherwise this check here is wrong, we can only skip setting "ns" if the *mapped*
namespace is the root one.. but also, I think we don't need this at all - if we
don't want to support servers without namespace support, then passing in an
empty string for the (mapped) root namespace should be fine? after all, that's
what it serializes to and can be deserialized from ..

> +        let target_ns = params.map_to_target(namespace)?;
> +        Some(serde_json::json!({ "ns": target_ns.name() }))
> +    } else {
> +        None
> +    };
> +
> +    let mut result = params.target.client.get(&api_path, args).await?;
> +    let groups: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> +
> +    let (mut owned, not_owned) = groups.iter().fold(
> +        (Vec::new(), HashSet::new()),
> +        |(mut owned, mut not_owned), group| {
> +            if let Some(ref owner) = group.owner {
> +                if params.target.remote_user() == *owner {
> +                    owned.push(group.backup.clone());
> +                    return (owned, not_owned);
> +                }
> +            }
> +            not_owned.insert(group.backup.clone());
> +            (owned, not_owned)
> +        },
> +    );
> +
> +    owned.sort_unstable_by(|a, b| {
> +        let type_order = a.ty.cmp(&b.ty);
> +        if type_order == Ordering::Equal {
> +            a.id.cmp(&b.id)
> +        } else {
> +            type_order
> +        }
> +    });

this is copied from pull code, but actually, BackupGroup implements cmp::Ord in a
more meaningful manner, and both could be switched over to that?

> +
> +    Ok((owned, not_owned))
> +}
> +
> +// Remove the provided backup group in given namespace from the target
> +async fn remove_target_group(
> +    params: &PushParameters,
> +    namespace: &BackupNamespace,

`target_namespace` again.. because all we do here is check the remote ACL
(which should use the mapped namespace) and mapping it to do the API call..

> +    backup_group: &BackupGroup,
> +) -> Result<BackupGroupDeleteStats, Error> {
> +    check_ns_remote_datastore_privs(params, namespace, PRIV_REMOTE_DATASTORE_PRUNE)
> +        .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
> +
> +    let api_path = format!(
> +        "api2/json/admin/datastore/{store}/groups",
> +        store = params.target.repo.store(),
> +    );
> +
> +    let mut args = serde_json::json!({
> +        "backup-id": backup_group.id,
> +        "backup-type": backup_group.ty,
> +    });
> +
> +    let api_feature_supported = params.target.api_version.supports_feature("prune-delete-stats");
> +
> +    if api_feature_supported {
> +        args["error-on-protected"] = serde_json::to_value(false)?;
> +    }
> +    if !namespace.is_root() {

this again checks in the wrong order, but the same comment as in
fetch_target_groups applies here as well..

> +        let target_ns = params.map_to_target(namespace)?;
> +        args["ns"] = serde_json::to_value(target_ns.name())?;
> +    }
> +
> +    let mut result = params.target.client.delete(&api_path, Some(args)).await?;
> +    let data = result["data"].take();
> +    let delete_stats: BackupGroupDeleteStats = if api_feature_supported {
> +        serde_json::from_value(data)?
> +    } else {
> +        serde_json::from_value(data).unwrap_or_else(|_| BackupGroupDeleteStats::default())

and here the same comment as with removing namespaces, just return the default
stats right away, the server can't have returned one if it doesn't have the
feature..

> +    };
> +    Ok(delete_stats)

this return could just be part of the if, dropping the corresponding let, just
like in remove_target_namespace..

> +}
> +
> +// Check if the namespace is already present on the target, create it otherwise
> +async fn check_or_create_target_namespace(
> +    params: &PushParameters,
> +    target_namespaces: &[BackupNamespace],
> +    namespace: &BackupNamespace,

this actually already contains the target_namespace, but it's named like it
doesn't..

> +) -> Result<bool, Error> {
> +    let mut created = false;
> +
> +    if !namespace.is_root() && !target_namespaces.contains(namespace) {
> +        // Namespace not present on target, create namespace.
> +        // Sub-namespaces have to be created by creating parent components first.
> +
> +        check_ns_remote_datastore_privs(params, namespace, PRIV_REMOTE_DATASTORE_MODIFY)
> +            .map_err(|err| format_err!("Creating namespace not allowed - {err}"))?;

which means that this priv check here operates on different namespace semantics
than the others..

> +
> +        let mut parent = BackupNamespace::root();
> +        for component in namespace.components() {
> +            let current = BackupNamespace::from_parent_ns(&parent, component.to_string())?;
> +            // Skip over pre-existing parent namespaces on target
> +            if target_namespaces.contains(&current) {
> +                parent = current;
> +                continue;
> +            }
> +            let api_path = format!(
> +                "api2/json/admin/datastore/{store}/namespace",
> +                store = params.target.repo.store(),
> +            );
> +            let mut args = serde_json::json!({ "name": component.to_string() });
> +            if !parent.is_root() {
> +                args["parent"] = serde_json::to_value(parent.clone())?;
> +            }
> +            if let Err(err) = params.target.client.post(&api_path, Some(args)).await {
> +                let target_store_and_ns = print_store_and_ns(params.target.repo.store(), &current);
> +                bail!("sync into {target_store_and_ns} failed - namespace creation failed: {err}");
> +            }

should we add a log line here for created intermediate namespaces? namespace
creation won't happen too often, but might be important information..

> +            created = true;
> +            parent = current;
> +        }
> +    }
> +
> +    Ok(created)
> +}
> +
> +/// Push contents of source datastore matched by given push parameters to target.
> +pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats, Error> {
> +    let mut errors = false;
> +    if !params.target.api_version.supports_feature("prune-delete-stats") && params.remove_vanished {
> +        info!("Older api version on remote detected, delete stats might be incomplete");

I think this could be a bit more prominent, but not sure.. or maybe, set a flag
in the RemovedVanishedStats and print another info line at the end of the push
if it is set? making it a warning is probably overkill, since then every task
pushing to an older server would have a warning..

> +    }
> +
> +    // Generate list of source namespaces to push to target, limited by max-depth
> +    let mut namespaces = params.source.list_namespaces(&mut params.max_depth).await?;
> +
> +    check_namespace_depth_limit(&params.source.get_ns(), &params.target.ns, &namespaces)?;
> +
> +    namespaces.sort_unstable_by_key(|a| a.name_len());
> +
> +    // Fetch all accessible namespaces already present on the target
> +    let target_namespaces = fetch_target_namespaces(&params).await?;
> +    // Remember synced namespaces, removing non-synced ones when remove vanished flag is set
> +    let mut synced_namespaces = HashSet::with_capacity(namespaces.len());
> +
> +    let (mut groups, mut snapshots) = (0, 0);
> +    let mut stats = SyncStats::default();
> +    for namespace in namespaces {
> +        let source_store_and_ns = print_store_and_ns(params.source.store.name(), &namespace);
> +        let target_namespace = params.map_to_target(&namespace)?;
> +        let target_store_and_ns = print_store_and_ns(params.target.repo.store(), &target_namespace);
> +
> +        info!("----");
> +        info!("Syncing {source_store_and_ns} into {target_store_and_ns}");
> +
> +        synced_namespaces.insert(target_namespace.clone());
> +
> +        match check_or_create_target_namespace(&params, &target_namespaces, &target_namespace).await
> +        {
> +            Ok(true) => info!("Created namespace {target_namespace}"),
> +            Ok(false) => {}
> +            Err(err) => {
> +                info!("Cannot sync {source_store_and_ns} into {target_store_and_ns} - {err}");
> +                errors = true;
> +                continue;
> +            }
> +        }
> +
> +        match push_namespace(&namespace, &params).await {
> +            Ok((sync_progress, sync_stats, sync_errors)) => {
> +                errors |= sync_errors;
> +                stats.add(sync_stats);
> +
> +                if params.max_depth != Some(0) {
> +                    groups += sync_progress.done_groups;
> +                    snapshots += sync_progress.done_snapshots;
> +
> +                    let ns = if namespace.is_root() {
> +                        "root namespace".into()
> +                    } else {
> +                        format!("namespace {namespace}")
> +                    };
> +                    info!(
> +                        "Finished syncing {ns}, current progress: {groups} groups, {snapshots} snapshots"
> +                    );
> +                }
> +            }
> +            Err(err) => {
> +                errors = true;
> +                info!("Encountered errors while syncing namespace {namespace} - {err}");
> +            }
> +        }
> +    }
> +
> +    if params.remove_vanished {
> +        for target_namespace in target_namespaces {

target_namespaces contains *all* target namespaces as they are called on the
target side, not just those below our target "anchor".. this needs additional
filtering, else you might remove entirely unrelated namespaces here..

> +            if synced_namespaces.contains(&target_namespace) {
> +                continue;
> +            }
> +            match remove_target_namespace(&params, &target_namespace).await {

see above w.r.t. remove_target_namespace, this is actually wrong with the
current code (where remove_target_namespace does yet another mapping), but
becomes right once the changes I suggested are incorporated..

> +                Ok(delete_stats) => {
> +                    stats.add(SyncStats::from(RemovedVanishedStats {
> +                        snapshots: delete_stats.removed_snapshots(),
> +                        groups: delete_stats.removed_groups(),
> +                        namespaces: 1,
> +                    }));
> +                    if delete_stats.protected_snapshots() > 0 {
> +                        warn!(
> +                            "kept {protected_count} protected snapshots of namespace '{target_namespace}'",

most output refers to the namespaces as they are called on the source side, should we keep this?

> +                            protected_count = delete_stats.protected_snapshots(),
> +                        );
> +                        continue;
> +                    }
> +                }
> +                Err(err) => {
> +                    warn!("failed to remove vanished namespace {target_namespace} - {err}");

same here..

> +                    continue;
> +                }
> +            }
> +            info!("removed vanished namespace {target_namespace}");

and here..

> +        }
> +    }
> +
> +    if errors {
> +        bail!("sync failed with some errors.");
> +    }
> +
> +    Ok(stats)
> +}
> +
> +/// Push namespace including all backup groups to target
> +///
> +/// Iterate over all backup groups in the namespace and push them to the target.
> +pub(crate) async fn push_namespace(
> +    namespace: &BackupNamespace,
> +    params: &PushParameters,
> +) -> Result<(StoreProgress, SyncStats, bool), Error> {
> +    // Check if user is allowed to perform backups on remote datastore
> +    check_ns_remote_datastore_privs(params, namespace, PRIV_REMOTE_DATASTORE_BACKUP)

this needs to be mapped..

> +        .map_err(|err| format_err!("Pushing to remote not allowed - {err}"))?;
> +
> +    let mut list: Vec<BackupGroup> = params
> +        .source
> +        .list_groups(namespace, &params.local_user)
> +        .await?;
> +
> +    list.sort_unstable_by(|a, b| {
> +        let type_order = a.ty.cmp(&b.ty);
> +        if type_order == Ordering::Equal {
> +            a.id.cmp(&b.id)
> +        } else {
> +            type_order
> +        }
> +    });

once more, this could just use BackupGroup's impl of Ord..

> +
> +    let total = list.len();
> +    let list: Vec<BackupGroup> = list
> +        .into_iter()
> +        .filter(|group| group.apply_filters(&params.group_filter))
> +        .collect();
> +
> +    info!(
> +        "found {filtered} groups to sync (out of {total} total)",
> +        filtered = list.len()
> +    );
> +
> +    let mut errors = false;
> +    // Remember synced groups, remove others when the remove vanished flag is set
> +    let mut synced_groups = HashSet::new();
> +    let mut progress = StoreProgress::new(list.len() as u64);
> +    let mut stats = SyncStats::default();
> +
> +    let (owned_target_groups, not_owned_target_groups) =
> +        fetch_target_groups(params, namespace).await?;

should use mapped namespace..

> +
> +    for (done, group) in list.into_iter().enumerate() {
> +        progress.done_groups = done as u64;
> +        progress.done_snapshots = 0;
> +        progress.group_snapshots = 0;
> +
> +        if not_owned_target_groups.contains(&group) {
> +            warn!("group '{group}' not owned by remote user on target, skip");

should we include the remote user/authid here? you can only end up here if you
have Remote.Audit on the remote, so you can already query that anyway..

> +            continue;
> +        }
> +        synced_groups.insert(group.clone());
> +
> +        match push_group(params, namespace, &group, &mut progress).await {

this one actually requires the source side namespace, so that's okay (and also
why this whole fn can't be switched over to just take the target NS ;))

> +            Ok(sync_stats) => stats.add(sync_stats),
> +            Err(err) => {
> +                warn!("sync group '{group}' failed  - {err}");
> +                errors = true;
> +            }
> +        }
> +    }
> +
> +    if params.remove_vanished {
> +        // only ever allow to prune owned groups on target
> +        for target_group in owned_target_groups {
> +            if synced_groups.contains(&target_group) {
> +                continue;
> +            }
> +            if !target_group.apply_filters(&params.group_filter) {
> +                continue;
> +            }
> +
> +            info!("delete vanished group '{target_group}'");
> +
> +            match remove_target_group(params, namespace, &target_group).await {

this should use the mapped namespace again

> +                Ok(delete_stats) => {
> +                    if delete_stats.protected_snapshots() > 0 {
> +                        warn!(
> +                            "kept {protected_count} protected snapshots of group '{target_group}'",
> +                            protected_count = delete_stats.protected_snapshots(),
> +                        );
> +                    }
> +                    stats.add(SyncStats::from(RemovedVanishedStats {
> +                        snapshots: delete_stats.removed_snapshots(),
> +                        groups: delete_stats.removed_groups(),
> +                        namespaces: 0,
> +                    }));
> +                }
> +                Err(err) => {
> +                    warn!("failed to delete vanished group - {err}");
> +                    errors = true;
> +                    continue;
> +                }
> +            }
> +        }
> +    }
> +
> +    Ok((progress, stats, errors))
> +}
> +
> +async fn fetch_target_snapshots(
> +    params: &PushParameters,
> +    namespace: &BackupNamespace,

this should use the mapped/target_namespace

> +    group: &BackupGroup,
> +) -> Result<Vec<SnapshotListItem>, Error> {
> +    let api_path = format!(
> +        "api2/json/admin/datastore/{store}/snapshots",
> +        store = params.target.repo.store(),
> +    );
> +    let mut args = serde_json::to_value(group)?;
> +    if !namespace.is_root() {
> +        let target_ns = params.map_to_target(namespace)?;
> +        args["ns"] = serde_json::to_value(target_ns)?;
> +    }
> +    let mut result = params.target.client.get(&api_path, Some(args)).await?;
> +    let snapshots: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> +
> +    Ok(snapshots)
> +}
> +
> +async fn fetch_previous_backup_time(
> +    params: &PushParameters,
> +    namespace: &BackupNamespace,

target_namespace as well..

> +    group: &BackupGroup,
> +) -> Result<Option<i64>, Error> {
> +    let mut snapshots = fetch_target_snapshots(params, namespace, group).await?;
> +    snapshots.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> +    Ok(snapshots.last().map(|snapshot| snapshot.backup.time))
> +}
> +
> +async fn forget_target_snapshot(
> +    params: &PushParameters,
> +    namespace: &BackupNamespace,

target_namespace as well..

> +    snapshot: &BackupDir,
> +) -> Result<(), Error> {
> +    check_ns_remote_datastore_privs(params, namespace, PRIV_REMOTE_DATASTORE_PRUNE)
> +        .map_err(|err| format_err!("Pruning remote datastore contents not allowed - {err}"))?;
> +
> +    let api_path = format!(
> +        "api2/json/admin/datastore/{store}/snapshots",
> +        store = params.target.repo.store(),
> +    );
> +    let mut args = serde_json::to_value(snapshot)?;
> +    if !namespace.is_root() {
> +        let target_ns = params.map_to_target(namespace)?;
> +        args["ns"] = serde_json::to_value(target_ns)?;
> +    }
> +    params.target.client.delete(&api_path, Some(args)).await?;
> +
> +    Ok(())
> +}
> +
> +/// Push group including all snaphshots to target
> +///
> +/// Iterate over all snapshots in the group and push them to the target.
> +/// The group sync operation consists of the following steps:
> +/// - Query snapshots of given group from the source
> +/// - Sort snapshots by time
> +/// - Apply transfer last cutoff and filters to list
> +/// - Iterate the snapshot list and push each snapshot individually
> +/// - (Optional): Remove vanished groups if `remove_vanished` flag is set
> +pub(crate) async fn push_group(
> +    params: &PushParameters,
> +    namespace: &BackupNamespace,
> +    group: &BackupGroup,
> +    progress: &mut StoreProgress,
> +) -> Result<SyncStats, Error> {
> +    let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
> +    let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
> +
> +    let mut snapshots: Vec<BackupDir> = params.source.list_backup_dirs(namespace, group).await?;
> +    snapshots.sort_unstable_by(|a, b| a.time.cmp(&b.time));
> +
> +    let total_snapshots = snapshots.len();
> +    let cutoff = params
> +        .transfer_last
> +        .map(|count| total_snapshots.saturating_sub(count))
> +        .unwrap_or_default();
> +
> +    let last_snapshot_time = fetch_previous_backup_time(params, namespace, group)
> +        .await?
> +        .unwrap_or(i64::MIN);
> +
> +    let mut source_snapshots = HashSet::new();
> +    let snapshots: Vec<BackupDir> = snapshots
> +        .into_iter()
> +        .enumerate()
> +        .filter(|&(pos, ref snapshot)| {
> +            source_snapshots.insert(snapshot.time);
> +            if last_snapshot_time > snapshot.time {
> +                already_synced_skip_info.update(snapshot.time);
> +                return false;
> +            } else if already_synced_skip_info.count > 0 {
> +                info!("{already_synced_skip_info}");
> +                already_synced_skip_info.reset();
> +                return true;

didn't you just discover that this return here is wrong? ;)

> +            }
> +
> +            if pos < cutoff && last_snapshot_time != snapshot.time {
> +                transfer_last_skip_info.update(snapshot.time);
> +                return false;

does the last_snapshot_time vs. snapshot.time check make sense for push? we
can't overwrite the existing manifest in that case; all we could do would be
to upload a missing log, but we don't currently attempt that
(continued below..)

> +            } else if transfer_last_skip_info.count > 0 {
> +                info!("{transfer_last_skip_info}");
> +                transfer_last_skip_info.reset();
> +            }
> +            true
> +        })
> +        .map(|(_, dir)| dir)
> +        .collect();
> +
> +    progress.group_snapshots = snapshots.len() as u64;
> +
> +    let target_snapshots = fetch_target_snapshots(params, namespace, group).await?;
> +    let target_snapshots: Vec<BackupDir> = target_snapshots
> +        .into_iter()
> +        .map(|snapshot| snapshot.backup)
> +        .collect();
> +
> +    let mut stats = SyncStats::default();
> +    let mut fetch_previous_manifest = !target_snapshots.is_empty();
> +    for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
> +        if target_snapshots.contains(&source_snapshot) {
> +            progress.done_snapshots = pos as u64 + 1;
> +            info!("percentage done: {progress}");
> +            continue;

because any existing snapshots are skipped here ;)

> +        }
> +        let result =
> +            push_snapshot(params, namespace, &source_snapshot, fetch_previous_manifest).await;
> +        fetch_previous_manifest = true;
> +
> +        progress.done_snapshots = pos as u64 + 1;
> +        info!("percentage done: {progress}");
> +
> +        // stop on error
> +        let sync_stats = result?;
> +        stats.add(sync_stats);
> +    }
> +
> +    if params.remove_vanished {
> +        let target_snapshots = fetch_target_snapshots(params, namespace, group).await?;

should use target_namespace

> +        for snapshot in target_snapshots {
> +            if source_snapshots.contains(&snapshot.backup.time) {
> +                continue;
> +            }
> +            if snapshot.protected {
> +                info!(
> +                    "don't delete vanished snapshot {name} (protected)",
> +                    name = snapshot.backup
> +                );
> +                continue;
> +            }
> +            if let Err(err) = forget_target_snapshot(params, namespace, &snapshot.backup).await {

should use target_namespace

> +                info!(
> +                    "could not delete vanished snapshot {name} - {err}",
> +                    name = snapshot.backup
> +                );
> +            }
> +            info!("delete vanished snapshot {name}", name = snapshot.backup);
> +            stats.add(SyncStats::from(RemovedVanishedStats {
> +                snapshots: 1,
> +                groups: 0,
> +                namespaces: 0,
> +            }));
> +        }
> +    }
> +
> +    Ok(stats)
> +}
> +
> +/// Push snapshot to target
> +///
> +/// Creates a new snapshot on the target and pushes the content of the source snapshot to the
> +/// target by creating a new manifest file and connecting to the remote as backup writer client.
> +/// Chunks are written by recreating the index by uploading the chunk stream as read from the
> +/// source. Data blobs are uploaded as such.
> +pub(crate) async fn push_snapshot(
> +    params: &PushParameters,
> +    namespace: &BackupNamespace,
> +    snapshot: &BackupDir,
> +    fetch_previous_manifest: bool,
> +) -> Result<SyncStats, Error> {
> +    let mut stats = SyncStats::default();
> +    let target_ns = params.map_to_target(namespace)?;
> +    let backup_dir = params
> +        .source
> +        .store
> +        .backup_dir(namespace.clone(), snapshot.clone())?;
> +
> +    // Reader locks the snapshot
> +    let reader = params.source.reader(namespace, snapshot).await?;
> +
> +    // Does not lock the manifest, but the reader already assures a locked snapshot
> +    let source_manifest = match backup_dir.load_manifest() {
> +        Ok((manifest, _raw_size)) => manifest,
> +        Err(err) => {
> +            // No manifest in snapshot or failed to read, warn and skip
> +            log::warn!("failed to load manifest - {err}");
> +            return Ok(stats);
> +        }
> +    };
> +
> +    // Manifest to be created on target, referencing all the source archives after upload.
> +    let mut manifest = BackupManifest::new(snapshot.clone());
> +
> +    // Writer instance locks the snapshot on the remote side
> +    let backup_writer = BackupWriter::start(
> +        &params.target.client,
> +        None,
> +        params.target.repo.store(),
> +        &target_ns,
> +        snapshot,
> +        false,
> +        false,
> +    )
> +    .await?;
> +
> +    let mut previous_manifest = None;
> +    // Use manifest of previous snapshots in group on target for chunk upload deduplication
> +    if fetch_previous_manifest {
> +        match backup_writer.download_previous_manifest().await {
> +            Ok(manifest) => previous_manifest = Some(Arc::new(manifest)),
> +            Err(err) => log::info!("Could not download previous manifest - {err}"),
> +        }
> +    };

as discussed off-list, this is not enough - we also need to download each index
for the previous snapshot using the backup writer to properly initialize their
contents as known chunks - both on the client side, and on the server side.

> +
> +    // Dummy upload options: the actual compression and/or encryption already happened while
> +    // the chunks were generated during creation of the backup snapshot, therefore pre-existing
> +    // chunks (already compressed and/or encrypted) can be pushed to the target.
> +    // Further, these steps are skipped in the backup writer upload stream.
> +    //
> +    // Therefore, these values do not need to fit the values given in the manifest.
> +    // The original manifest is uploaded in the end anyways.
> +    //
> +    // Compression is set to true so that the uploaded manifest will be compressed.
> +    // Encrypt is set to assure that above files are not encrypted.
> +    let upload_options = UploadOptions {
> +        compress: true,
> +        encrypt: false,
> +        previous_manifest,
> +        ..UploadOptions::default()
> +    };
> +
> +    // Avoid double upload penalty by remembering already seen chunks
> +    let known_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 1024)));

see above ;)

> +
> +    for entry in source_manifest.files() {
> +        let mut path = backup_dir.full_path();
> +        path.push(&entry.filename);
> +        if path.try_exists()? {
> +            match ArchiveType::from_path(&entry.filename)? {
> +                ArchiveType::Blob => {
> +                    let file = std::fs::File::open(path.clone())?;
> +                    let backup_stats = backup_writer.upload_blob(file, &entry.filename).await?;
> +                    manifest.add_file(
> +                        entry.filename.to_string(),
> +                        backup_stats.size,
> +                        backup_stats.csum,
> +                        entry.chunk_crypt_mode(),
> +                    )?;

I think this

> +                    stats.add(SyncStats {
> +                        chunk_count: backup_stats.chunk_count as usize,
> +                        bytes: backup_stats.size as usize,
> +                        elapsed: backup_stats.duration,
> +                        removed: None,
> +                    });
> +                }
> +                ArchiveType::DynamicIndex => {
> +                    let index = DynamicIndexReader::open(&path)?;
> +                    let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
> +                    let sync_stats = push_index(
> +                        &entry.filename,
> +                        index,
> +                        chunk_reader,
> +                        &backup_writer,
> +                        &mut manifest,

and this parameter

> +                        entry.chunk_crypt_mode(),
> +                        None,
> +                        known_chunks.clone(),
> +                    )
> +                    .await?;
> +                    stats.add(sync_stats);
> +                }
> +                ArchiveType::FixedIndex => {
> +                    let index = FixedIndexReader::open(&path)?;
> +                    let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
> +                    let size = index.index_bytes();
> +                    let sync_stats = push_index(
> +                        &entry.filename,
> +                        index,
> +                        chunk_reader,
> +                        &backup_writer,
> +                        &mut manifest,

and this parameter can all be dropped.. because we end up uploading the source
manifest anyway below?

> +                        entry.chunk_crypt_mode(),
> +                        Some(size),
> +                        known_chunks.clone(),
> +                    )
> +                    .await?;
> +                    stats.add(sync_stats);
> +                }
> +            }
> +        } else {
> +            info!("{path:?} does not exist, skipped.");

this should be a warning, or potentially even an error? if the source manifest
references a file that doesn't exist, something is rather wrong?

> +        }
> +    }
> +
> +    // Fetch client log from source and push to target
> +    // this has to be handled individually since the log is never part of the manifest
> +    let mut client_log_path = backup_dir.full_path();
> +    client_log_path.push(CLIENT_LOG_BLOB_NAME);
> +    if client_log_path.is_file() {
> +        backup_writer
> +            .upload_blob_from_file(
> +                &client_log_path,
> +                CLIENT_LOG_BLOB_NAME,
> +                upload_options.clone(),
> +            )
> +            .await?;
> +    }

see comment a bit above w.r.t. handling of resyncing the last already existing
snapshot..

> +    //TODO: only add log line for conditions as described in feedback

leftover?

> +
> +    // Rewrite manifest for pushed snapshot, recreating manifest from source on target
> +    let manifest_json = serde_json::to_value(source_manifest)?;

unsure: should we drop the verification state and upload stats? this, together
with not re-syncing notes and verification state, comes up from time to time..

> +    let manifest_string = serde_json::to_string_pretty(&manifest_json)?;
> +    let backup_stats = backup_writer
> +        .upload_blob_from_data(
> +            manifest_string.into_bytes(),
> +            MANIFEST_BLOB_NAME,
> +            upload_options,
> +        )
> +        .await?;
> +    backup_writer.finish().await?;
> +
> +    stats.add(SyncStats {
> +        chunk_count: backup_stats.chunk_count as usize,
> +        bytes: backup_stats.size as usize,
> +        elapsed: backup_stats.duration,
> +        removed: None,
> +    });
> +
> +    Ok(stats)
> +}
> +
> +// Read fixed or dynamic index and push to target by uploading via the backup writer instance
> +//
> +// For fixed indexes, the size must be provided as given by the index reader.
> +#[allow(clippy::too_many_arguments)]
> +async fn push_index<'a>(
> +    filename: &'a str,
> +    index: impl IndexFile + Send + 'static,
> +    chunk_reader: Arc<dyn AsyncReadChunk>,
> +    backup_writer: &BackupWriter,
> +    manifest: &mut BackupManifest,

only used to add the file below

> +    crypt_mode: CryptMode,
> +    size: Option<u64>,
> +    known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +) -> Result<SyncStats, Error> {
> +    let (upload_channel_tx, upload_channel_rx) = mpsc::channel(20);
> +    let mut chunk_infos =
> +        stream::iter(0..index.index_count()).map(move |pos| index.chunk_info(pos).unwrap());
> +
> +    tokio::spawn(async move {
> +        while let Some(chunk_info) = chunk_infos.next().await {
> +            // Avoid reading known chunks, as they are not uploaded by the backup writer anyways
> +            let needs_upload = {
> +                // Need to limit the scope of the lock, otherwise the async block is not `Send`
> +                let mut known_chunks = known_chunks.lock().unwrap();
> +                // Check if present and insert, chunk will be read and uploaded below if not present
> +                known_chunks.insert(chunk_info.digest)
> +            };
> +
> +            let merged_chunk_info = if needs_upload {
> +                chunk_reader
> +                    .read_raw_chunk(&chunk_info.digest)
> +                    .await
> +                    .map(|chunk| {
> +                        MergedChunkInfo::New(ChunkInfo {
> +                            chunk,
> +                            digest: chunk_info.digest,
> +                            chunk_len: chunk_info.size(),
> +                            offset: chunk_info.range.start,
> +                        })
> +                    })
> +            } else {
> +                Ok(MergedChunkInfo::Known(vec![(
> +                    // Pass size instead of offset, will be replaced with offset by the backup
> +                    // writer
> +                    chunk_info.size(),
> +                    chunk_info.digest,
> +                )]))
> +            };
> +            let _ = upload_channel_tx.send(merged_chunk_info).await;
> +        }
> +    });
> +
> +    let merged_chunk_info_stream = ReceiverStream::new(upload_channel_rx).map_err(Error::from);
> +
> +    let upload_options = UploadOptions {
> +        compress: true,
> +        encrypt: false,
> +        fixed_size: size,
> +        ..UploadOptions::default()
> +    };
> +
> +    let upload_stats = backup_writer
> +        .upload_index_chunk_info(filename, merged_chunk_info_stream, upload_options)
> +        .await?;
> +
> +    manifest.add_file(
> +        filename.to_string(),
> +        upload_stats.size,
> +        upload_stats.csum,
> +        crypt_mode,
> +    )?;

but this is then not used anywhere because we use the original manifest at the
call site -> can be dropped..

> +
> +    Ok(SyncStats {
> +        chunk_count: upload_stats.chunk_count as usize,
> +        bytes: upload_stats.size as usize,
> +        elapsed: upload_stats.duration,
> +        removed: None,
> +    })
> +}
> -- 
> 2.39.5
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
>




More information about the pbs-devel mailing list