[pbs-devel] [PATCH proxmox-backup v8 35/45] api/datastore: implement refresh endpoint for stores with s3 backend
Lukas Wagner
l.wagner at proxmox.com
Fri Jul 18 14:01:01 CEST 2025
On 2025-07-15 14:53, Christian Ebner wrote:
> Allows easily refreshing the contents of the local cache store for
> datastores backed by an S3 object store.
>
> In order to guarantee that no read or write operations are ongoing,
> the store is first set into the maintenance mode `S3Refresh`. Objects
> are then fetched into a temporary directory to avoid losing contents
> and consistency in case of an error. Once all objects have been
> fetched, the existing contents are cleared out and the newly fetched
> contents are moved in place.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - add more error context
> - fix clippy warning
>
> pbs-datastore/src/datastore.rs | 172 ++++++++++++++++++++++++++++++++-
> src/api2/admin/datastore.rs | 34 +++++++
> 2 files changed, 205 insertions(+), 1 deletion(-)
>
> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
> index cab0f5b4d..c63759f9a 100644
> --- a/pbs-datastore/src/datastore.rs
> +++ b/pbs-datastore/src/datastore.rs
> @@ -10,11 +10,13 @@ use anyhow::{bail, format_err, Context, Error};
> use http_body_util::BodyExt;
> use nix::unistd::{unlinkat, UnlinkatFlags};
> use pbs_tools::lru_cache::LruCache;
> +use proxmox_lang::try_block;
> +use tokio::io::AsyncWriteExt;
> use tracing::{info, warn};
>
> use proxmox_human_byte::HumanByte;
> use proxmox_s3_client::{
> - S3Client, S3ClientConfig, S3ClientOptions, S3ClientSecretsConfig, S3PathPrefix,
> + S3Client, S3ClientConfig, S3ClientOptions, S3ClientSecretsConfig, S3ObjectKey, S3PathPrefix,
> };
> use proxmox_schema::ApiType;
>
> @@ -2132,4 +2134,172 @@ impl DataStore {
> pub fn old_locking(&self) -> bool {
> *OLD_LOCKING
> }
> +
> + /// Set the datastore's maintenance mode to `S3Refresh`, fetch from S3 object store, clear and
> + /// replace the local cache store contents. Once finished disable the maintenance mode again.
> + /// Returns with error for other datastore backends without setting the maintenance mode.
> + pub async fn s3_refresh(self: &Arc<Self>) -> Result<(), Error> {
> + match self.backend()? {
> + DatastoreBackend::Filesystem => bail!("store '{}' not backed by S3", self.name()),
> + DatastoreBackend::S3(s3_client) => {
> + try_block!({
> + let _lock = pbs_config::datastore::lock_config()?;
> + let (mut section_config, _digest) = pbs_config::datastore::config()?;
> + let mut datastore: DataStoreConfig =
> + section_config.lookup("datastore", self.name())?;
> + datastore.set_maintenance_mode(Some(MaintenanceMode {
> + ty: MaintenanceType::S3Refresh,
> + message: None,
> + }))?;
> + section_config.set_data(self.name(), "datastore", &datastore)?;
> + pbs_config::datastore::save_config(&section_config)?;
> + drop(_lock);
No need to drop the lock explicitly, since it is released when the block ends anyway, right?
Also, since locking and writing the config is blocking I/O, this should be done in a
tokio::task::spawn_blocking, if I'm not mistaken?
(try_block! is only a convenience wrapper that wraps the block in a closure and calls it
in place; it doesn't spawn the block on the blocking thread pool)
> + Ok::<(), Error>(())
> + })
> + .context("failed to set maintenance mode")?;
> +
> + let store_base = self.base_path();
> +
> + let tmp_base = proxmox_sys::fs::make_tmp_dir(&store_base, None)
> + .context("failed to create temporary content folder in {store_base}")?;
> +
> + let backup_user = pbs_config::backup_user().context("failed to get backup user")?;
> + let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
> + let file_create_options = CreateOptions::new()
> + .perm(mode)
> + .owner(backup_user.uid)
> + .group(backup_user.gid);
> + let mode = nix::sys::stat::Mode::from_bits_truncate(0o0755);
> + let dir_create_options = CreateOptions::new()
> + .perm(mode)
> + .owner(backup_user.uid)
> + .group(backup_user.gid);
> +
> + let list_prefix = S3PathPrefix::Some(S3_CONTENT_PREFIX.to_string());
> + let store_prefix = format!("{}/{S3_CONTENT_PREFIX}/", self.name());
> + let mut next_continuation_token: Option<String> = None;
> + loop {
> + let list_objects_result = s3_client
> + .list_objects_v2(&list_prefix, next_continuation_token.as_deref())
> + .await
> + .context("failed to list object")?;
> +
> + let objects_to_fetch: Vec<S3ObjectKey> = list_objects_result
> + .contents
> + .into_iter()
> + .map(|item| item.key)
> + .collect();
> +
> + for object_key in objects_to_fetch {
> + let object_path = format!("{object_key}");
> + let object_path = object_path.strip_prefix(&store_prefix).with_context(||
> + format!("failed to strip store context prefix {store_prefix} for {object_key}")
> + )?;
> + if object_path.ends_with(NAMESPACE_MARKER_FILENAME) {
> + continue;
> + }
> +
> + info!("Fetching object {object_path}");
> +
> + let file_path = tmp_base.join(object_path);
> + if let Some(parent) = file_path.parent() {
> + proxmox_sys::fs::create_path(
> + parent,
> + Some(dir_create_options),
> + Some(dir_create_options),
> + )?;
> + }
> +
> + let mut target_file = tokio::fs::OpenOptions::new()
> + .write(true)
> + .create(true)
> + .truncate(true)
> + .read(true)
> + .open(&file_path)
> + .await
> + .with_context(|| {
> + format!("failed to create target file {file_path:?}")
> + })?;
> +
> + if let Some(response) = s3_client
> + .get_object(object_key)
> + .await
> + .with_context(|| format!("failed to fetch object {object_path}"))?
> + {
> + let data = response
> + .content
> + .collect()
> + .await
> + .context("failed to collect object contents")?;
> + target_file
> + .write_all(&data.to_bytes())
> + .await
> + .context("failed to write to target file")?;
> + file_create_options
> + .apply_to(&mut target_file, &file_path)
> + .context("failed to set target file create options")?;
> + target_file
> + .flush()
> + .await
> + .context("failed to flush target file")?;
> + } else {
> + bail!("failed to download {object_path}, not found");
> + }
> + }
> +
> + if list_objects_result.is_truncated {
> + next_continuation_token = list_objects_result
> + .next_continuation_token
> + .as_ref()
> + .cloned();
> + continue;
> + }
> + break;
> + }
> +
> + for ty in ["vm", "ct", "host", "ns"] {
> + let store_base_clone = store_base.clone();
> + let tmp_base_clone = tmp_base.clone();
> + tokio::task::spawn_blocking(move || {
> + let type_dir = store_base_clone.join(ty);
> + if let Err(err) = std::fs::remove_dir_all(&type_dir) {
> + if err.kind() != io::ErrorKind::NotFound {
> + return Err(err).with_context(|| {
> + format!("failed to remove old contents in {type_dir:?}")
> + });
> + }
> + }
> + let tmp_type_dir = tmp_base_clone.join(ty);
> + if let Err(err) = std::fs::rename(&tmp_type_dir, &type_dir) {
> + if err.kind() != io::ErrorKind::NotFound {
> + return Err(err)
> + .with_context(|| format!("failed to rename {tmp_type_dir:?}"));
> + }
> + }
> + Ok::<(), Error>(())
> + })
> + .await?
> + .with_context(|| format!("failed to refresh {store_base:?}"))?;
> + }
> +
> + std::fs::remove_dir_all(&tmp_base).with_context(|| {
> + format!("failed to cleanup temporary content in {tmp_base:?}")
> + })?;
> +
> + try_block!({
> + let _lock = pbs_config::datastore::lock_config()?;
> + let (mut section_config, _digest) = pbs_config::datastore::config()?;
> + let mut datastore: DataStoreConfig =
> + section_config.lookup("datastore", self.name())?;
> + datastore.set_maintenance_mode(None)?;
> + section_config.set_data(self.name(), "datastore", &datastore)?;
> + pbs_config::datastore::save_config(&section_config)?;
> + drop(_lock);
> + Ok::<(), Error>(())
> + })
> + .context("failed to clear maintenance mode")?;
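Same thing here: the explicit drop of the lock is redundant, and the blocking config update
should be moved into a tokio::task::spawn_blocking.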
> + }
> + }
> + Ok(())
> + }
In general, I think the s3_refresh function is a good candidate to be broken up into multiple smaller functions:
- setting/unsetting maintenance mode
- creating the new temporary dir
- retrieving the objects from S3
- replacing the old contents
- etc.
> }
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 80740e3fb..41cbee4de 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -2707,6 +2707,39 @@ pub async fn unmount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<V
> Ok(json!(upid))
> }
>
> +#[api(
> + protected: true,
> + input: {
> + properties: {
> + store: {
> + schema: DATASTORE_SCHEMA,
> + },
> + }
> + },
> + returns: {
> + schema: UPID_SCHEMA,
> + },
> + access: {
> + permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_MODIFY, false),
> + },
> +)]
> +/// Refresh datastore contents from S3 to local cache store.
> +pub async fn s3_refresh(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
> + let datastore = DataStore::lookup_datastore(&store, Some(Operation::Lookup))?;
> + let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> + let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
> +
> + let upid = WorkerTask::spawn(
> + "s3-refresh",
> + Some(store),
> + auth_id.to_string(),
> + to_stdout,
> + move |_worker| async move { datastore.s3_refresh().await },
> + )?;
> +
> + Ok(json!(upid))
> +}
> +
> #[sortable]
> const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
> (
> @@ -2773,6 +2806,7 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
> &Router::new().download(&API_METHOD_PXAR_FILE_DOWNLOAD),
> ),
> ("rrd", &Router::new().get(&API_METHOD_GET_RRD_STATS)),
> + ("s3-refresh", &Router::new().put(&API_METHOD_S3_REFRESH)),
> (
> "snapshots",
> &Router::new()
--
- Lukas
More information about the pbs-devel mailing list