[pbs-devel] [PATCH proxmox-backup v8 22/45] datastore: implement garbage collection for s3 backend
Lukas Wagner
l.wagner at proxmox.com
Fri Jul 18 11:47:23 CEST 2025
On 2025-07-15 14:53, Christian Ebner wrote:
> Implements the garbage collection for datastores backed by an s3
> object store.
> Take advantage of the local datastore by placing marker files in the
> chunk store during phase 1 of the garbage collection, updating their
> atime if already present.
> This allows us to avoid making expensive API calls to update object
> metadata, which would only be possible via a copy object operation.
>
> The phase 2 is implemented by fetching a list of all the chunks via
> the ListObjectsV2 API call, filtered by the chunk folder prefix.
> This operation has to be performed in batches of 1000 objects, given
> by the API's response limits.
> For each object key, lookup the marker file and decide based on the
> marker existence and its atime if the chunk object needs to be
> removed. Deletion happens via the delete objects operation, allowing
> to delete multiple chunks by a single request.
>
> This allows to efficiently lookup chunks which are not in use
> anymore while being performant and cost effective.
>
> Baseline runtime performance tests:
> -----------------------------------
>
> 3 garbage collection runs were performed with hot filesystem caches
> (by additional GC run before the test runs). The PBS instance was
> virtualized, the same virtualized disk using ZFS for all the local
> cache stores:
>
> All datastores contained the same encrypted data, with the following
> content statistics:
> Original data usage: 269.685 GiB
> On-Disk usage: 9.018 GiB (3.34%)
> On-Disk chunks: 6477
> Deduplication factor: 29.90
> Average chunk size: 1.426 MiB
>
> The results demonstrate the overhead caused by the additional
> ListObjectsV2 API calls and their processing, which varies depending
> on the object store backend.
>
> Average garbage collection runtime:
> Local datastore: (2.04 ± 0.01) s
> Local RADOS gateway (Squid): (3.05 ± 0.01) s
> AWS S3: (3.05 ± 0.01) s
> Cloudflare R2: (6.71 ± 0.58) s
>
> After pruning of all datastore contents (therefore including
> DeleteObjects requests):
> Local datastore: 3.04 s
> Local RADOS gateway (Squid): 14.08 s
> AWS S3: 13.06 s
> Cloudflare R2: 78.21 s
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> changes since version 7:
> - no changes
>
> pbs-datastore/src/chunk_store.rs | 4 +
> pbs-datastore/src/datastore.rs | 211 +++++++++++++++++++++++++++----
> 2 files changed, 190 insertions(+), 25 deletions(-)
>
> diff --git a/pbs-datastore/src/chunk_store.rs b/pbs-datastore/src/chunk_store.rs
> index 8c195df54..95f00e8d5 100644
> --- a/pbs-datastore/src/chunk_store.rs
> +++ b/pbs-datastore/src/chunk_store.rs
> @@ -353,6 +353,10 @@ impl ChunkStore {
> ProcessLocker::oldest_shared_lock(self.locker.clone().unwrap())
> }
>
> + pub fn mutex(&self) -> &std::sync::Mutex<()> {
> + &self.mutex
> + }
> +
> pub fn sweep_unused_chunks(
> &self,
> oldest_writer: i64,
> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
> index ca099c1d0..6cc7fdbaa 100644
> --- a/pbs-datastore/src/datastore.rs
> +++ b/pbs-datastore/src/datastore.rs
> @@ -4,7 +4,7 @@ use std::os::unix::ffi::OsStrExt;
> use std::os::unix::io::AsRawFd;
> use std::path::{Path, PathBuf};
> use std::sync::{Arc, LazyLock, Mutex};
> -use std::time::Duration;
> +use std::time::{Duration, SystemTime};
>
> use anyhow::{bail, format_err, Context, Error};
> use http_body_util::BodyExt;
> @@ -1209,6 +1209,7 @@ impl DataStore {
> chunk_lru_cache: &mut Option<LruCache<[u8; 32], ()>>,
> status: &mut GarbageCollectionStatus,
> worker: &dyn WorkerTaskContext,
> + s3_client: Option<Arc<S3Client>>,
> ) -> Result<(), Error> {
> status.index_file_count += 1;
> status.index_data_bytes += index.index_bytes();
> @@ -1225,21 +1226,41 @@ impl DataStore {
> }
> }
>
> - if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
> - let hex = hex::encode(digest);
> - warn!(
> - "warning: unable to access non-existent chunk {hex}, required by {file_name:?}"
> - );
> -
> - // touch any corresponding .bad files to keep them around, meaning if a chunk is
> - // rewritten correctly they will be removed automatically, as well as if no index
> - // file requires the chunk anymore (won't get to this loop then)
> - for i in 0..=9 {
> - let bad_ext = format!("{}.bad", i);
> - let mut bad_path = PathBuf::new();
> - bad_path.push(self.chunk_path(digest).0);
> - bad_path.set_extension(bad_ext);
> - self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
> + match s3_client {
> + None => {
> + // Filesystem backend
> + if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
> + let hex = hex::encode(digest);
> + warn!(
> + "warning: unable to access non-existent chunk {hex}, required by {file_name:?}"
> + );
> +
> + // touch any corresponding .bad files to keep them around, meaning if a chunk is
> + // rewritten correctly they will be removed automatically, as well as if no index
> + // file requires the chunk anymore (won't get to this loop then)
> + for i in 0..=9 {
> + let bad_ext = format!("{}.bad", i);
> + let mut bad_path = PathBuf::new();
> + bad_path.push(self.chunk_path(digest).0);
> + bad_path.set_extension(bad_ext);
> + self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
> + }
> + }
> + }
> + Some(ref _s3_client) => {
> + // Update atime on local cache marker files.
> + if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
> + let (chunk_path, _digest) = self.chunk_path(digest);
> + // Insert empty file as marker to tell GC phase2 that this is
> + // a chunk still in-use, so to keep in the S3 object store.
> + std::fs::File::options()
> + .write(true)
> + .create_new(true)
> + .open(&chunk_path)
> + .with_context(|| {
> + format!("failed to create marker for chunk {}", hex::encode(digest))
> + })?;
> + }
> }
> }
> }
> @@ -1251,6 +1272,7 @@ impl DataStore {
> status: &mut GarbageCollectionStatus,
> worker: &dyn WorkerTaskContext,
> cache_capacity: usize,
> + s3_client: Option<Arc<S3Client>>,
> ) -> Result<(), Error> {
> // Iterate twice over the datastore to fetch index files, even if this comes with an
> // additional runtime cost:
> @@ -1344,6 +1366,7 @@ impl DataStore {
> &mut chunk_lru_cache,
> status,
> worker,
> + s3_client.as_ref().cloned(),
> )?;
>
> if !unprocessed_index_list.remove(&path) {
> @@ -1378,7 +1401,14 @@ impl DataStore {
> continue;
> }
> };
> - self.index_mark_used_chunks(index, &path, &mut chunk_lru_cache, status, worker)?;
> + self.index_mark_used_chunks(
> + index,
> + &path,
> + &mut chunk_lru_cache,
> + status,
> + worker,
> + s3_client.as_ref().cloned(),
> + )?;
> warn!("Marked chunks for unexpected index file at '{path:?}'");
> }
> if strange_paths_count > 0 {
> @@ -1476,18 +1506,149 @@ impl DataStore {
> 1024 * 1024
> };
>
> - info!("Start GC phase1 (mark used chunks)");
> + let s3_client = match self.backend()? {
> + DatastoreBackend::Filesystem => None,
> + DatastoreBackend::S3(s3_client) => {
> + proxmox_async::runtime::block_on(s3_client.head_bucket())
> + .context("failed to reach bucket")?;
> + Some(s3_client)
> + }
> + };
>
> - self.mark_used_chunks(&mut gc_status, worker, gc_cache_capacity)
> - .context("marking used chunks failed")?;
> + info!("Start GC phase1 (mark used chunks)");
>
> - info!("Start GC phase2 (sweep unused chunks)");
> - self.inner.chunk_store.sweep_unused_chunks(
> - oldest_writer,
> - min_atime,
> + self.mark_used_chunks(
> &mut gc_status,
> worker,
> - )?;
> + gc_cache_capacity,
> + s3_client.as_ref().cloned(),
> + )
> + .context("marking used chunks failed")?;
> +
> + info!("Start GC phase2 (sweep unused chunks)");
> +
> + if let Some(ref s3_client) = s3_client {
> + let mut chunk_count = 0;
> + let prefix = S3PathPrefix::Some(".chunks/".to_string());
> + // Operates in batches of 1000 objects max per request
> + let mut list_bucket_result =
> + proxmox_async::runtime::block_on(s3_client.list_objects_v2(&prefix, None))
> + .context("failed to list chunk in s3 object store")?;
> +
> + let mut delete_list = Vec::with_capacity(1000);
> + loop {
> + let lock = self.inner.chunk_store.mutex().lock().unwrap();
> +
> + for content in list_bucket_result.contents {
> + // Check object is actually a chunk
> + let digest = match Path::new::<str>(&content.key).file_name() {
> + Some(file_name) => file_name,
> + // should never be the case as objects will have a filename
> + None => continue,
> + };
> + let bytes = digest.as_bytes();
> + if bytes.len() != 64 && bytes.len() != 64 + ".0.bad".len() {
> + continue;
> + }
> + if !bytes.iter().take(64).all(u8::is_ascii_hexdigit) {
> + continue;
> + }
> +
> + let bad = bytes.ends_with(b".bad");
> +
> + // Safe since contains valid ascii hexdigits only as checked above.
> + let digest_str = digest.to_string_lossy();
> + let hexdigit_prefix = unsafe { digest_str.get_unchecked(0..4) };
> + let mut chunk_path = self.base_path();
> + chunk_path.push(".chunks");
> + chunk_path.push(hexdigit_prefix);
> + chunk_path.push(digest);
> +
> + // Check local markers (created or atime updated during phase1) and
> + // keep or delete chunk based on that.
> + let atime = match std::fs::metadata(chunk_path) {
> + Ok(stat) => stat.accessed()?,
> + Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
> + // File not found, delete by setting atime to unix epoch
> + info!("Not found, mark for deletion: {}", content.key);
> + SystemTime::UNIX_EPOCH
> + }
> + Err(err) => return Err(err.into()),
> + };
> + let atime = atime.duration_since(SystemTime::UNIX_EPOCH)?.as_secs() as i64;
> +
> + chunk_count += 1;
> +
> + if atime < min_atime {
> + delete_list.push(content.key);
> + if bad {
> + gc_status.removed_bad += 1;
> + } else {
> + gc_status.removed_chunks += 1;
> + }
> + gc_status.removed_bytes += content.size;
> + } else if atime < oldest_writer {
> + if bad {
> + gc_status.still_bad += 1;
> + } else {
> + gc_status.pending_chunks += 1;
> + }
> + gc_status.pending_bytes += content.size;
> + } else {
> + if !bad {
> + gc_status.disk_chunks += 1;
> + }
> + gc_status.disk_bytes += content.size;
> + }
> + }
> +
> + if !delete_list.is_empty() {
> + let delete_objects_result = proxmox_async::runtime::block_on(
> + s3_client.delete_objects(&delete_list),
> + )?;
> + if let Some(_err) = delete_objects_result.error {
> + bail!("failed to delete some objects");
> + }
> + delete_list.clear();
> + }
> +
> + drop(lock);
> +
> + // Process next batch of chunks if there is more
> + if list_bucket_result.is_truncated {
> + list_bucket_result =
> + proxmox_async::runtime::block_on(s3_client.list_objects_v2(
> + &prefix,
> + list_bucket_result.next_continuation_token.as_deref(),
> + ))?;
> + continue;
> + }
> +
> + break;
> + }
> + info!("processed {chunk_count} total chunks");
> +
> + // Phase 2 GC of Filesystem backed storage is phase 3 for S3 backed GC
> + info!("Start GC phase3 (sweep unused chunk markers)");
> +
> + let mut tmp_gc_status = GarbageCollectionStatus {
> + upid: Some(upid.to_string()),
> + ..Default::default()
> + };
> + self.inner.chunk_store.sweep_unused_chunks(
> + oldest_writer,
> + min_atime,
> + &mut tmp_gc_status,
> + worker,
> + )?;
> + } else {
> + self.inner.chunk_store.sweep_unused_chunks(
> + oldest_writer,
> + min_atime,
> + &mut gc_status,
> + worker,
> + )?;
> + }
I found this big chunk of new code quite hard to follow.
I guess everything between the start of the `loop` and the `if list_bucket_result.is_truncated`
check could be separated out into some `process_objects` (todo: find better name) function. IMO
the scope in which you hold the lock is also a good indicator of where to cut.
Within this block, it might also make sense to split it further, e.g.
- check_if_chunk
- get_local_chunk_path
- get_local_chunk_atime
- ...
(there might be better ways to separate or name things, but you get the idea)
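To make the suggested split a bit more concrete, here is a rough sketch using only std. The helper names are the placeholders from the list above; `ChunkState` and `classify` are additional hypothetical names, and the boxed error type is a simplification. None of this is existing PBS API, it only mirrors the checks from the quoted hunk.

```rust
use std::error::Error;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

/// Hypothetical outcome of the atime comparison done in phase 2.
#[derive(Debug, PartialEq)]
enum ChunkState {
    Remove,
    Pending,
    InUse,
}

/// Returns the file name and a "bad" flag if `key` looks like a chunk object.
fn check_if_chunk(key: &str) -> Option<(&str, bool)> {
    let name = Path::new(key).file_name()?.to_str()?;
    let bytes = name.as_bytes();
    if bytes.len() != 64 && bytes.len() != 64 + ".0.bad".len() {
        return None;
    }
    if !bytes.iter().take(64).all(u8::is_ascii_hexdigit) {
        return None;
    }
    Some((name, name.ends_with(".bad")))
}

/// Builds the local marker path; `name` must have passed `check_if_chunk`.
fn get_local_chunk_path(base: &Path, name: &str) -> PathBuf {
    // The first 64 bytes are ASCII hex digits, so slicing at 4 is safe
    // and no `unsafe` is needed.
    base.join(".chunks").join(&name[..4]).join(name)
}

/// Returns the marker atime in seconds, or 0 (unix epoch) if it is missing.
fn get_local_chunk_atime(path: &Path) -> Result<i64, Box<dyn Error>> {
    let atime = match std::fs::metadata(path) {
        Ok(stat) => stat.accessed()?,
        Err(err) if err.kind() == ErrorKind::NotFound => SystemTime::UNIX_EPOCH,
        Err(err) => return Err(err.into()),
    };
    Ok(atime.duration_since(SystemTime::UNIX_EPOCH)?.as_secs() as i64)
}

/// Same three-way decision as in the quoted hunk.
fn classify(atime: i64, min_atime: i64, oldest_writer: i64) -> ChunkState {
    if atime < min_atime {
        ChunkState::Remove
    } else if atime < oldest_writer {
        ChunkState::Pending
    } else {
        ChunkState::InUse
    }
}
```

The body of the `for content in list_bucket_result.contents` loop would then mostly be a sequence of these calls followed by the `gc_status` bookkeeping, all within the scope that holds the lock.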
>
> info!(
> "Removed garbage: {}",
--
- Lukas