[pbs-devel] [RFC proxmox-backup 10/39] s3 client: implement methods to operate on s3 objects in bucket

Christian Ebner <c.ebner@proxmox.com>
Mon May 19 13:46:11 CEST 2025


Adds the basic implementation of the client to use S3 object stores
as a backend for PBS datastores.

This implements the basic client actions on a bucket and the objects
stored within that bucket.

This is not feature complete and is intended to be extended on demand
rather than implementing the whole client at once.
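
For illustration, a rough usage sketch (not part of this patch): the
`S3Client::new(options)` constructor name is assumed from an earlier
patch in this series, and the client, object key and payload passed in
are placeholders:

    use pbs_s3_client::{PutObjectResponse, S3Client, S3ObjectKey};

    async fn example(
        client: S3Client,
        object_key: S3ObjectKey,
        data: Vec<u8>,
    ) -> Result<(), anyhow::Error> {
        // Check that the bucket exists and is accessible.
        client.head_bucket().await?;

        // Upload; the client signs the request and attaches the crc32
        // checksum header (see `prepare()`).
        match client.put_object(object_key.clone(), hyper::Body::from(data)).await? {
            PutObjectResponse::Success(e_tag) => println!("uploaded, etag: {e_tag}"),
            PutObjectResponse::PreconditionFailed => println!("object already present"),
            PutObjectResponse::NeedsRetry => println!("conflicting upload, retry"),
        }

        // Fetch the object back; returns None if the key is not present.
        if let Some(response) = client.get_object(object_key).await? {
            println!("fetched object of {} bytes", response.content_length);
        }
        Ok(())
    }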

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 Cargo.toml                           |   2 +
 pbs-s3-client/Cargo.toml             |   8 +
 pbs-s3-client/src/client.rs          | 370 +++++++++++++++++++++++++++
 pbs-s3-client/src/lib.rs             |   2 +
 pbs-s3-client/src/response_reader.rs | 324 +++++++++++++++++++++++
 5 files changed, 706 insertions(+)
 create mode 100644 pbs-s3-client/src/response_reader.rs

diff --git a/Cargo.toml b/Cargo.toml
index 3f51b356c..229ba1692 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -140,11 +140,13 @@ once_cell = "1.3.1"
 openssl = "0.10.40"
 percent-encoding = "2.1"
 pin-project-lite = "0.2"
+quick-xml = "0.26"
 regex = "1.5.5"
 rustyline = "9"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 serde_plain = "1.0"
+serde-xml-rs = "0.5"
 siphasher = "0.3"
 syslog = "6"
 tar = "0.4"
diff --git a/pbs-s3-client/Cargo.toml b/pbs-s3-client/Cargo.toml
index 9ee546200..6fd0f7cca 100644
--- a/pbs-s3-client/Cargo.toml
+++ b/pbs-s3-client/Cargo.toml
@@ -8,11 +8,19 @@ rust-version.workspace = true
 
 [dependencies]
 anyhow.workspace = true
+base64.workspace = true
+bytes.workspace = true
+crc32fast.workspace = true
+futures.workspace = true
 hex = { workspace = true, features = [ "serde" ] }
 hyper.workspace = true
 openssl.workspace = true
+quick-xml = { workspace = true, features = ["async-tokio"] }
 serde.workspace = true
 serde_plain.workspace = true
+serde-xml-rs.workspace = true
+tokio = { workspace = true, features = [] }
+tokio-util = { workspace = true, features = ["compat"] }
 tracing.workspace = true
 url.workspace = true
 
diff --git a/pbs-s3-client/src/client.rs b/pbs-s3-client/src/client.rs
index e001cc7b0..972578e66 100644
--- a/pbs-s3-client/src/client.rs
+++ b/pbs-s3-client/src/client.rs
@@ -1,17 +1,36 @@
+use std::collections::HashMap;
+use std::io::Cursor;
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use anyhow::{bail, format_err, Context, Error};
+use bytes::{Bytes, BytesMut};
+use hyper::body::HttpBody;
 use hyper::client::{Client, HttpConnector};
+use hyper::http::method::Method;
 use hyper::http::uri::Authority;
+use hyper::http::StatusCode;
+use hyper::http::{header, HeaderValue, Uri};
 use hyper::Body;
+use hyper::{Request, Response};
 use openssl::hash::MessageDigest;
+use openssl::sha::Sha256;
 use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
 use openssl::x509::X509StoreContextRef;
+use quick_xml::events::BytesText;
+use quick_xml::writer::Writer;
 use tracing::error;
 
 use proxmox_http::client::HttpsConnector;
 
+use crate::aws_sign_v4::aws_sign_v4_signature;
+use crate::aws_sign_v4::AWS_SIGN_V4_DATETIME_FORMAT;
+use crate::object_key::S3ObjectKey;
+use crate::response_reader::{
+    CopyObjectResponse, DeleteObjectsResponse, GetObjectResponse, HeadObjectResponse,
+    ListObjectsV2Response, PutObjectResponse, ResponseReader,
+};
+
 const S3_HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
 const S3_TCP_KEEPALIVE_TIME: u32 = 120;
 
@@ -128,4 +147,355 @@ impl S3Client {
             "unexpected certificate fingerprint {certificate_fingerprint}"
         ))
     }
+
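+    /// Prepare a request for sending: buffer the body to compute its sha256 digest (for the
+    /// AWS signature v4) and its crc32 checksum, set the required headers and attach the
+    /// signature as authorization header.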
+    async fn prepare(&self, mut request: Request<Body>) -> Result<Request<Body>, Error> {
+        let host_header = request
+            .uri()
+            .authority()
+            .ok_or_else(|| format_err!("request missing authority"))?
+            .to_string();
+
+        // Calculate the crc32 sum of the whole body. Note that the DataBlob of a chunk store
+        // skips over the DataBlob header, so that must be taken into account when using this
+        // checksum to check for changes/integrity.
+        let mut crc32sum = crc32fast::Hasher::new();
+        // Content verification for aws s3 signature
+        let mut hasher = Sha256::new();
+        // Load the payload into memory; needed as the hash and checksum have to be calculated up front
+        let buffer: Bytes = {
+            let body = request.body_mut();
+            let mut buf = BytesMut::with_capacity(body.size_hint().lower() as usize);
+            while let Some(chunk) = body.data().await {
+                let chunk = chunk?;
+                hasher.update(&chunk);
+                crc32sum.update(&chunk);
+                buf.extend_from_slice(&chunk);
+            }
+            buf.freeze()
+        };
+        let payload_digest = hex::encode(hasher.finish());
+        let payload_crc32sum = base64::encode(crc32sum.finalize().to_be_bytes());
+        let payload_len = buffer.len();
+        *request.body_mut() = Body::from(buffer);
+
+        let epoch = proxmox_time::epoch_i64();
+        let datetime = proxmox_time::strftime_utc(AWS_SIGN_V4_DATETIME_FORMAT, epoch)?;
+
+        request
+            .headers_mut()
+            .insert("x-amz-date", HeaderValue::from_str(&datetime)?);
+        request
+            .headers_mut()
+            .insert("host", HeaderValue::from_str(&host_header)?);
+        request.headers_mut().insert(
+            "x-amz-content-sha256",
+            HeaderValue::from_str(&payload_digest)?,
+        );
+        if payload_len > 0 {
+            request.headers_mut().insert(
+                header::CONTENT_LENGTH,
+                HeaderValue::from_str(&payload_len.to_string())?,
+            );
+        }
+        if !payload_crc32sum.is_empty() {
+            request.headers_mut().insert(
+                "x-amz-checksum-crc32",
+                HeaderValue::from_str(&payload_crc32sum)?,
+            );
+        }
+
+        let signature = aws_sign_v4_signature(&request, &self.options, epoch, &payload_digest)?;
+
+        request
+            .headers_mut()
+            .insert(header::AUTHORIZATION, HeaderValue::from_str(&signature)?);
+
+        Ok(request)
+    }
+
+    pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
+        let request = self.prepare(request).await?;
+        let response = tokio::time::timeout(S3_HTTP_CONNECT_TIMEOUT, self.client.request(request))
+            .await
+            .context("request timeout")??;
+        Ok(response)
+    }
+
+    /// Check if the bucket exists and the client has permission to access it.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
+    pub async fn head_bucket(&self) -> Result<(), Error> {
+        let request = Request::builder()
+            .method(Method::HEAD)
+            .uri(self.uri_builder("/")?)
+            .body(Body::empty())?;
+        let response = self.send(request).await?;
+        let (parts, _body) = response.into_parts();
+
+        match parts.status {
+            StatusCode::OK => (),
+            StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND => {
+                bail!("bucket does not exist or no permission to access it")
+            }
+            status_code => bail!("unexpected status code {status_code}"),
+        }
+
+        Ok(())
+    }
+
+    /// Fetch metadata from an object without returning the object itself.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html
+    pub async fn head_object(
+        &self,
+        object_key: S3ObjectKey,
+    ) -> Result<Option<HeadObjectResponse>, Error> {
+        let request = Request::builder()
+            .method(Method::HEAD)
+            .uri(self.uri_builder(&object_key)?)
+            .body(Body::empty())?;
+        let response = self.send(request).await?;
+        let response_reader = ResponseReader::new(response);
+        response_reader.head_object_response().await
+    }
+
+    /// Fetch an object from the object store.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
+    pub async fn get_object(
+        &self,
+        object_key: S3ObjectKey,
+    ) -> Result<Option<GetObjectResponse>, Error> {
+        let request = Request::builder()
+            .method(Method::GET)
+            .uri(self.uri_builder(&object_key)?)
+            .body(Body::empty())?;
+        let response = self.send(request).await?;
+        let response_reader = ResponseReader::new(response);
+        response_reader.get_object_response().await
+    }
+
+    /// Returns some or all (up to 1,000) of the objects in a bucket with each request.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
+    pub async fn list_objects_v2(
+        &self,
+        prefix: Option<&str>,
+        max_keys: Option<u64>,
+        continuation_token: Option<&str>,
+    ) -> Result<ListObjectsV2Response, Error> {
+        let mut path_and_query = String::from("/?list-type=2");
+        if let Some(prefix) = prefix {
+            path_and_query.push_str("&prefix=");
+            path_and_query.push_str(prefix);
+        }
+        if let Some(max_keys) = max_keys {
+            path_and_query.push_str("&max-keys=");
+            path_and_query.push_str(&max_keys.to_string());
+        }
+        if let Some(token) = continuation_token {
+            path_and_query.push_str("&continuation-token=");
+            path_and_query.push_str(token);
+        }
+        let request = Request::builder()
+            .method(Method::GET)
+            .uri(self.uri_builder(&path_and_query)?)
+            .body(Body::empty())?;
+
+        let response = self.send(request).await?;
+        let response_reader = ResponseReader::new(response);
+        response_reader.list_objects_v2_response().await
+    }
+
+    /// Add a new object to a bucket.
+    ///
+    /// Intended to not re-upload if an object with a matching key already exists in the
+    /// bucket; note that the `If-None-Match` precondition enforcing this is currently
+    /// commented out below.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
+    pub async fn put_object(
+        &self,
+        object_key: S3ObjectKey,
+        object_data: Body,
+    ) -> Result<PutObjectResponse, Error> {
+        // Ensure data integrity after upload by providing a checksum header. This value
+        // can also be used to compare a local chunk's content against an object stored in S3.
+        // https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
+        let request = Request::builder()
+            .method(Method::PUT)
+            .uri(self.uri_builder(&object_key)?)
+            .header(header::CONTENT_TYPE, "application/octet-stream")
+            // Would prevent overwriting pre-existing objects with the same key, currently
+            // disabled:
+            //.header(header::IF_NONE_MATCH, "*")
+            .body(object_data)?;
+
+        let response = self.send(request).await?;
+        let response_reader = ResponseReader::new(response);
+        response_reader.put_object_response().await
+    }
+
+    /// Sets the supplied tag-set to an object that already exists in a bucket. A tag is a key-value pair.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html
+    pub async fn put_object_tagging(
+        &self,
+        object_key: S3ObjectKey,
+        tagset: &HashMap<String, String>,
+    ) -> Result<bool, Error> {
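+        // Builds an XML payload of the following shape, one <Tag> element per map entry:
+        //
+        //   <Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+        //     <TagSet><Tag><Key>...</Key><Value>...</Value></Tag></TagSet>
+        //   </Tagging>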
+        let mut writer = Writer::new(Cursor::new(Vec::new()));
+        writer
+            .create_element("Tagging")
+            .with_attribute(("xmlns", "http://s3.amazonaws.com/doc/2006-03-01/"))
+            .write_inner_content(|writer| {
+                writer
+                    .create_element("TagSet")
+                    .write_inner_content(|writer| {
+                        for (key, value) in tagset.iter() {
+                            writer.create_element("Tag").write_inner_content(|writer| {
+                                writer
+                                    .create_element("Key")
+                                    .write_text_content(BytesText::new(key))?;
+                                writer
+                                    .create_element("Value")
+                                    .write_text_content(BytesText::new(value))?;
+                                Ok(())
+                            })?;
+                        }
+                        Ok(())
+                    })?;
+                Ok(())
+            })?;
+
+        let body: Body = writer.into_inner().into_inner().into();
+        let request = Request::builder()
+            .method(Method::PUT)
+            .uri(self.uri_builder(&format!("{object_key}?tagging"))?)
+            .body(body)?;
+
+        let response = self.send(request).await?;
+        Ok(response.status().is_success())
+    }
+
+    /// Set the supplied tag on an object that already exists in a bucket. A tag is a key-value pair.
+    /// Optimized version of `put_object_tagging` that only sets a single tag.
+    pub async fn put_object_tag(
+        &self,
+        object_key: S3ObjectKey,
+        tag_key: &str,
+        tag_value: &str,
+    ) -> Result<bool, Error> {
+        let body: Body = format!(
+            r#"<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+                <TagSet>
+                    <Tag>
+                        <Key>{tag_key}</Key>
+                        <Value>{tag_value}</Value>
+                    </Tag>
+                </TagSet>
+            </Tagging>"#
+        )
+        .into();
+
+        let request = Request::builder()
+            .method(Method::PUT)
+            .uri(self.uri_builder(&format!("{object_key}?tagging"))?)
+            .body(body)?;
+
+        let response = self.send(request).await?;
+        //TODO: Response and error handling!
+        Ok(response.status().is_success())
+    }
+
+    /// Creates a copy of an object that is already stored in Amazon S3.
+    /// Sets the `x-amz-metadata-directive` header to `REPLACE`, so the metadata is replaced on copy.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
+    pub async fn copy_object(
+        &self,
+        destination_key: S3ObjectKey,
+        source_bucket: &str,
+        source_key: S3ObjectKey,
+    ) -> Result<CopyObjectResponse, Error> {
+        let copy_source = source_key.to_copy_source_key(source_bucket);
+        let request = Request::builder()
+            .method(Method::PUT)
+            .uri(self.uri_builder(&destination_key)?)
+            .header("x-amz-copy-source", HeaderValue::from_str(&copy_source)?)
+            .header(
+                "x-amz-metadata-directive",
+                HeaderValue::from_str("REPLACE")?,
+            )
+            .body(Body::empty())?;
+
+        let response = self.send(request).await?;
+        let response_reader = ResponseReader::new(response);
+        response_reader.copy_object_response().await
+    }
+
+    /// Helper to update the metadata for an object by copying it to itself. This will not cause
+    /// any additional costs other than the request cost itself.
+    ///
+    /// Note: This will actually create a new object for buckets with versioning enabled.
+    /// Returns an error in that case, detected by checking for the presence of the
+    /// `x-amz-version-id` header in the response.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
+    pub async fn update_object_metadata(
+        &self,
+        object_key: S3ObjectKey,
+    ) -> Result<CopyObjectResponse, Error> {
+        let response = self
+            .copy_object(object_key.clone(), &self.options.bucket, object_key)
+            .await?;
+        if response.x_amz_version_id.is_some() {
+            // Return an error if the response contains an `x-amz-version-id`, indicating that the
+            // bucket has versioning enabled, as that will bloat the bucket size and therefore cost.
+            bail!("Failed to update object metadata as versioning is enabled");
+        }
+        Ok(response)
+    }
+
+    /// Delete multiple objects from a bucket using a single HTTP request.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
+    pub async fn delete_objects(
+        &self,
+        object_keys: &[String],
+    ) -> Result<DeleteObjectsResponse, Error> {
+        let mut body = String::from(r#"<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"#);
+        for object_key in object_keys {
+            let object = format!("<Object><Key>{object_key}</Key></Object>");
+            body.push_str(&object);
+        }
+        body.push_str("</Delete>");
+        let request = Request::builder()
+            .method(Method::POST)
+            .uri(self.uri_builder("/?delete")?)
+            .body(Body::from(body))?;
+
+        let response = self.send(request).await?;
+        let response_reader = ResponseReader::new(response);
+        response_reader.delete_objects_response().await
+    }
+
+    /// Delete objects with the given key prefix.
+    /// Requires at least 2 API calls.
+    pub async fn delete_objects_by_prefix(
+        &self,
+        prefix: &str,
+    ) -> Result<DeleteObjectsResponse, Error> {
+        // S3 API does not provide a convenient way to delete objects by key prefix.
+        // List all objects with the given prefix and delete all objects found, so this
+        // requires at least 2 API calls.
+        // TODO: fix for more than 1000 response items given by api limit.
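+        // A complete fix could loop while `is_truncated` is set in the response, passing
+        // `next_continuation_token` as continuation token to the next `list_objects_v2`
+        // call and deleting each returned batch of keys.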
+        let list_objects_result = self.list_objects_v2(Some(prefix), None, None).await?;
+        let objects_to_delete: Vec<String> = list_objects_result
+            .contents
+            .into_iter()
+            .map(|item| item.key)
+            .collect();
+        self.delete_objects(&objects_to_delete).await
+    }
+
+    /// Helper to generate a [`Uri`] instance with common properties, based on the given
+    /// path and query string.
+    #[inline(always)]
+    fn uri_builder(&self, path_and_query: &str) -> Result<Uri, Error> {
+        Uri::builder()
+            .scheme("https")
+            .authority(self.authority.clone())
+            .path_and_query(path_and_query)
+            .build()
+            .context("failed to build uri")
+    }
 }
diff --git a/pbs-s3-client/src/lib.rs b/pbs-s3-client/src/lib.rs
index 00fa26455..7cc0ea841 100644
--- a/pbs-s3-client/src/lib.rs
+++ b/pbs-s3-client/src/lib.rs
@@ -3,6 +3,8 @@ mod client;
 pub use client::{S3Client, S3ClientOptions};
 mod object_key;
 pub use object_key::{S3ObjectKey, S3_CONTENT_PREFIX};
+mod response_reader;
+pub use response_reader::PutObjectResponse;
 
 use std::time::Duration;
 
diff --git a/pbs-s3-client/src/response_reader.rs b/pbs-s3-client/src/response_reader.rs
new file mode 100644
index 000000000..ed82d77b9
--- /dev/null
+++ b/pbs-s3-client/src/response_reader.rs
@@ -0,0 +1,324 @@
+use std::str::FromStr;
+
+use anyhow::{anyhow, bail, Context, Error};
+use hyper::body::HttpBody;
+use hyper::header::HeaderName;
+use hyper::http::header;
+use hyper::http::StatusCode;
+use hyper::{Body, HeaderMap, Response};
+use serde::Deserialize;
+
+use crate::{HttpDate, LastModifiedTimestamp};
+
+pub(crate) struct ResponseReader {
+    response: Response<Body>,
+}
+
+#[derive(Debug)]
+pub struct ListObjectsV2Response {
+    pub date: HttpDate,
+    pub name: String,
+    pub prefix: String,
+    pub key_count: u64,
+    pub max_keys: u64,
+    pub is_truncated: bool,
+    pub continuation_token: Option<String>,
+    pub next_continuation_token: Option<String>,
+    pub contents: Vec<ListObjectsV2Contents>,
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+struct ListObjectsV2ResponseBody {
+    pub name: String,
+    pub prefix: String,
+    pub key_count: u64,
+    pub max_keys: u64,
+    pub is_truncated: bool,
+    pub continuation_token: Option<String>,
+    pub next_continuation_token: Option<String>,
+    pub contents: Option<Vec<ListObjectsV2Contents>>,
+}
+
+impl ListObjectsV2ResponseBody {
+    fn with_date(self, date: HttpDate) -> ListObjectsV2Response {
+        ListObjectsV2Response {
+            date,
+            name: self.name,
+            prefix: self.prefix,
+            key_count: self.key_count,
+            max_keys: self.max_keys,
+            is_truncated: self.is_truncated,
+            continuation_token: self.continuation_token,
+            next_continuation_token: self.next_continuation_token,
+            contents: self.contents.unwrap_or_default(),
+        }
+    }
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListObjectsV2Contents {
+    pub key: String,
+    pub last_modified: LastModifiedTimestamp,
+    pub e_tag: String,
+    pub size: u64,
+    pub storage_class: String,
+}
+
+/// Subset of the head object response (headers only, there is no body).
+/// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
+#[derive(Debug)]
+pub struct HeadObjectResponse {
+    pub content_length: u64,
+    pub content_type: String,
+    pub date: HttpDate,
+    pub e_tag: String,
+    pub last_modified: HttpDate,
+}
+
+/// Subset of the get object response.
+/// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html#API_GetObject_ResponseSyntax
+#[derive(Debug)]
+pub struct GetObjectResponse {
+    pub content_length: u64,
+    pub content_type: String,
+    pub date: HttpDate,
+    pub e_tag: String,
+    pub last_modified: HttpDate,
+    pub content: Body,
+}
+
+#[derive(Debug)]
+pub struct CopyObjectResponse {
+    pub copy_object_result: CopyObjectResult,
+    pub x_amz_version_id: Option<String>,
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct CopyObjectResult {
+    pub e_tag: String,
+    pub last_modified: LastModifiedTimestamp,
+}
+
+/// Subset of the put object response.
+/// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_ResponseSyntax
+#[derive(Debug)]
+pub enum PutObjectResponse {
+    /// Got a 409 response, e.g. a conflicting concurrent write; the upload should be retried.
+    NeedsRetry,
+    /// Got a 412 response, the `If-None-Match` precondition failed: an object with this key
+    /// is already present.
+    PreconditionFailed,
+    /// Upload succeeded, contains the object's e-tag.
+    Success(String),
+}
+
+/// Subset of the delete objects response.
+/// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_ResponseElements
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeleteObjectsResponse {
+    pub deleted: Option<Vec<DeletedObject>>,
+    pub error: Option<Vec<DeleteObjectError>>,
+}
+
+/// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeletedObject.html
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeletedObject {
+    pub delete_marker: Option<bool>,
+    pub delete_marker_version_id: Option<String>,
+    pub key: Option<String>,
+    pub version_id: Option<String>,
+}
+
+/// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_Error.html
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeleteObjectError {
+    pub code: Option<String>,
+    pub key: Option<String>,
+    pub message: Option<String>,
+    pub version_id: Option<String>,
+}
+
+impl ResponseReader {
+    pub(crate) fn new(response: Response<Body>) -> Self {
+        Self { response }
+    }
+
+    pub(crate) async fn list_objects_v2_response(self) -> Result<ListObjectsV2Response, Error> {
+        let (parts, body) = self.response.into_parts();
+
+        match parts.status {
+            StatusCode::OK => (),
+            StatusCode::NOT_FOUND => bail!("bucket does not exist"),
+            status_code => bail!("unexpected status code {status_code}"),
+        }
+
+        let body = body.collect().await?.to_bytes();
+        let body = String::from_utf8(body.to_vec())?;
+
+        let date: HttpDate = Self::parse_header(header::DATE, &parts.headers)?;
+
+        let response: ListObjectsV2ResponseBody =
+            serde_xml_rs::from_str(&body).context("failed to parse response body")?;
+
+        Ok(response.with_date(date))
+    }
+
+    pub(crate) async fn head_object_response(self) -> Result<Option<HeadObjectResponse>, Error> {
+        let (parts, body) = self.response.into_parts();
+
+        match parts.status {
+            StatusCode::OK => (),
+            StatusCode::NOT_FOUND => return Ok(None),
+            status_code => bail!("unexpected status code {status_code}"),
+        }
+        let body = body.collect().await?.to_bytes();
+        if !body.is_empty() {
+            bail!("got unexpected non-empty response body");
+        }
+
+        let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?;
+        let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?;
+        let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+        let date = Self::parse_header(header::DATE, &parts.headers)?;
+        let last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?;
+
+        Ok(Some(HeadObjectResponse {
+            content_length,
+            content_type,
+            date,
+            e_tag,
+            last_modified,
+        }))
+    }
+
+    pub(crate) async fn get_object_response(self) -> Result<Option<GetObjectResponse>, Error> {
+        let (parts, content) = self.response.into_parts();
+
+        match parts.status {
+            StatusCode::OK => (),
+            StatusCode::NOT_FOUND => return Ok(None),
+            StatusCode::FORBIDDEN => bail!("object is archived and inaccessible until restored"),
+            status_code => bail!("unexpected status code {status_code}"),
+        }
+
+        let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?;
+        let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?;
+        let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+        let date = Self::parse_header(header::DATE, &parts.headers)?;
+        let last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?;
+
+        Ok(Some(GetObjectResponse {
+            content_length,
+            content_type,
+            date,
+            e_tag,
+            last_modified,
+            content,
+        }))
+    }
+
+    pub(crate) async fn copy_object_response(self) -> Result<CopyObjectResponse, Error> {
+        let (parts, content) = self.response.into_parts();
+
+        match parts.status {
+            StatusCode::OK => (),
+            StatusCode::NOT_FOUND => bail!("object not found"),
+            StatusCode::FORBIDDEN => bail!("the source object is not in the active tier"),
+            status_code => bail!("unexpected status code {status_code}"),
+        }
+
+        let body = content.collect().await?.to_bytes();
+        let body = String::from_utf8(body.to_vec())?;
+
+        let x_amz_version_id = match parts.headers.get("x-amz-version-id") {
+            Some(version_id) => Some(
+                version_id
+                    .to_str()
+                    .context("failed to parse version id header")?
+                    .to_owned(),
+            ),
+            None => None,
+        };
+
+        let copy_object_result: CopyObjectResult =
+            serde_xml_rs::from_str(&body).context("failed to parse response body")?;
+
+        Ok(CopyObjectResponse {
+            copy_object_result,
+            x_amz_version_id,
+        })
+    }
+
+    pub(crate) async fn put_object_response(self) -> Result<PutObjectResponse, Error> {
+        let (parts, body) = self.response.into_parts();
+
+        match parts.status {
+            StatusCode::OK => (),
+            // If-None-Match precondition failed, an object with the same key is already present.
+            // FIXME: Should this be dropped in favor of re-uploading and rely on the local
+            // cache to detect duplicates to increase data safety guarantees?
+            StatusCode::PRECONDITION_FAILED => return Ok(PutObjectResponse::PreconditionFailed),
+            StatusCode::CONFLICT => return Ok(PutObjectResponse::NeedsRetry),
+            StatusCode::BAD_REQUEST => bail!("invalid request: {body:?}"),
+            status_code => bail!("unexpected status code {status_code}"),
+        };
+
+        let body = body.collect().await?.to_bytes();
+        if !body.is_empty() {
+            bail!("got unexpected non-empty response body");
+        }
+
+        let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+
+        Ok(PutObjectResponse::Success(e_tag))
+    }
+
+    pub(crate) async fn delete_objects_response(self) -> Result<DeleteObjectsResponse, Error> {
+        let (parts, body) = self.response.into_parts();
+
+        match parts.status {
+            StatusCode::OK => (),
+            StatusCode::BAD_REQUEST => bail!("invalid request: {body:?}"),
+            status_code => bail!("unexpected status code {status_code}"),
+        };
+
+        let body = body.collect().await?.to_bytes();
+        let body = String::from_utf8(body.to_vec())?;
+
+        let delete_objects_response: DeleteObjectsResponse =
+            serde_xml_rs::from_str(&body).context("failed to parse response body")?;
+
+        Ok(delete_objects_response)
+    }
+
+    fn parse_header<T: FromStr>(name: HeaderName, headers: &HeaderMap) -> Result<T, Error>
+    where
+        <T as FromStr>::Err: Send + Sync + 'static,
+        Result<T, <T as FromStr>::Err>: Context<T, <T as FromStr>::Err>,
+    {
+        let header_value = headers
+            .get(&name)
+            .ok_or_else(|| anyhow!("missing header '{name}'"))?;
+        let header_str = header_value
+            .to_str()
+            .with_context(|| format!("non UTF-8 header '{name}'"))?;
+        let value = header_str
+            .parse()
+            .with_context(|| format!("failed to parse header '{name}'"))?;
+        Ok(value)
+    }
+
+    fn parse_x_amz_checksum_crc32_header(headers: &HeaderMap) -> Result<u32, Error> {
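+        // The header value is the base64-encoded big-endian crc32 of the payload, the
+        // counterpart to the `x-amz-checksum-crc32` header set by `prepare()` on upload.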
+        let x_amz_checksum_crc32 = headers
+            .get("x-amz-checksum-crc32")
+            .ok_or_else(|| anyhow!("missing header 'x-amz-checksum-crc32'"))?;
+        let x_amz_checksum_crc32 = base64::decode(x_amz_checksum_crc32.to_str()?)?;
+        let x_amz_checksum_crc32: [u8; 4] = x_amz_checksum_crc32
+            .try_into()
+            .map_err(|_e| anyhow!("failed to convert x-amz-checksum-crc32 header"))?;
+        let x_amz_checksum_crc32 = u32::from_be_bytes(x_amz_checksum_crc32);
+        Ok(x_amz_checksum_crc32)
+    }
+}
-- 
2.39.5