[pbs-devel] [PATCH proxmox-backup v9 36/46] api/datastore: implement refresh endpoint for stores with s3 backend
Hannes Laimer
h.laimer at proxmox.com
Mon Jul 21 16:31:26 CEST 2025
On Mon Jul 21, 2025 at 4:26 PM CEST, Christian Ebner wrote:
> On 7/21/25 4:16 PM, Hannes Laimer wrote:
>> On Sat Jul 19, 2025 at 2:50 PM CEST, Christian Ebner wrote:
>>> Allows to easily refresh the contents on the local cache store for
>>> datastores backed by an S3 object store.
>>>
>>> In order to guarantee that no read or write operations are ongoing,
>>> the store is first set into the maintenance mode `S3Refresh`. Objects
>>> are then fetched into a temporary directory to avoid loosing contents
>>> and consistency in case of an error. Once all objects have been
>>> fetched, clears out existing contents and moves the newly fetched
>>> contents in place.
>>>
>>> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>>> ---
>>> changes since version 8:
>>> - refactor s3 refresh into more compact methods
>>> - drop un-necessary drop(_lock)
>>> - use missing tokio::task::spawn_blocking context for blocking
>>> maintenance mode setting
>>>
>>> pbs-datastore/src/datastore.rs | 175 +++++++++++++++++++++++++++++++++
>>> src/api2/admin/datastore.rs | 34 +++++++
>>> 2 files changed, 209 insertions(+)
>>>
>>> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
>>> index a524d7b32..b2af05eac 100644
>>> --- a/pbs-datastore/src/datastore.rs
>>> +++ b/pbs-datastore/src/datastore.rs
>>> @@ -10,6 +10,7 @@ use anyhow::{bail, format_err, Context, Error};
>>> use http_body_util::BodyExt;
>>> use nix::unistd::{unlinkat, UnlinkatFlags};
>>> use pbs_tools::lru_cache::LruCache;
>>> +use tokio::io::AsyncWriteExt;
>>> use tracing::{info, warn};
>>>
>>> use proxmox_human_byte::HumanByte;
>>> @@ -2200,4 +2201,178 @@ impl DataStore {
>>> pub fn old_locking(&self) -> bool {
>>> *OLD_LOCKING
>>> }
>>> +
>>> + /// Set the datastore's maintenance mode to `S3Refresh`, fetch from S3 object store, clear and
>>> + /// replace the local cache store contents. Once finished disable the maintenance mode again.
>>> + /// Returns with error for other datastore backends without setting the maintenance mode.
>>> + pub async fn s3_refresh(self: &Arc<Self>) -> Result<(), Error> {
>>> + match self.backend()? {
>>> + DatastoreBackend::Filesystem => bail!("store '{}' not backed by S3", self.name()),
>>> + DatastoreBackend::S3(s3_client) => {
>>> + let self_clone = Arc::clone(self);
>>> + tokio::task::spawn_blocking(move || {
>>> + self_clone.maintenance_mode(Some(MaintenanceMode {
>>> + ty: MaintenanceType::S3Refresh,
>>> + message: None,
>>> + }))
>>> + })
>>> + .await?
>>> + .context("failed to set maintenance mode")?;
>>
>> I think we should hold the config lock, so it can't be changed while we
>> refresh, no?
>
> Yes, but that is handled by the method itself, also to limit lock scope.
>
> See further below...
>
maybe I'm missing something, but the limited scope is what I mean. I
think we should try to prevent changing the maintenance mode away from
`S3Refresh` before we're done, so basically holding the lock while we
refresh.
>>
>>> +
>>> + let tmp_base = proxmox_sys::fs::make_tmp_dir(&self.base_path(), None)
>>> + .context("failed to create temporary content folder in {store_base}")?;
>>> +
>>> + self.fetch_tmp_contents(&tmp_base, &s3_client).await?;
>>> + self.move_tmp_contents_in_place(&tmp_base).await?;
>>> +
>>> + let self_clone = Arc::clone(self);
>>> + tokio::task::spawn_blocking(move || self_clone.maintenance_mode(None))
>>> + .await?
>>> + .context("failed to clear maintenance mode")?;
>>> + }
>>> + }
>>> + Ok(())
>>> + }
>>> +
>>> + // Set or clear the datastores maintenance mode by locking and updating the datastore config
>>> + fn maintenance_mode(&self, maintenance_mode: Option<MaintenanceMode>) -> Result<(), Error> {
>>> + let _lock = pbs_config::datastore::lock_config()?;
>
> ... here the config is locked and the scope limited by the method.
>
>>> + let (mut section_config, _digest) = pbs_config::datastore::config()?;
>>> + let mut datastore: DataStoreConfig = section_config.lookup("datastore", self.name())?;
>>> + datastore.set_maintenance_mode(maintenance_mode)?;
>>> + section_config.set_data(self.name(), "datastore", &datastore)?;
>>> + pbs_config::datastore::save_config(§ion_config)?;
>>> + Ok(())
>>> + }
>>> +
>>> + // Fetch the contents (metadata, no chunks) of the datastore from the S3 object store to the
>>> + // provided temporaray directory
>>> + async fn fetch_tmp_contents(&self, tmp_base: &Path, s3_client: &S3Client) -> Result<(), Error> {
>>> + let backup_user = pbs_config::backup_user().context("failed to get backup user")?;
>>> + let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
>>> + let file_create_options = CreateOptions::new()
>>> + .perm(mode)
>>> + .owner(backup_user.uid)
>>> + .group(backup_user.gid);
>>> + let mode = nix::sys::stat::Mode::from_bits_truncate(0o0755);
>>> + let dir_create_options = CreateOptions::new()
>>> + .perm(mode)
>>> + .owner(backup_user.uid)
>>> + .group(backup_user.gid);
>>> +
>>> + let list_prefix = S3PathPrefix::Some(S3_CONTENT_PREFIX.to_string());
>>> + let store_prefix = format!("{}/{S3_CONTENT_PREFIX}/", self.name());
>>> + let mut next_continuation_token: Option<String> = None;
>>> + loop {
>>> + let list_objects_result = s3_client
>>> + .list_objects_v2(&list_prefix, next_continuation_token.as_deref())
>>> + .await
>>> + .context("failed to list object")?;
>>> +
>>> + let objects_to_fetch: Vec<S3ObjectKey> = list_objects_result
>>> + .contents
>>> + .into_iter()
>>> + .map(|item| item.key)
>>> + .collect();
>>> +
>>> + for object_key in objects_to_fetch {
>>> + let object_path = format!("{object_key}");
>>> + let object_path = object_path.strip_prefix(&store_prefix).with_context(|| {
>>> + format!("failed to strip store context prefix {store_prefix} for {object_key}")
>>> + })?;
>>> + if object_path.ends_with(NAMESPACE_MARKER_FILENAME) {
>>> + continue;
>>> + }
>>> +
>>> + info!("Fetching object {object_path}");
>>> +
>>> + let file_path = tmp_base.join(object_path);
>>> + if let Some(parent) = file_path.parent() {
>>> + proxmox_sys::fs::create_path(
>>> + parent,
>>> + Some(dir_create_options),
>>> + Some(dir_create_options),
>>> + )?;
>>> + }
>>> +
>>> + let mut target_file = tokio::fs::OpenOptions::new()
>>> + .write(true)
>>> + .create(true)
>>> + .truncate(true)
>>> + .read(true)
>>> + .open(&file_path)
>>> + .await
>>> + .with_context(|| format!("failed to create target file {file_path:?}"))?;
>>> +
>>> + if let Some(response) = s3_client
>>> + .get_object(object_key)
>>> + .await
>>> + .with_context(|| format!("failed to fetch object {object_path}"))?
>>> + {
>>> + let data = response
>>> + .content
>>> + .collect()
>>> + .await
>>> + .context("failed to collect object contents")?;
>>> + target_file
>>> + .write_all(&data.to_bytes())
>>> + .await
>>> + .context("failed to write to target file")?;
>>> + file_create_options
>>> + .apply_to(&mut target_file, &file_path)
>>> + .context("failed to set target file create options")?;
>>> + target_file
>>> + .flush()
>>> + .await
>>> + .context("failed to flush target file")?;
>>> + } else {
>>> + bail!("failed to download {object_path}, not found");
>>> + }
>>> + }
>>> +
>>> + if list_objects_result.is_truncated {
>>> + next_continuation_token = list_objects_result
>>> + .next_continuation_token
>>> + .as_ref()
>>> + .cloned();
>>> + continue;
>>> + }
>>> + break;
>>> + }
>>> + Ok(())
>>> + }
>>> +
>>> + // Fetch the contents (metadata, no chunks) of the datastore from the S3 object store to the
>>> + // provided temporaray directory
>>> + async fn move_tmp_contents_in_place(&self, tmp_base: &PathBuf) -> Result<(), Error> {
>>> + for ty in ["vm", "ct", "host", "ns"] {
>>> + let store_base_clone = self.base_path().clone();
>>> + let tmp_base_clone = tmp_base.clone();
>>> + tokio::task::spawn_blocking(move || {
>>> + let type_dir = store_base_clone.join(ty);
>>> + if let Err(err) = std::fs::remove_dir_all(&type_dir) {
>>> + if err.kind() != io::ErrorKind::NotFound {
>>> + return Err(err).with_context(|| {
>>> + format!("failed to remove old contents in {type_dir:?}")
>>> + });
>>> + }
>>> + }
>>> + let tmp_type_dir = tmp_base_clone.join(ty);
>>> + if let Err(err) = std::fs::rename(&tmp_type_dir, &type_dir) {
>>> + if err.kind() != io::ErrorKind::NotFound {
>>> + return Err(err)
>>> + .with_context(|| format!("failed to rename {tmp_type_dir:?}"));
>>> + }
>>> + }
>>> + Ok::<(), Error>(())
>>> + })
>>> + .await?
>>> + .with_context(|| format!("failed to refresh {:?}", self.base_path()))?;
>>> + }
>>> +
>>> + std::fs::remove_dir_all(&tmp_base)
>>> + .with_context(|| format!("failed to cleanup temporary content in {tmp_base:?}"))?;
>>> +
>>> + Ok(())
>>> + }
>>> }
>>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
>>> index 87a8641bd..23b216bef 100644
>>> --- a/src/api2/admin/datastore.rs
>>> +++ b/src/api2/admin/datastore.rs
>>> @@ -2707,6 +2707,39 @@ pub async fn unmount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<V
>>> Ok(json!(upid))
>>> }
>>>
>>> +#[api(
>>> + protected: true,
>>> + input: {
>>> + properties: {
>>> + store: {
>>> + schema: DATASTORE_SCHEMA,
>>> + },
>>> + }
>>> + },
>>> + returns: {
>>> + schema: UPID_SCHEMA,
>>> + },
>>> + access: {
>>> + permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_MODIFY, false),
>>> + },
>>> +)]
>>> +/// Refresh datastore contents from S3 to local cache store.
>>> +pub async fn s3_refresh(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
>>> + let datastore = DataStore::lookup_datastore(&store, Some(Operation::Lookup))?;
>>> + let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>>> + let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
>>> +
>>> + let upid = WorkerTask::spawn(
>>> + "s3-refresh",
>>> + Some(store),
>>> + auth_id.to_string(),
>>> + to_stdout,
>>> + move |_worker| async move { datastore.s3_refresh().await },
>>> + )?;
>>> +
>>> + Ok(json!(upid))
>>> +}
>>> +
>>> #[sortable]
>>> const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
>>> (
>>> @@ -2773,6 +2806,7 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
>>> &Router::new().download(&API_METHOD_PXAR_FILE_DOWNLOAD),
>>> ),
>>> ("rrd", &Router::new().get(&API_METHOD_GET_RRD_STATS)),
>>> + ("s3-refresh", &Router::new().put(&API_METHOD_S3_REFRESH)),
>>> (
>>> "snapshots",
>>> &Router::new()
>>
>>
>>
>> _______________________________________________
>> pbs-devel mailing list
>> pbs-devel at lists.proxmox.com
>> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
>>
>>
>
>
>
> _______________________________________________
> pbs-devel mailing list
> pbs-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
More information about the pbs-devel
mailing list