[pbs-devel] [PATCH proxmox-backup v4 42/45] api/datastore: implement refresh endpoint for stores with s3 backend

Christian Ebner c.ebner at proxmox.com
Mon Jun 23 11:41:03 CEST 2025


Allows easily refreshing the contents of the local cache store for
datastores backed by an S3 object store.

In order to guarantee that no read or write operations are ongoing,
the store is first set into the maintenance mode `S3Refresh`. Objects
are then fetched into a temporary directory to avoid losing contents
and consistency in case of an error. Once all objects have been
fetched, the existing contents are cleared out and the newly fetched
contents are moved into place.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 pbs-datastore/src/datastore.rs | 136 +++++++++++++++++++++++++++++++++
 src/api2/admin/datastore.rs    |  34 +++++++++
 2 files changed, 170 insertions(+)

diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index 1c78f42b2..62d6bc4a1 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -1,5 +1,6 @@
 use std::collections::{HashMap, HashSet};
 use std::io::{self, Write};
+use std::ops::Deref;
 use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::AsRawFd;
 use std::path::{Path, PathBuf};
@@ -11,6 +12,7 @@ use http_body_util::BodyExt;
 use nix::unistd::{unlinkat, UnlinkatFlags};
 use pbs_s3_client::{PutObjectResponse, S3Client, S3ClientOptions};
 use pbs_tools::lru_cache::LruCache;
+use tokio::io::AsyncWriteExt;
 use tracing::{info, warn};
 
 use proxmox_human_byte::HumanByte;
@@ -2138,4 +2140,138 @@ impl DataStore {
     pub fn old_locking(&self) -> bool {
         *OLD_LOCKING
     }
+
+    /// Set the datastore's maintenance mode to `S3Refresh`, fetch from the S3 object store, then
+    /// clear and replace the local cache store contents. Once finished, disable the maintenance
+    /// mode again. Returns an error for other datastore backends, without setting the mode.
+    pub async fn s3_refresh(self: &Arc<Self>) -> Result<(), Error> {
+        match self.backend()? {
+            DatastoreBackend::Filesystem => bail!("store '{}' not backed by S3", self.name()),
+            DatastoreBackend::S3(s3_client) => {
+                let _lock = pbs_config::datastore::lock_config()?;
+                let (mut section_config, _digest) = pbs_config::datastore::config()?;
+                let mut datastore: DataStoreConfig =
+                    section_config.lookup("datastore", self.name())?;
+                datastore.set_maintenance_mode(Some(MaintenanceMode {
+                    ty: MaintenanceType::S3Refresh,
+                    message: None,
+                }))?;
+                section_config.set_data(self.name(), "datastore", &datastore)?;
+                pbs_config::datastore::save_config(&section_config)?;
+                drop(_lock);
+
+                let store_base = self.base_path();
+
+                let tmp_base = proxmox_sys::fs::make_tmp_dir(&store_base, None)?;
+
+                let backup_user = pbs_config::backup_user()?;
+                let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
+                let file_create_options = CreateOptions::new()
+                    .perm(mode)
+                    .owner(backup_user.uid)
+                    .group(backup_user.gid);
+                let mode = nix::sys::stat::Mode::from_bits_truncate(0o0755);
+                let dir_create_options = CreateOptions::new()
+                    .perm(mode)
+                    .owner(backup_user.uid)
+                    .group(backup_user.gid);
+
+                let list_prefix = S3PathPrefix::Some(S3_CONTENT_PREFIX.to_string());
+                let store_prefix = format!("{}/{S3_CONTENT_PREFIX}/", self.name());
+                let content_prefix = format!("{S3_CONTENT_PREFIX}/");
+                let mut next_continuation_token: Option<String> = None;
+                loop {
+                    let list_objects_result = s3_client
+                        .list_objects_v2(&list_prefix, next_continuation_token.as_deref())
+                        .await?;
+
+                    let mut objects_to_fetch: Vec<RelS3ObjectKey> =
+                        Vec::with_capacity(list_objects_result.contents.len());
+                    for item in &list_objects_result.contents {
+                        let object_key = item
+                            .key
+                            .strip_prefix(&store_prefix)
+                            .ok_or_else(|| {
+                                format_err!(
+                                    "failed to strip store context prefix {store_prefix} for {}",
+                                    item.key
+                                )
+                            })?
+                            .into();
+                        objects_to_fetch.push(object_key);
+                    }
+
+                    for object in objects_to_fetch {
+                        let object_path = object.deref().strip_prefix(&content_prefix).unwrap();
+                        let object_path = pbs_s3_client::uri_decode(object_path)?;
+                        if object_path.ends_with(NAMESPACE_MARKER_FILENAME) {
+                            continue;
+                        }
+
+                        info!("Fetching object {object_path}");
+
+                        let file_path = tmp_base.join(&object_path);
+                        if let Some(parent) = file_path.parent() {
+                            proxmox_sys::fs::create_path(
+                                parent,
+                                Some(dir_create_options),
+                                Some(dir_create_options),
+                            )?;
+                        }
+
+                        let mut target_file = tokio::fs::OpenOptions::new()
+                            .write(true)
+                            .create(true)
+                            .truncate(true)
+                            .read(true)
+                            .open(&file_path)
+                            .await?;
+
+                        if let Some(response) = s3_client.get_object(object.clone()).await? {
+                            let data = response.content.collect().await?.to_bytes();
+                            target_file.write_all(&data).await?;
+                            file_create_options.apply_to(&mut target_file, &file_path)?;
+                            target_file.flush().await?;
+                        } else {
+                            bail!("failed to download {object_path}, not found");
+                        }
+                    }
+
+                    if list_objects_result.is_truncated {
+                        next_continuation_token = list_objects_result
+                            .next_continuation_token
+                            .as_ref()
+                            .cloned();
+                        continue;
+                    }
+                    break;
+                }
+
+                for ty in ["vm", "ct", "host", "ns"] {
+                    let store_base_clone = store_base.clone();
+                    let tmp_base_clone = tmp_base.clone();
+                    tokio::task::spawn_blocking(move || {
+                        let type_dir = store_base_clone.join(ty);
+                        std::fs::remove_dir_all(&type_dir)?;
+                        let tmp_type_dir = tmp_base_clone.join(ty);
+                        std::fs::rename(tmp_type_dir, type_dir)?;
+                        Ok::<(), Error>(())
+                    })
+                    .await??;
+                }
+
+                std::fs::remove_dir_all(&tmp_base)?;
+
+                let _lock = pbs_config::datastore::lock_config()?;
+                let (mut section_config, _digest) = pbs_config::datastore::config()?;
+                let mut datastore: DataStoreConfig =
+                    section_config.lookup("datastore", self.name())?;
+                datastore.set_maintenance_mode(None)?;
+                section_config.set_data(self.name(), "datastore", &datastore)?;
+                pbs_config::datastore::save_config(&section_config)?;
+                drop(_lock);
+            }
+        }
+        Ok(())
+    }
 }
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index b6ae7bbc1..39b127bb9 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -2686,6 +2686,39 @@ pub async fn unmount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<V
     Ok(json!(upid))
 }
 
+#[api(
+    protected: true,
+    input: {
+        properties: {
+            store: {
+                schema: DATASTORE_SCHEMA,
+            },
+        }
+    },
+    returns: {
+        schema: UPID_SCHEMA,
+    },
+    access: {
+        permission: &Permission::Privilege(&["datastore", "{store}"], PRIV_DATASTORE_MODIFY, false),
+    },
+)]
+/// Refresh datastore contents from S3 to local cache store.
+pub async fn s3_refresh(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Error> {
+    let datastore = DataStore::lookup_datastore(&store, Some(Operation::Lookup))?;
+    let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
+    let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
+
+    let upid = WorkerTask::spawn(
+        "s3-refresh",
+        Some(store),
+        auth_id.to_string(),
+        to_stdout,
+        move |_worker| async move { datastore.s3_refresh().await },
+    )?;
+
+    Ok(json!(upid))
+}
+
 #[sortable]
 const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
     (
@@ -2752,6 +2785,7 @@ const DATASTORE_INFO_SUBDIRS: SubdirMap = &[
         &Router::new().download(&API_METHOD_PXAR_FILE_DOWNLOAD),
     ),
     ("rrd", &Router::new().get(&API_METHOD_GET_RRD_STATS)),
+    ("s3-refresh", &Router::new().put(&API_METHOD_S3_REFRESH)),
     (
         "snapshots",
         &Router::new()
-- 
2.47.2




