[pbs-devel] [PATCH proxmox-backup v8 35/45] api/datastore: implement refresh endpoint for stores with s3 backend

Christian Ebner c.ebner at proxmox.com
Fri Jul 18 17:51:29 CEST 2025


On 7/18/25 2:00 PM, Lukas Wagner wrote:
> 
> 
> On  2025-07-15 14:53, Christian Ebner wrote:
>> Allows to easily refresh the contents on the local cache store for
>> datastores backed by an S3 object store.
>>
>> In order to guarantee that no read or write operations are ongoing,
>> the store is first set into the maintenance mode `S3Refresh`. Objects
>> are then fetched into a temporary directory to avoid losing contents
>> and consistency in case of an error. Once all objects have been
>> fetched, clears out existing contents and moves the newly fetched
>> contents in place.
>>
>> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>> ---
>> changes since version 7:
>> - add more error context
>> - fix clippy warning
>>
>>   pbs-datastore/src/datastore.rs | 172 ++++++++++++++++++++++++++++++++-
>>   src/api2/admin/datastore.rs    |  34 +++++++
>>   2 files changed, 205 insertions(+), 1 deletion(-)
>>
>> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
>> index cab0f5b4d..c63759f9a 100644
>> --- a/pbs-datastore/src/datastore.rs
>> +++ b/pbs-datastore/src/datastore.rs
>> @@ -10,11 +10,13 @@ use anyhow::{bail, format_err, Context, Error};
>>   use http_body_util::BodyExt;
>>   use nix::unistd::{unlinkat, UnlinkatFlags};
>>   use pbs_tools::lru_cache::LruCache;
>> +use proxmox_lang::try_block;
>> +use tokio::io::AsyncWriteExt;
>>   use tracing::{info, warn};
>>   
>>   use proxmox_human_byte::HumanByte;
>>   use proxmox_s3_client::{
>> -    S3Client, S3ClientConfig, S3ClientOptions, S3ClientSecretsConfig, S3PathPrefix,
>> +    S3Client, S3ClientConfig, S3ClientOptions, S3ClientSecretsConfig, S3ObjectKey, S3PathPrefix,
>>   };
>>   use proxmox_schema::ApiType;
>>   
>> @@ -2132,4 +2134,172 @@ impl DataStore {
>>       pub fn old_locking(&self) -> bool {
>>           *OLD_LOCKING
>>       }
>> +
>> +    /// Set the datastore's maintenance mode to `S3Refresh`, fetch from S3 object store, clear and
>> +    /// replace the local cache store contents. Once finished disable the maintenance mode again.
>> +    /// Returns with error for other datastore backends without setting the maintenance mode.
>> +    pub async fn s3_refresh(self: &Arc<Self>) -> Result<(), Error> {
>> +        match self.backend()? {
>> +            DatastoreBackend::Filesystem => bail!("store '{}' not backed by S3", self.name()),
>> +            DatastoreBackend::S3(s3_client) => {
>> +                try_block!({
>> +                    let _lock = pbs_config::datastore::lock_config()?;
>> +                    let (mut section_config, _digest) = pbs_config::datastore::config()?;
>> +                    let mut datastore: DataStoreConfig =
>> +                        section_config.lookup("datastore", self.name())?;
>> +                    datastore.set_maintenance_mode(Some(MaintenanceMode {
>> +                        ty: MaintenanceType::S3Refresh,
>> +                        message: None,
>> +                    }))?;
>> +                    section_config.set_data(self.name(), "datastore", &datastore)?;
>> +                    pbs_config::datastore::save_config(&section_config)?;
>> +                    drop(_lock);
> 
> 
> No need to drop the lock, since the block ends anyway, right?

Agreed, dropping that here.

> 
> Also this should be done in a tokio::spawn_blocking, if I'm not mistaken?
> (the try_block! is only a convenience wrapper that wraps the block in a function,
> it doesn't spawn the block on the blocking thread pool)

True, allows me to also get rid of the try_block after adapting that 
here ...

> 
>> +                    Ok::<(), Error>(())
>> +                })
>> +                .context("failed to set maintenance mode")?;
>> +
>> +                let store_base = self.base_path();
>> +
>> +                let tmp_base = proxmox_sys::fs::make_tmp_dir(&store_base, None)
>> +                    .context("failed to create temporary content folder in {store_base}")?;
>> +
>> +                let backup_user = pbs_config::backup_user().context("failed to get backup user")?;
>> +                let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
>> +                let file_create_options = CreateOptions::new()
>> +                    .perm(mode)
>> +                    .owner(backup_user.uid)
>> +                    .group(backup_user.gid);
>> +                let mode = nix::sys::stat::Mode::from_bits_truncate(0o0755);
>> +                let dir_create_options = CreateOptions::new()
>> +                    .perm(mode)
>> +                    .owner(backup_user.uid)
>> +                    .group(backup_user.gid);
>> +
>> +                let list_prefix = S3PathPrefix::Some(S3_CONTENT_PREFIX.to_string());
>> +                let store_prefix = format!("{}/{S3_CONTENT_PREFIX}/", self.name());
>> +                let mut next_continuation_token: Option<String> = None;
>> +                loop {
>> +                    let list_objects_result = s3_client
>> +                        .list_objects_v2(&list_prefix, next_continuation_token.as_deref())
>> +                        .await
>> +                        .context("failed to list object")?;
>> +
>> +                    let objects_to_fetch: Vec<S3ObjectKey> = list_objects_result
>> +                        .contents
>> +                        .into_iter()
>> +                        .map(|item| item.key)
>> +                        .collect();
>> +
>> +                    for object_key in objects_to_fetch {
>> +                        let object_path = format!("{object_key}");
>> +                        let object_path = object_path.strip_prefix(&store_prefix).with_context(||
>> +                            format!("failed to strip store context prefix {store_prefix} for {object_key}")
>> +                        )?;
>> +                        if object_path.ends_with(NAMESPACE_MARKER_FILENAME) {
>> +                            continue;
>> +                        }
>> +
>> +                        info!("Fetching object {object_path}");
>> +
>> +                        let file_path = tmp_base.join(object_path);
>> +                        if let Some(parent) = file_path.parent() {
>> +                            proxmox_sys::fs::create_path(
>> +                                parent,
>> +                                Some(dir_create_options),
>> +                                Some(dir_create_options),
>> +                            )?;
>> +                        }
>> +
>> +                        let mut target_file = tokio::fs::OpenOptions::new()
>> +                            .write(true)
>> +                            .create(true)
>> +                            .truncate(true)
>> +                            .read(true)
>> +                            .open(&file_path)
>> +                            .await
>> +                            .with_context(|| {
>> +                                format!("failed to create target file {file_path:?}")
>> +                            })?;
>> +
>> +                        if let Some(response) = s3_client
>> +                            .get_object(object_key)
>> +                            .await
>> +                            .with_context(|| format!("failed to fetch object {object_path}"))?
>> +                        {
>> +                            let data = response
>> +                                .content
>> +                                .collect()
>> +                                .await
>> +                                .context("failed to collect object contents")?;
>> +                            target_file
>> +                                .write_all(&data.to_bytes())
>> +                                .await
>> +                                .context("failed to write to target file")?;
>> +                            file_create_options
>> +                                .apply_to(&mut target_file, &file_path)
>> +                                .context("failed to set target file create options")?;
>> +                            target_file
>> +                                .flush()
>> +                                .await
>> +                                .context("failed to flush target file")?;
>> +                        } else {
>> +                            bail!("failed to download {object_path}, not found");
>> +                        }
>> +                    }
>> +
>> +                    if list_objects_result.is_truncated {
>> +                        next_continuation_token = list_objects_result
>> +                            .next_continuation_token
>> +                            .as_ref()
>> +                            .cloned();
>> +                        continue;
>> +                    }
>> +                    break;
>> +                }
>> +
>> +                for ty in ["vm", "ct", "host", "ns"] {
>> +                    let store_base_clone = store_base.clone();
>> +                    let tmp_base_clone = tmp_base.clone();
>> +                    tokio::task::spawn_blocking(move || {
>> +                        let type_dir = store_base_clone.join(ty);
>> +                        if let Err(err) = std::fs::remove_dir_all(&type_dir) {
>> +                            if err.kind() != io::ErrorKind::NotFound {
>> +                                return Err(err).with_context(|| {
>> +                                    format!("failed to remove old contents in {type_dir:?}")
>> +                                });
>> +                            }
>> +                        }
>> +                        let tmp_type_dir = tmp_base_clone.join(ty);
>> +                        if let Err(err) = std::fs::rename(&tmp_type_dir, &type_dir) {
>> +                            if err.kind() != io::ErrorKind::NotFound {
>> +                                return Err(err)
>> +                                    .with_context(|| format!("failed to rename {tmp_type_dir:?}"));
>> +                            }
>> +                        }
>> +                        Ok::<(), Error>(())
>> +                    })
>> +                    .await?
>> +                    .with_context(|| format!("failed to refresh {store_base:?}"))?;
>> +                }
>> +
>> +                std::fs::remove_dir_all(&tmp_base).with_context(|| {
>> +                    format!("failed to cleanup temporary content in {tmp_base:?}")
>> +                })?;
>> +
>> +                try_block!({
>> +                    let _lock = pbs_config::datastore::lock_config()?;
>> +                    let (mut section_config, _digest) = pbs_config::datastore::config()?;
>> +                    let mut datastore: DataStoreConfig =
>> +                        section_config.lookup("datastore", self.name())?;
>> +                    datastore.set_maintenance_mode(None)?;
>> +                    section_config.set_data(self.name(), "datastore", &datastore)?;
>> +                    pbs_config::datastore::save_config(&section_config)?;
>> +                    drop(_lock);
>> +                    Ok::<(), Error>(())
>> +                })
>> +                .context("failed to clear maintenance mode")?;
> 
> Same thing here.

... and here

> 
>> +            }
>> +        }
>> +        Ok(())
>> +    }
> 
> In general, I think the s3_refresh function is a good candidate to be broken up into multiple smaller functions
>    - setting/unsetting maintenance mode
>    - creating the new temporary dir
>    - retrieving the objects from S3
>    - replacing the old contents
>    - etc.

Okay, will try to factor out parts of it, although I don't see too much 
benefit as this is rather self-contained at the moment.

>>   }
>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
>> index 80740e3fb..41cbee4de 100644
>> --- a/src/api2/admin/datastore.rs
>> +++ b/src/api2/admin/datastore.rs
>> @@ -2707,6 +2707,39 @@ pub async fn unmount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<V
>>       Ok(json!(upid))
>>   }
>>   
>> +#[api(
>> +    protected: true,
>> +    input: {
>> +        properties: {
>> +            store: {
>> +                schema: DATASTORE_SCHEMA,
>> +            },
>> +        }
>> +    },
>> +    returns: {
>> +        schema: UPID_SCHEMA,
>> +    },
>> +    access: {
>> +        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_MODIFY, false),
>> +    },
>> +)]
>> +/// Refresh datastore contents from S3 to local cache store.
>> +pub async fn s3_refresh(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
>> +    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Lookup))?;
>> +    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>> +    let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>> +
>> +    let upid = WorkerTask::spawn(
>> +        "s3-refresh",
>> +        Some(store),
>> +        auth_id.to_string(),
>> +        to_stdout,
>> +        move |_worker| async move { datastore.s3_refresh().await },
>> +    )?;
>> +
>> +    Ok(json!(upid))
>> +}
>> +
>>   #[sortable]
>>   const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
>>       (
>> @@ -2773,6 +2806,7 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
>>           &Router::new().download(&API_METHOD_PXAR_FILE_DOWNLOAD),
>>       ),
>>       ("rrd", &Router::new().get(&API_METHOD_GET_RRD_STATS)),
>> +    ("s3-refresh", &Router::new().put(&API_METHOD_S3_REFRESH)),
>>       (
>>           "snapshots",
>>           &Router::new()
> 





More information about the pbs-devel mailing list