[pbs-devel] [PATCH proxmox-backup v9 36/46] api/datastore: implement refresh endpoint for stores with s3 backend

Hannes Laimer h.laimer at proxmox.com
Mon Jul 21 16:16:20 CEST 2025


On Sat Jul 19, 2025 at 2:50 PM CEST, Christian Ebner wrote:
> Allows easily refreshing the contents of the local cache store for
> datastores backed by an S3 object store.
>
> In order to guarantee that no read or write operations are ongoing,
> the store is first set into the maintenance mode `S3Refresh`. Objects
> are then fetched into a temporary directory to avoid losing contents
> and consistency in case of an error. Once all objects have been
> fetched, the existing contents are cleared out and the newly fetched
> contents are moved in place.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 8:
> - refactor s3 refresh into more compact methods
> - drop unnecessary drop(_lock)
> - use missing tokio::task::spawn_blocking context for blocking
>   maintenance mode setting
>
>  pbs-datastore/src/datastore.rs | 175 +++++++++++++++++++++++++++++++++
>  src/api2/admin/datastore.rs    |  34 +++++++
>  2 files changed, 209 insertions(+)
>
> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
> index a524d7b32..b2af05eac 100644
> --- a/pbs-datastore/src/datastore.rs
> +++ b/pbs-datastore/src/datastore.rs
> @@ -10,6 +10,7 @@ use anyhow::{bail, format_err, Context, Error};
>  use http_body_util::BodyExt;
>  use nix::unistd::{unlinkat, UnlinkatFlags};
>  use pbs_tools::lru_cache::LruCache;
> +use tokio::io::AsyncWriteExt;
>  use tracing::{info, warn};
>  
>  use proxmox_human_byte::HumanByte;
> @@ -2200,4 +2201,178 @@ impl DataStore {
>      pub fn old_locking(&self) -> bool {
>          *OLD_LOCKING
>      }
> +
> +    /// Set the datastore's maintenance mode to `S3Refresh`, fetch the contents from the S3
> +    /// object store, then clear and replace the local cache store contents. Once finished,
> +    /// disable the maintenance mode again. Errors out for other backends without setting the mode.
> +    pub async fn s3_refresh(self: &Arc<Self>) -> Result<(), Error> {
> +        match self.backend()? {
> +            DatastoreBackend::Filesystem => bail!("store '{}' not backed by S3", self.name()),
> +            DatastoreBackend::S3(s3_client) => {
> +                let self_clone = Arc::clone(self);
> +                tokio::task::spawn_blocking(move || {
> +                    self_clone.maintenance_mode(Some(MaintenanceMode {
> +                        ty: MaintenanceType::S3Refresh,
> +                        message: None,
> +                    }))
> +                })
> +                .await?
> +                .context("failed to set maintenance mode")?;

I think we should hold the config lock, so it can't be changed while we
refresh, no?

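Something like the following, maybe (rough, untested sketch; `maintenance_mode()`
would then need to take the already-held guard instead of calling lock_config()
itself, since a second lock attempt on the config file would block):

    // sketch: take the config lock once and keep it for the whole refresh,
    // so concurrent config changes can't race with the maintenance mode updates
    let _config_lock = tokio::task::spawn_blocking(pbs_config::datastore::lock_config)
        .await?
        .context("failed to lock datastore config")?;
    // ... set S3Refresh maintenance mode, fetch and move the contents, clear the
    // maintenance mode again, all while _config_lock stays in scope ...
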
> +
> +                let tmp_base = proxmox_sys::fs::make_tmp_dir(&self.base_path(), None)
> +                    .with_context(|| format!("failed to create temporary content folder in {:?}", self.base_path()))?;
> +
> +                self.fetch_tmp_contents(&tmp_base, &s3_client).await?;
> +                self.move_tmp_contents_in_place(&tmp_base).await?;
> +
> +                let self_clone = Arc::clone(self);
> +                tokio::task::spawn_blocking(move || self_clone.maintenance_mode(None))
> +                    .await?
> +                    .context("failed to clear maintenance mode")?;
> +            }
> +        }
> +        Ok(())
> +    }
> +
> +    // Set or clear the datastore's maintenance mode by locking and updating the datastore config
> +    fn maintenance_mode(&self, maintenance_mode: Option<MaintenanceMode>) -> Result<(), Error> {
> +        let _lock = pbs_config::datastore::lock_config()?;
> +        let (mut section_config, _digest) = pbs_config::datastore::config()?;
> +        let mut datastore: DataStoreConfig = section_config.lookup("datastore", self.name())?;
> +        datastore.set_maintenance_mode(maintenance_mode)?;
> +        section_config.set_data(self.name(), "datastore", &datastore)?;
> +        pbs_config::datastore::save_config(&section_config)?;
> +        Ok(())
> +    }
> +
> +    // Fetch the contents (metadata, no chunks) of the datastore from the S3 object store to the
> +    // provided temporary directory
> +    async fn fetch_tmp_contents(&self, tmp_base: &Path, s3_client: &S3Client) -> Result<(), Error> {
> +        let backup_user = pbs_config::backup_user().context("failed to get backup user")?;
> +        let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
> +        let file_create_options = CreateOptions::new()
> +            .perm(mode)
> +            .owner(backup_user.uid)
> +            .group(backup_user.gid);
> +        let mode = nix::sys::stat::Mode::from_bits_truncate(0o0755);
> +        let dir_create_options = CreateOptions::new()
> +            .perm(mode)
> +            .owner(backup_user.uid)
> +            .group(backup_user.gid);
> +
> +        let list_prefix = S3PathPrefix::Some(S3_CONTENT_PREFIX.to_string());
> +        let store_prefix = format!("{}/{S3_CONTENT_PREFIX}/", self.name());
> +        let mut next_continuation_token: Option<String> = None;
> +        loop {
> +            let list_objects_result = s3_client
> +                .list_objects_v2(&list_prefix, next_continuation_token.as_deref())
> +                .await
> +                .context("failed to list object")?;
> +
> +            let objects_to_fetch: Vec<S3ObjectKey> = list_objects_result
> +                .contents
> +                .into_iter()
> +                .map(|item| item.key)
> +                .collect();
> +
> +            for object_key in objects_to_fetch {
> +                let object_path = format!("{object_key}");
> +                let object_path = object_path.strip_prefix(&store_prefix).with_context(|| {
> +                    format!("failed to strip store context prefix {store_prefix} for {object_key}")
> +                })?;
> +                if object_path.ends_with(NAMESPACE_MARKER_FILENAME) {
> +                    continue;
> +                }
> +
> +                info!("Fetching object {object_path}");
> +
> +                let file_path = tmp_base.join(object_path);
> +                if let Some(parent) = file_path.parent() {
> +                    proxmox_sys::fs::create_path(
> +                        parent,
> +                        Some(dir_create_options),
> +                        Some(dir_create_options),
> +                    )?;
> +                }
> +
> +                let mut target_file = tokio::fs::OpenOptions::new()
> +                    .write(true)
> +                    .create(true)
> +                    .truncate(true)
> +                    .read(true)
> +                    .open(&file_path)
> +                    .await
> +                    .with_context(|| format!("failed to create target file {file_path:?}"))?;
> +
> +                if let Some(response) = s3_client
> +                    .get_object(object_key)
> +                    .await
> +                    .with_context(|| format!("failed to fetch object {object_path}"))?
> +                {
> +                    let data = response
> +                        .content
> +                        .collect()
> +                        .await
> +                        .context("failed to collect object contents")?;
> +                    target_file
> +                        .write_all(&data.to_bytes())
> +                        .await
> +                        .context("failed to write to target file")?;
> +                    file_create_options
> +                        .apply_to(&mut target_file, &file_path)
> +                        .context("failed to set target file create options")?;
> +                    target_file
> +                        .flush()
> +                        .await
> +                        .context("failed to flush target file")?;
> +                } else {
> +                    bail!("failed to download {object_path}, not found");
> +                }
> +            }
> +
> +            if list_objects_result.is_truncated {
> +                next_continuation_token = list_objects_result
> +                    .next_continuation_token
> +                    .as_ref()
> +                    .cloned();
> +                continue;
> +            }
> +            break;
> +        }
> +        Ok(())
> +    }
> +
> +    // Replace the local cache store contents with the contents fetched into the provided
> +    // temporary directory: remove the old type directories and rename the new ones in place
> +    async fn move_tmp_contents_in_place(&self, tmp_base: &PathBuf) -> Result<(), Error> {
> +        for ty in ["vm", "ct", "host", "ns"] {
> +            let store_base_clone = self.base_path().clone();
> +            let tmp_base_clone = tmp_base.clone();
> +            tokio::task::spawn_blocking(move || {
> +                let type_dir = store_base_clone.join(ty);
> +                if let Err(err) = std::fs::remove_dir_all(&type_dir) {
> +                    if err.kind() != io::ErrorKind::NotFound {
> +                        return Err(err).with_context(|| {
> +                            format!("failed to remove old contents in {type_dir:?}")
> +                        });
> +                    }
> +                }
> +                let tmp_type_dir = tmp_base_clone.join(ty);
> +                if let Err(err) = std::fs::rename(&tmp_type_dir, &type_dir) {
> +                    if err.kind() != io::ErrorKind::NotFound {
> +                        return Err(err)
> +                            .with_context(|| format!("failed to rename {tmp_type_dir:?}"));
> +                    }
> +                }
> +                Ok::<(), Error>(())
> +            })
> +            .await?
> +            .with_context(|| format!("failed to refresh {:?}", self.base_path()))?;
> +        }
> +
> +        std::fs::remove_dir_all(&tmp_base)
> +            .with_context(|| format!("failed to cleanup temporary content in {tmp_base:?}"))?;
> +
> +        Ok(())
> +    }
>  }
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 87a8641bd..23b216bef 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -2707,6 +2707,39 @@ pub async fn unmount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<V
>      Ok(json!(upid))
>  }
>  
> +#[api(
> +    protected: true,
> +    input: {
> +        properties: {
> +            store: {
> +                schema: DATASTORE_SCHEMA,
> +            },
> +        }
> +    },
> +    returns: {
> +        schema: UPID_SCHEMA,
> +    },
> +    access: {
> +        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_MODIFY, false),
> +    },
> +)]
> +/// Refresh datastore contents from S3 to local cache store.
> +pub async fn s3_refresh(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
> +    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Lookup))?;
> +    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> +    let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
> +
> +    let upid = WorkerTask::spawn(
> +        "s3-refresh",
> +        Some(store),
> +        auth_id.to_string(),
> +        to_stdout,
> +        move |_worker| async move { datastore.s3_refresh().await },
> +    )?;
> +
> +    Ok(json!(upid))
> +}
> +
>  #[sortable]
>  const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
>      (
> @@ -2773,6 +2806,7 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
>          &Router::new().download(&API_METHOD_PXAR_FILE_DOWNLOAD),
>      ),
>      ("rrd", &Router::new().get(&API_METHOD_GET_RRD_STATS)),
> +    ("s3-refresh", &Router::new().put(&API_METHOD_S3_REFRESH)),
>      (
>          "snapshots",
>          &Router::new()

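As a side note for testing: with the router entry above, the refresh should be
reachable as a simple PUT on the datastore's admin path, something along these
lines (untested; host, port and API token are placeholders following the usual
PBS API conventions):

    # spawns an s3-refresh worker task and returns its UPID
    curl -k -X PUT \
        -H 'Authorization: PBSAPIToken=root@pam!mytoken:<secret>' \
        'https://<host>:8007/api2/json/admin/datastore/<store>/s3-refresh'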