[pbs-devel] [PATCH proxmox-backup v8 22/45] datastore: implement garbage collection for s3 backend

Christian Ebner c.ebner at proxmox.com
Fri Jul 18 16:31:48 CEST 2025


On 7/18/25 11:47 AM, Lukas Wagner wrote:
> 
> 
> On 2025-07-15 14:53, Christian Ebner wrote:
>> Implements garbage collection for datastores backed by an S3
>> object store.
>> Take advantage of the local datastore by placing marker files in the
>> chunk store during phase 1 of the garbage collection, updating their
>> atime if already present.
>> This allows us to avoid making expensive API calls to update object
>> metadata, which would only be possible via a copy object operation.
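A minimal sketch of the phase 1 marker handling described above, using only the Rust standard library (`File::set_times` and `FileTimes`, stable since Rust 1.75). The helper name `touch_or_create_marker` is hypothetical, and this is not the chunk store's `cond_touch_chunk`. Unlike the quoted diff, it tries `create_new` first and only bumps the atime when the marker already exists, which avoids a check-then-create window.

```rust
use std::fs::{File, FileTimes};
use std::io;
use std::path::Path;
use std::time::SystemTime;

/// Hypothetical helper: mark a chunk as in use via its local marker file.
/// Returns Ok(false) if a new empty marker was created (its atime is "now"),
/// Ok(true) if the marker already existed and its atime was bumped.
fn touch_or_create_marker(path: &Path) -> io::Result<bool> {
    match File::options().write(true).create_new(true).open(path) {
        // Freshly created empty file: nothing else to do.
        Ok(_) => Ok(false),
        Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {
            // Existing marker: only update the access time, leave mtime untouched.
            let file = File::open(path)?;
            file.set_times(FileTimes::new().set_accessed(SystemTime::now()))?;
            Ok(true)
        }
        Err(err) => Err(err),
    }
}
```

Setting the atime explicitly matters because filesystems mounted with `relatime` or `noatime` would not reliably update it on a plain read.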
>>
>> Phase 2 is implemented by fetching a list of all chunks via the
>> ListObjectsV2 API call, filtered by the chunk folder prefix.
>> This operation has to be performed in batches of at most 1000 objects,
>> as imposed by the API's response limits.
>> For each object key, look up the marker file and decide, based on the
>> marker's existence and its atime, whether the chunk object needs to be
>> removed. Deletion happens via the DeleteObjects operation, which allows
>> multiple chunks to be deleted with a single request.
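The paginated listing and per-page batched deletion described above can be sketched without a real S3 client. Everything here is hypothetical: `MockBucket` is an in-memory stand-in whose `list_objects_v2` mimics the ListObjectsV2 pagination contract (at most 1000 keys per page, an `is_truncated` flag, an opaque continuation token, here simply the last key of the page), and `delete_objects` stands in for one DeleteObjects request. It is not the proxmox S3 client API.

```rust
use std::collections::BTreeSet;

/// Maximum keys per list page and per delete request.
const MAX_KEYS: usize = 1000;

/// One page of a listing, loosely modeled on a ListObjectsV2 response (hypothetical).
struct ListPage {
    keys: Vec<String>,
    is_truncated: bool,
    next_continuation_token: Option<String>,
}

/// In-memory stand-in for a bucket (hypothetical, for illustration only).
struct MockBucket {
    objects: BTreeSet<String>,
    delete_requests: usize,
}

impl MockBucket {
    fn new(keys: impl IntoIterator<Item = String>) -> Self {
        MockBucket { objects: keys.into_iter().collect(), delete_requests: 0 }
    }

    /// Up to MAX_KEYS keys with `prefix`, strictly after the continuation token.
    fn list_objects_v2(&self, prefix: &str, token: Option<&str>) -> ListPage {
        let mut keys: Vec<String> = self
            .objects
            .iter()
            .filter(|key| key.starts_with(prefix))
            .filter(|key| token.map_or(true, |t| key.as_str() > t))
            .take(MAX_KEYS + 1) // one extra key tells us whether more pages follow
            .cloned()
            .collect();
        let is_truncated = keys.len() > MAX_KEYS;
        keys.truncate(MAX_KEYS);
        let next_continuation_token = if is_truncated { keys.last().cloned() } else { None };
        ListPage { keys, is_truncated, next_continuation_token }
    }

    /// One batched delete request.
    fn delete_objects(&mut self, keys: &[String]) {
        assert!(keys.len() <= MAX_KEYS, "too many keys for a single request");
        for key in keys {
            self.objects.remove(key);
        }
        self.delete_requests += 1;
    }
}

/// Walk all pages under `prefix`, flushing the selected keys once per page.
/// Returns (processed, removed).
fn sweep(bucket: &mut MockBucket, prefix: &str, mut should_delete: impl FnMut(&str) -> bool) -> (usize, usize) {
    let (mut processed, mut removed) = (0, 0);
    let mut delete_list = Vec::with_capacity(MAX_KEYS);
    let mut page = bucket.list_objects_v2(prefix, None);
    loop {
        for key in page.keys {
            processed += 1;
            if should_delete(key.as_str()) {
                delete_list.push(key);
            }
        }
        if !delete_list.is_empty() {
            removed += delete_list.len();
            bucket.delete_objects(&delete_list);
            delete_list.clear();
        }
        if !page.is_truncated {
            break;
        }
        page = bucket.list_objects_v2(prefix, page.next_continuation_token.as_deref());
    }
    (processed, removed)
}
```

Since a page never holds more than 1000 keys and DeleteObjects accepts up to 1000 keys per request, flushing once per page keeps the number of delete requests bounded by the number of list pages.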
>>
>> This allows chunks that are no longer in use to be identified in a
>> performant and cost-effective way.
>>
>> Baseline runtime performance tests:
>> -----------------------------------
>>
>> Three garbage collection runs were performed with hot filesystem caches
>> (warmed up by an additional GC run before the test runs). The PBS
>> instance was virtualized, with all local cache stores on the same
>> virtualized ZFS-backed disk:
>>
>> All datastores contained the same encrypted data, with the following
>> content statistics:
>> Original data usage: 269.685 GiB
>> On-Disk usage: 9.018 GiB (3.34%)
>> On-Disk chunks: 6477
>> Deduplication factor: 29.90
>> Average chunk size: 1.426 MiB
>>
>> The results demonstrate the overhead caused by the additional
>> ListObjectsV2 API calls and their processing, which varies with the
>> object store backend.
>>
>> Average garbage collection runtime:
>> Local datastore:             (2.04 ± 0.01) s
>> Local RADOS gateway (Squid): (3.05 ± 0.01) s
>> AWS S3:                      (3.05 ± 0.01) s
>> Cloudflare R2:               (6.71 ± 0.58) s
>>
>> After pruning all datastore contents (therefore including
>> DeleteObjects requests):
>> Local datastore:              3.04 s
>> Local RADOS gateway (Squid): 14.08 s
>> AWS S3:                      13.06 s
>> Cloudflare R2:               78.21 s
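For a rough sense of scale, the reported averages can be expressed as slowdown factors relative to the local datastore (computed from the numbers above only, rounded to three significant digits). Assuming exactly one object per chunk and full 1000-key pages, listing the 6477 chunks also takes seven ListObjectsV2 requests per GC run:

```latex
\begin{aligned}
\text{GC only:}\quad & \frac{3.05}{2.04} \approx 1.50 \;(\text{RADOS GW, AWS S3}), \qquad \frac{6.71}{2.04} \approx 3.29 \;(\text{R2}) \\
\text{after prune:}\quad & \frac{14.08}{3.04} \approx 4.63 \;(\text{RADOS GW}), \qquad \frac{13.06}{3.04} \approx 4.30 \;(\text{AWS S3}), \qquad \frac{78.21}{3.04} \approx 25.7 \;(\text{R2}) \\
\text{list requests:}\quad & \left\lceil \frac{6477}{1000} \right\rceil = 7
\end{aligned}
```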
>>
>> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
>> ---
>> changes since version 7:
>> - no changes
>>
>>   pbs-datastore/src/chunk_store.rs |   4 +
>>   pbs-datastore/src/datastore.rs   | 211 +++++++++++++++++++++++++++----
>>   2 files changed, 190 insertions(+), 25 deletions(-)
>>
>> diff --git a/pbs-datastore/src/chunk_store.rs b/pbs-datastore/src/chunk_store.rs
>> index 8c195df54..95f00e8d5 100644
>> --- a/pbs-datastore/src/chunk_store.rs
>> +++ b/pbs-datastore/src/chunk_store.rs
>> @@ -353,6 +353,10 @@ impl ChunkStore {
>>           ProcessLocker::oldest_shared_lock(self.locker.clone().unwrap())
>>       }
>>   
>> +    pub fn mutex(&self) -> &std::sync::Mutex<()> {
>> +        &self.mutex
>> +    }
>> +
>>       pub fn sweep_unused_chunks(
>>           &self,
>>           oldest_writer: i64,
>> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
>> index ca099c1d0..6cc7fdbaa 100644
>> --- a/pbs-datastore/src/datastore.rs
>> +++ b/pbs-datastore/src/datastore.rs
>> @@ -4,7 +4,7 @@ use std::os::unix::ffi::OsStrExt;
>>   use std::os::unix::io::AsRawFd;
>>   use std::path::{Path, PathBuf};
>>   use std::sync::{Arc, LazyLock, Mutex};
>> -use std::time::Duration;
>> +use std::time::{Duration, SystemTime};
>>   
>>   use anyhow::{bail, format_err, Context, Error};
>>   use http_body_util::BodyExt;
>> @@ -1209,6 +1209,7 @@ impl DataStore {
>>           chunk_lru_cache: &mut Option<LruCache<[u8; 32], ()>>,
>>           status: &mut GarbageCollectionStatus,
>>           worker: &dyn WorkerTaskContext,
>> +        s3_client: Option<Arc<S3Client>>,
>>       ) -> Result<(), Error> {
>>           status.index_file_count += 1;
>>           status.index_data_bytes += index.index_bytes();
>> @@ -1225,21 +1226,41 @@ impl DataStore {
>>                   }
>>               }
>>   
>> -            if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
>> -                let hex = hex::encode(digest);
>> -                warn!(
>> -                    "warning: unable to access non-existent chunk {hex}, required by {file_name:?}"
>> -                );
>> -
>> -                // touch any corresponding .bad files to keep them around, meaning if a chunk is
>> -                // rewritten correctly they will be removed automatically, as well as if no index
>> -                // file requires the chunk anymore (won't get to this loop then)
>> -                for i in 0..=9 {
>> -                    let bad_ext = format!("{}.bad", i);
>> -                    let mut bad_path = PathBuf::new();
>> -                    bad_path.push(self.chunk_path(digest).0);
>> -                    bad_path.set_extension(bad_ext);
>> -                    self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
>> +            match s3_client {
>> +                None => {
>> +                    // Filesystem backend
>> +                    if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
>> +                        let hex = hex::encode(digest);
>> +                        warn!(
>> +                            "warning: unable to access non-existent chunk {hex}, required by {file_name:?}"
>> +                        );
>> +
>> +                        // touch any corresponding .bad files to keep them around, meaning if a chunk is
>> +                        // rewritten correctly they will be removed automatically, as well as if no index
>> +                        // file requires the chunk anymore (won't get to this loop then)
>> +                        for i in 0..=9 {
>> +                            let bad_ext = format!("{}.bad", i);
>> +                            let mut bad_path = PathBuf::new();
>> +                            bad_path.push(self.chunk_path(digest).0);
>> +                            bad_path.set_extension(bad_ext);
>> +                            self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
>> +                        }
>> +                    }
>> +                }
>> +                Some(ref _s3_client) => {
>> +                    // Update atime on local cache marker files.
>> +                    if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
>> +                        let (chunk_path, _digest) = self.chunk_path(digest);
>> +                        // Insert empty file as marker to tell GC phase2 that this is
>> +                        // a chunk still in-use, so to keep in the S3 object store.
>> +                        std::fs::File::options()
>> +                            .write(true)
>> +                            .create_new(true)
>> +                            .open(&chunk_path)
>> +                            .with_context(|| {
>> +                                format!("failed to create marker for chunk {}", hex::encode(digest))
>> +                            })?;
>> +                    }
>>                   }
>>               }
>>           }
>> @@ -1251,6 +1272,7 @@ impl DataStore {
>>           status: &mut GarbageCollectionStatus,
>>           worker: &dyn WorkerTaskContext,
>>           cache_capacity: usize,
>> +        s3_client: Option<Arc<S3Client>>,
>>       ) -> Result<(), Error> {
>>           // Iterate twice over the datastore to fetch index files, even if this comes with an
>>           // additional runtime cost:
>> @@ -1344,6 +1366,7 @@ impl DataStore {
>>                                   &mut chunk_lru_cache,
>>                                   status,
>>                                   worker,
>> +                                s3_client.as_ref().cloned(),
>>                               )?;
>>   
>>                               if !unprocessed_index_list.remove(&path) {
>> @@ -1378,7 +1401,14 @@ impl DataStore {
>>                       continue;
>>                   }
>>               };
>> -            self.index_mark_used_chunks(index, &path, &mut chunk_lru_cache, status, worker)?;
>> +            self.index_mark_used_chunks(
>> +                index,
>> +                &path,
>> +                &mut chunk_lru_cache,
>> +                status,
>> +                worker,
>> +                s3_client.as_ref().cloned(),
>> +            )?;
>>               warn!("Marked chunks for unexpected index file at '{path:?}'");
>>           }
>>           if strange_paths_count > 0 {
>> @@ -1476,18 +1506,149 @@ impl DataStore {
>>                   1024 * 1024
>>               };
>>   
>> -            info!("Start GC phase1 (mark used chunks)");
>> +            let s3_client = match self.backend()? {
>> +                DatastoreBackend::Filesystem => None,
>> +                DatastoreBackend::S3(s3_client) => {
>> +                    proxmox_async::runtime::block_on(s3_client.head_bucket())
>> +                        .context("failed to reach bucket")?;
>> +                    Some(s3_client)
>> +                }
>> +            };
>>   
>> -            self.mark_used_chunks(&mut gc_status, worker, gc_cache_capacity)
>> -                .context("marking used chunks failed")?;
>> +            info!("Start GC phase1 (mark used chunks)");
>>   
>> -            info!("Start GC phase2 (sweep unused chunks)");
>> -            self.inner.chunk_store.sweep_unused_chunks(
>> -                oldest_writer,
>> -                min_atime,
>> +            self.mark_used_chunks(
>>                   &mut gc_status,
>>                   worker,
>> -            )?;
>> +                gc_cache_capacity,
>> +                s3_client.as_ref().cloned(),
>> +            )
>> +            .context("marking used chunks failed")?;
>> +
>> +            info!("Start GC phase2 (sweep unused chunks)");
>> +
>> +            if let Some(ref s3_client) = s3_client {
>> +                let mut chunk_count = 0;
>> +                let prefix = S3PathPrefix::Some(".chunks/".to_string());
>> +                // Operates in batches of 1000 objects max per request
>> +                let mut list_bucket_result =
>> +                    proxmox_async::runtime::block_on(s3_client.list_objects_v2(&prefix, None))
>> +                        .context("failed to list chunk in s3 object store")?;
>> +
>> +                let mut delete_list = Vec::with_capacity(1000);
>> +                loop {
>> +                    let lock = self.inner.chunk_store.mutex().lock().unwrap();
>> +
>> +                    for content in list_bucket_result.contents {
>> +                        // Check object is actually a chunk
>> +                        let digest = match Path::new::<str>(&content.key).file_name() {
>> +                            Some(file_name) => file_name,
>> +                            // should never be the case as objects will have a filename
>> +                            None => continue,
>> +                        };
>> +                        let bytes = digest.as_bytes();
>> +                        if bytes.len() != 64 && bytes.len() != 64 + ".0.bad".len() {
>> +                            continue;
>> +                        }
>> +                        if !bytes.iter().take(64).all(u8::is_ascii_hexdigit) {
>> +                            continue;
>> +                        }
>> +
>> +                        let bad = bytes.ends_with(b".bad");
>> +
>> +                        // Safe since contains valid ascii hexdigits only as checked above.
>> +                        let digest_str = digest.to_string_lossy();
>> +                        let hexdigit_prefix = unsafe { digest_str.get_unchecked(0..4) };
>> +                        let mut chunk_path = self.base_path();
>> +                        chunk_path.push(".chunks");
>> +                        chunk_path.push(hexdigit_prefix);
>> +                        chunk_path.push(digest);
>> +
>> +                        // Check local markers (created or atime updated during phase1) and
>> +                        // keep or delete chunk based on that.
>> +                        let atime = match std::fs::metadata(chunk_path) {
>> +                            Ok(stat) => stat.accessed()?,
>> +                            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
>> +                                // File not found, delete by setting atime to unix epoch
>> +                                info!("Not found, mark for deletion: {}", content.key);
>> +                                SystemTime::UNIX_EPOCH
>> +                            }
>> +                            Err(err) => return Err(err.into()),
>> +                        };
>> +                        let atime = atime.duration_since(SystemTime::UNIX_EPOCH)?.as_secs() as i64;
>> +
>> +                        chunk_count += 1;
>> +
>> +                        if atime < min_atime {
>> +                            delete_list.push(content.key);
>> +                            if bad {
>> +                                gc_status.removed_bad += 1;
>> +                            } else {
>> +                                gc_status.removed_chunks += 1;
>> +                            }
>> +                            gc_status.removed_bytes += content.size;
>> +                        } else if atime < oldest_writer {
>> +                            if bad {
>> +                                gc_status.still_bad += 1;
>> +                            } else {
>> +                                gc_status.pending_chunks += 1;
>> +                            }
>> +                            gc_status.pending_bytes += content.size;
>> +                        } else {
>> +                            if !bad {
>> +                                gc_status.disk_chunks += 1;
>> +                            }
>> +                            gc_status.disk_bytes += content.size;
>> +                        }
>> +                    }
>> +
>> +                    if !delete_list.is_empty() {
>> +                        let delete_objects_result = proxmox_async::runtime::block_on(
>> +                            s3_client.delete_objects(&delete_list),
>> +                        )?;
>> +                        if let Some(_err) = delete_objects_result.error {
>> +                            bail!("failed to delete some objects");
>> +                        }
>> +                        delete_list.clear();
>> +                    }
>> +
>> +                    drop(lock);
>> +
>> +                    // Process next batch of chunks if there is more
>> +                    if list_bucket_result.is_truncated {
>> +                        list_bucket_result =
>> +                            proxmox_async::runtime::block_on(s3_client.list_objects_v2(
>> +                                &prefix,
>> +                                list_bucket_result.next_continuation_token.as_deref(),
>> +                            ))?;
>> +                        continue;
>> +                    }
>> +
>> +                    break;
>> +                }
>> +                info!("processed {chunk_count} total chunks");
>> +
>> +                // Phase 2 GC of Filesystem backed storage is phase 3 for S3 backed GC
>> +                info!("Start GC phase3 (sweep unused chunk markers)");
>> +
>> +                let mut tmp_gc_status = GarbageCollectionStatus {
>> +                    upid: Some(upid.to_string()),
>> +                    ..Default::default()
>> +                };
>> +                self.inner.chunk_store.sweep_unused_chunks(
>> +                    oldest_writer,
>> +                    min_atime,
>> +                    &mut tmp_gc_status,
>> +                    worker,
>> +                )?;
>> +            } else {
>> +                self.inner.chunk_store.sweep_unused_chunks(
>> +                    oldest_writer,
>> +                    min_atime,
>> +                    &mut gc_status,
>> +                    worker,
>> +                )?;
>> +            }
> 
> I found this big chunk of new code quite hard to follow.
> 
> I guess everything between the `loop` start and the `if list_bucket_result.is_truncated` could
> maybe be separated out into some `process_objects` (todo: find better name) function. IMO
> the scope in which you hold the lock is also a good indicator for where to split.
> 
> Within this block, it might also make sense to split it further, e.g.
>    - check_if_chunk
>    - get_local_chunk_path
>    - get_local_chunk_atime
>    - ...
> 
> (there might be better ways to separate or name things, but you get the idea)

Okay, this took a while to do without breaking things, because of all the
interdependencies here and not all types being pub, but I managed to
restructure this quite a bit, although I opted for a slightly different
structure than suggested.
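For illustration, the split into small helpers suggested in the review could look roughly like the sketch below. This is a hypothetical decomposition, not the structure actually chosen for the next revision; the names `chunk_file_name`, `local_marker_path`, `local_marker_atime`, `classify` and `ChunkVerdict` are made up here. The thresholds mirror the quoted diff (a missing marker counts as an atime at the Unix epoch). Unlike the patch, the name check also validates the `.N.bad` suffix, and the path helper uses safe slicing instead of `get_unchecked`.

```rust
use std::io;
use std::path::{Path, PathBuf};
use std::time::UNIX_EPOCH;

/// What phase 2 should do with one listed chunk object (hypothetical).
#[derive(Debug, PartialEq, Eq)]
enum ChunkVerdict {
    Delete,
    Pending,
    InUse,
}

/// File name of the object if it looks like a chunk: 64 hex digits,
/// optionally followed by ".N.bad" (slightly stricter than the patch).
fn chunk_file_name(key: &str) -> Option<&str> {
    let name = Path::new(key).file_name()?.to_str()?;
    let bytes = name.as_bytes();
    if bytes.len() < 64 || !bytes[..64].iter().all(u8::is_ascii_hexdigit) {
        return None;
    }
    match &bytes[64..] {
        [] => Some(name),
        [b'.', d, b'.', b'b', b'a', b'd'] if d.is_ascii_digit() => Some(name),
        _ => None,
    }
}

/// <base>/.chunks/<first 4 hex digits>/<name>; slicing is safe because
/// chunk_file_name() guarantees an ASCII hex prefix.
fn local_marker_path(base: &Path, name: &str) -> PathBuf {
    base.join(".chunks").join(&name[..4]).join(name)
}

/// Marker atime in seconds since the epoch, or None if no marker exists.
fn local_marker_atime(path: &Path) -> io::Result<Option<i64>> {
    match std::fs::metadata(path) {
        Ok(meta) => {
            let since_epoch = meta
                .accessed()?
                .duration_since(UNIX_EPOCH)
                .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
            Ok(Some(since_epoch.as_secs() as i64))
        }
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err),
    }
}

/// Same decision order as the patch: delete, then pending, else in use.
fn classify(atime: Option<i64>, min_atime: i64, oldest_writer: i64) -> ChunkVerdict {
    let atime = atime.unwrap_or(0);
    if atime < min_atime {
        ChunkVerdict::Delete
    } else if atime < oldest_writer {
        ChunkVerdict::Pending
    } else {
        ChunkVerdict::InUse
    }
}
```

With these, the loop body reduces to: derive the name, build the marker path, read its atime, classify, and update the status counters accordingly, which keeps the locked region short and readable.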

> 
> 
>>   
>>               info!(
>>                   "Removed garbage: {}",
> 




