[pbs-devel] [RFC v2 proxmox-backup 12/42] s3 client: implement methods to operate on s3 objects in bucket
Christian Ebner
c.ebner at proxmox.com
Thu May 29 16:31:37 CEST 2025
Adds the basic implementation of the client to use S3 object stores
as backend for PBS datastores.
This implements the basic client actions on a bucket and on objects
stored within the given bucket.
This is not feature complete and is intended to be extended on demand
rather than implementing the whole client at once.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
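For reviewers, a rough usage sketch of the methods added below (hypothetical:
S3Client construction and S3ObjectKey creation are provided by other patches
of this series, and the key/data values are made up for illustration):

    async fn roundtrip(client: &S3Client, key: S3ObjectKey) -> Result<(), anyhow::Error> {
        // check that the bucket exists and that we may access it
        client.head_bucket().await?;
        // upload an object, then fetch its metadata without the body
        client.put_object(key.clone(), hyper::Body::from("hello")).await?;
        if let Some(head) = client.head_object(key.clone()).await? {
            println!("uploaded {} bytes", head.content_length);
        }
        // list objects below a prefix (first page only) and clean up
        let list = client.list_objects_v2(Some("prefix/"), None, None).await?;
        println!("found {} keys", list.key_count);
        client.delete_object(key).await?;
        Ok(())
    }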
Cargo.toml | 3 +
pbs-s3-client/Cargo.toml | 8 +
pbs-s3-client/src/client.rs | 463 +++++++++++++++++++++++++++
pbs-s3-client/src/lib.rs | 2 +
pbs-s3-client/src/response_reader.rs | 343 ++++++++++++++++++++
5 files changed, 819 insertions(+)
create mode 100644 pbs-s3-client/src/response_reader.rs
diff --git a/Cargo.toml b/Cargo.toml
index aaa79c2aa..1bc3bb88b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -137,15 +137,18 @@ log = "0.4.17"
nix = "0.26.1"
nom = "7"
num-traits = "0.2"
+md5 = "0.7.0"
once_cell = "1.3.1"
openssl = "0.10.40"
percent-encoding = "2.1"
pin-project-lite = "0.2"
+quick-xml = "0.26"
regex = "1.5.5"
rustyline = "9"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_plain = "1.0"
+serde-xml-rs = "0.5"
siphasher = "0.3"
syslog = "6"
tar = "0.4"
diff --git a/pbs-s3-client/Cargo.toml b/pbs-s3-client/Cargo.toml
index 3261a32bb..d20cd8f38 100644
--- a/pbs-s3-client/Cargo.toml
+++ b/pbs-s3-client/Cargo.toml
@@ -8,12 +8,20 @@ rust-version.workspace = true
[dependencies]
anyhow.workspace = true
+base64.workspace = true
+bytes.workspace = true
+futures.workspace = true
hex = { workspace = true, features = [ "serde" ] }
hyper.workspace = true
iso8601.workspace = true
+md5.workspace = true
openssl.workspace = true
+quick-xml = { workspace = true, features = ["async-tokio"] }
serde.workspace = true
serde_plain.workspace = true
+serde-xml-rs.workspace = true
+tokio = { workspace = true, features = [] }
+tokio-util = { workspace = true, features = ["compat"] }
tracing.workspace = true
url.workspace = true
diff --git a/pbs-s3-client/src/client.rs b/pbs-s3-client/src/client.rs
index e001cc7b0..b7ca4e298 100644
--- a/pbs-s3-client/src/client.rs
+++ b/pbs-s3-client/src/client.rs
@@ -1,17 +1,36 @@
+use std::collections::HashMap;
+use std::io::Cursor;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::{bail, format_err, Context, Error};
+use bytes::{Bytes, BytesMut};
+use hyper::body::HttpBody;
use hyper::client::{Client, HttpConnector};
+use hyper::http::method::Method;
use hyper::http::uri::Authority;
+use hyper::http::StatusCode;
+use hyper::http::{header, HeaderValue, Uri};
use hyper::Body;
+use hyper::{Request, Response};
use openssl::hash::MessageDigest;
+use openssl::sha::Sha256;
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use openssl::x509::X509StoreContextRef;
+use quick_xml::events::BytesText;
+use quick_xml::writer::Writer;
use tracing::error;
use proxmox_http::client::HttpsConnector;
+use crate::aws_sign_v4::aws_sign_v4_signature;
+use crate::aws_sign_v4::AWS_SIGN_V4_DATETIME_FORMAT;
+use crate::object_key::S3ObjectKey;
+use crate::response_reader::{
+ CopyObjectResponse, DeleteObjectsResponse, GetObjectResponse, HeadObjectResponse,
+ ListObjectsV2Response, PutObjectResponse, ResponseReader,
+};
+
const S3_HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const S3_TCP_KEEPALIVE_TIME: u32 = 120;
@@ -128,4 +147,448 @@ impl S3Client {
"unexpected certificate fingerprint {certificate_fingerprint}"
))
}
+
+ async fn prepare(&self, mut request: Request<Body>) -> Result<Request<Body>, Error> {
+ let host_header = request
+ .uri()
+ .authority()
+ .ok_or_else(|| format_err!("request missing authority"))?
+ .to_string();
+
+ // Content verification for the AWS S3 signature
+ let mut hasher = Sha256::new();
+ // Load the payload into memory, as the hash and checksum have to be calculated up front
+ let buffer: Bytes = {
+ let body = request.body_mut();
+ let mut buf = BytesMut::with_capacity(body.size_hint().lower() as usize);
+ while let Some(chunk) = body.data().await {
+ let chunk = chunk?;
+ hasher.update(&chunk);
+ buf.extend_from_slice(&chunk);
+ }
+ buf.freeze()
+ };
+ // Use MD5 as upload integrity check: other methods are not supported by all S3 object
+ // store providers and might be ignored, and MD5 is the method recommended by AWS, see
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_RequestSyntax
+ let payload_md5 = md5::compute(&buffer);
+ let payload_digest = hex::encode(hasher.finish());
+ let payload_len = buffer.len();
+ *request.body_mut() = Body::from(buffer);
+
+ let epoch = proxmox_time::epoch_i64();
+ let datetime = proxmox_time::strftime_utc(AWS_SIGN_V4_DATETIME_FORMAT, epoch)?;
+
+ request
+ .headers_mut()
+ .insert("x-amz-date", HeaderValue::from_str(&datetime)?);
+ request
+ .headers_mut()
+ .insert("host", HeaderValue::from_str(&host_header)?);
+ request.headers_mut().insert(
+ "x-amz-content-sha256",
+ HeaderValue::from_str(&payload_digest)?,
+ );
+ request.headers_mut().insert(
+ header::CONTENT_LENGTH,
+ HeaderValue::from_str(&payload_len.to_string())?,
+ );
+ if payload_len > 0 {
+ let md5_digest = base64::encode(*payload_md5);
+ request
+ .headers_mut()
+ .insert("Content-MD5", HeaderValue::from_str(&md5_digest)?);
+ }
+
+ let signature = aws_sign_v4_signature(&request, &self.options, epoch, &payload_digest)?;
+
+ request
+ .headers_mut()
+ .insert(header::AUTHORIZATION, HeaderValue::from_str(&signature)?);
+
+ Ok(request)
+ }
+
+ pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
+ let request = self.prepare(request).await?;
+ let response = tokio::time::timeout(S3_HTTP_CONNECT_TIMEOUT, self.client.request(request))
+ .await
+ .context("request timeout")??;
+ Ok(response)
+ }
+
+ /// Check if the bucket exists and whether we have permission to access it.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
+ pub async fn head_bucket(&self) -> Result<(), Error> {
+ let request = Request::builder()
+ .method(Method::HEAD)
+ .uri(self.uri_builder("/")?)
+ .body(Body::empty())?;
+ let response = self.send(request).await?;
+ let (parts, _body) = response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND => {
+ bail!("bucket does not exist or no permission to access it")
+ }
+ status_code => bail!("unexpected status code {status_code}"),
+ }
+
+ Ok(())
+ }
+
+ /// Fetch metadata from an object without returning the object itself.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html
+ pub async fn head_object(
+ &self,
+ object_key: S3ObjectKey,
+ ) -> Result<Option<HeadObjectResponse>, Error> {
+ let request = Request::builder()
+ .method(Method::HEAD)
+ .uri(self.uri_builder(&object_key)?)
+ .body(Body::empty())?;
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.head_object_response().await
+ }
+
+ /// Fetch an object from object store.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
+ pub async fn get_object(
+ &self,
+ object_key: S3ObjectKey,
+ ) -> Result<Option<GetObjectResponse>, Error> {
+ let request = Request::builder()
+ .method(Method::GET)
+ .uri(self.uri_builder(&object_key)?)
+ .body(Body::empty())?;
+
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.get_object_response().await
+ }
+
+ /// Returns some or all (up to 1,000) of the objects in a bucket with each request.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
+ pub async fn list_objects_v2(
+ &self,
+ prefix: Option<&str>,
+ max_keys: Option<u64>,
+ continuation_token: Option<&str>,
+ ) -> Result<ListObjectsV2Response, Error> {
+ let mut path_and_query = String::from("/?list-type=2");
+ if let Some(prefix) = prefix {
+ path_and_query.push_str("&prefix=");
+ path_and_query.push_str(prefix);
+ }
+ if let Some(max_keys) = max_keys {
+ path_and_query.push_str("&max-keys=");
+ path_and_query.push_str(&max_keys.to_string());
+ }
+ if let Some(token) = continuation_token {
+ path_and_query.push_str("&continuation-token=");
+ path_and_query.push_str(token);
+ }
+ let request = Request::builder()
+ .method(Method::GET)
+ .uri(self.uri_builder(&path_and_query)?)
+ .body(Body::empty())?;
+
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.list_objects_v2_response().await
+ }
+
+ /// Add a new object to a bucket.
+ ///
+ /// Do not reupload if an object with matching key already exists in the bucket.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
+ pub async fn put_object(
+ &self,
+ object_key: S3ObjectKey,
+ object_data: Body,
+ ) -> Result<PutObjectResponse, Error> {
+ let request = Request::builder()
+ .method(Method::PUT)
+ .uri(self.uri_builder(&object_key)?)
+ .header(header::CONTENT_TYPE, "binary/octet")
+ // Note: `If-None-Match` would guard against overwriting pre-existing objects with
+ // the same key, but is currently disabled:
+ //.header(header::IF_NONE_MATCH, "*")
+ .body(object_data)?;
+
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.put_object_response().await
+ }
+
+ /// Sets the supplied tag-set on an object that already exists in a bucket. A tag is a key-value pair.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html
+ pub async fn put_object_tagging(
+ &self,
+ object_key: S3ObjectKey,
+ tagset: &HashMap<String, String>,
+ ) -> Result<bool, Error> {
+ let mut writer = Writer::new(Cursor::new(Vec::new()));
+ writer
+ .create_element("Tagging")
+ .with_attribute(("xmlns", "http://s3.amazonaws.com/doc/2006-03-01/"))
+ .write_inner_content(|writer| {
+ writer
+ .create_element("TagSet")
+ .write_inner_content(|writer| {
+ for (key, value) in tagset.iter() {
+ writer.create_element("Tag").write_inner_content(|writer| {
+ writer
+ .create_element("Key")
+ .write_text_content(BytesText::new(key))?;
+ writer
+ .create_element("Value")
+ .write_text_content(BytesText::new(value))?;
+ Ok(())
+ })?;
+ }
+ Ok(())
+ })?;
+ Ok(())
+ })?;
+
+ let body: Body = writer.into_inner().into_inner().into();
+ let request = Request::builder()
+ .method(Method::PUT)
+ .uri(self.uri_builder(&format!("{object_key}?tagging"))?)
+ .body(body)?;
+
+ let response = self.send(request).await?;
+ Ok(response.status().is_success())
+ }
+
+ /// Sets the supplied tag on an object that already exists in a bucket. A tag is a key-value pair.
+ /// Optimized version of `put_object_tagging` for setting a single tag.
+ pub async fn put_object_tag(
+ &self,
+ object_key: S3ObjectKey,
+ tag_key: &str,
+ tag_value: &str,
+ ) -> Result<bool, Error> {
+ let body: Body = format!(
+ r#"<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <TagSet>
+ <Tag>
+ <Key>{tag_key}</Key>
+ <Value>{tag_value}</Value>
+ </Tag>
+ </TagSet>
+ </Tagging>"#
+ )
+ .into();
+
+ let request = Request::builder()
+ .method(Method::PUT)
+ .uri(self.uri_builder(&format!("{object_key}?tagging"))?)
+ .body(body)?;
+
+ let response = self.send(request).await?;
+ //TODO: Response and error handling!
+ Ok(response.status().is_success())
+ }
+
+ /// Creates a copy of an object that is already stored in Amazon S3.
+ /// Uses the `x-amz-metadata-directive` set to `REPLACE`, therefore resulting in updated metadata.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
+ pub async fn copy_object(
+ &self,
+ destination_key: S3ObjectKey,
+ source_bucket: &str,
+ source_key: S3ObjectKey,
+ ) -> Result<CopyObjectResponse, Error> {
+ let copy_source = source_key.to_copy_source_key(source_bucket);
+ let request = Request::builder()
+ .method(Method::PUT)
+ .uri(self.uri_builder(&destination_key)?)
+ .header("x-amz-copy-source", HeaderValue::from_str(©_source)?)
+ .header(
+ "x-amz-metadata-directive",
+ HeaderValue::from_str("REPLACE")?,
+ )
+ .body(Body::empty())?;
+
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.copy_object_response().await
+ }
+
+ /// Helper to update the metadata for an object by copying it to itself. This will not cause
+ /// any additional costs other than the request cost itself.
+ ///
+ /// Note: This will actually create a new object for buckets with versioning enabled.
+ /// Returns an error if that is the case, detected by checking for the presence of the
+ /// `x-amz-version-id` header in the response.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
+ pub async fn update_object_metadata(
+ &self,
+ object_key: S3ObjectKey,
+ ) -> Result<CopyObjectResponse, Error> {
+ let response = self
+ .copy_object(object_key.clone(), &self.options.bucket, object_key)
+ .await?;
+ if response.x_amz_version_id.is_some() {
+ // Return an error if the response contains an `x-amz-version-id`, indicating that the
+ // bucket has versioning enabled, as that will bloat the bucket size and therefore cost.
+ bail!("Failed to update object metadata as versioning is enabled");
+ }
+ Ok(response)
+ }
+
+ /// Removes an object from a bucket.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
+ pub async fn delete_object(&self, object_key: S3ObjectKey) -> Result<(), Error> {
+ let request = Request::builder()
+ .method(Method::DELETE)
+ .uri(self.uri_builder(&object_key)?)
+ .body(Body::empty())?;
+
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.delete_object_response().await
+ }
+
+ /// Delete multiple objects from a bucket using a single HTTP request.
+ /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
+ pub async fn delete_objects(
+ &self,
+ object_keys: &[String],
+ ) -> Result<DeleteObjectsResponse, Error> {
+ let mut body = String::from(r#"<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"#);
+ for object_key in object_keys {
+ let object = format!("<Object><Key>{object_key}</Key></Object>");
+ body.push_str(&object);
+ }
+ body.push_str("</Delete>");
+ let request = Request::builder()
+ .method(Method::POST)
+ .uri(self.uri_builder("/?delete")?)
+ .body(Body::from(body))?;
+
+ let response = self.send(request).await?;
+ let response_reader = ResponseReader::new(response);
+ response_reader.delete_objects_response().await
+ }
+
+ /// Delete objects with the given key prefix.
+ /// Returns true if any delete request reported errors. Requires at least 2 API calls.
+ pub async fn delete_objects_by_prefix(&self, prefix: &str) -> Result<bool, Error> {
+ // The S3 API does not provide a convenient way to delete objects by key prefix:
+ // list all objects with the given prefix and delete all objects found, which
+ // requires at least 2 API calls.
+ let mut next_continuation_token: Option<String> = None;
+ let mut delete_errors = false;
+ loop {
+ let list_objects_result = self
+ .list_objects_v2(Some(prefix), None, next_continuation_token.as_deref())
+ .await?;
+ let objects_to_delete: Vec<String> = list_objects_result
+ .contents
+ .into_iter()
+ .map(|item| item.key)
+ .collect();
+ // Avoid sending an empty (and therefore malformed) delete request
+ if !objects_to_delete.is_empty() {
+ let response = self.delete_objects(&objects_to_delete).await?;
+ if response.error.is_some() {
+ delete_errors = true;
+ }
+ }
+
+ if list_objects_result.is_truncated {
+ next_continuation_token = list_objects_result.next_continuation_token;
+ continue;
+ }
+ break;
+ }
+ Ok(delete_errors)
+ }
+
+ /// Delete objects with the given key prefix, but pre-filter out items based on a suffix
+ /// match (the filter covers the parent component of the matched suffix). E.g. do not
+ /// remove items in a snapshot directory by matching on the protected file marker (given
+ /// as suffix).
+ ///
+ /// Returns true if any delete request reported errors. Requires at least 2 API calls.
+ pub async fn delete_objects_by_prefix_with_suffix_filter(
+ &self,
+ prefix: &str,
+ suffix: &str,
+ ) -> Result<bool, Error> {
+ // The S3 API does not provide a convenient way to delete objects by key prefix:
+ // list all objects with the given prefix and delete all objects found, which
+ // requires at least 2 API calls.
+ let mut next_continuation_token: Option<String> = None;
+ let mut delete_errors = false;
+ let mut prefix_filters = Vec::new();
+ let mut list_objects = Vec::new();
+ loop {
+ let list_objects_result = self
+ .list_objects_v2(Some(prefix), None, next_continuation_token.as_deref())
+ .await?;
+ let mut prefixes: Vec<String> = list_objects_result
+ .contents
+ .iter()
+ .filter_map(|item| {
+ let prefix_filter = item
+ .key
+ .strip_suffix(suffix)
+ .map(|prefix| prefix.to_string());
+ if prefix_filter.is_none() {
+ list_objects.push(item.key.clone());
+ }
+ prefix_filter
+ })
+ .collect();
+ prefix_filters.append(&mut prefixes);
+
+ if list_objects_result.is_truncated {
+ next_continuation_token = list_objects_result.next_continuation_token;
+ continue;
+ }
+ break;
+ }
+
+ // Re-filter, since the 1000-items-per-request boundary may have prevented some items
+ // from being matched against their prefix filter within a single iteration above
+ let objects_to_delete: Vec<String> = list_objects
+ .into_iter()
+ .filter_map(|item| {
+ for prefix in &prefix_filters {
+ if item.strip_prefix(prefix).is_some() {
+ return None;
+ }
+ }
+ Some(item)
+ })
+ .collect();
+
+ for objects in objects_to_delete.chunks(1000) {
+ let result = self.delete_objects(objects).await?;
+ if result.error.is_some() {
+ delete_errors = true;
+ }
+ }
+
+ Ok(delete_errors)
+ }
+
+ /// Helper to generate a [`Uri`] instance with common properties, based on the given
+ /// path and query string.
+ #[inline(always)]
+ fn uri_builder(&self, path_and_query: &str) -> Result<Uri, Error> {
+ Uri::builder()
+ .scheme("https")
+ .authority(self.authority.clone())
+ .path_and_query(path_and_query)
+ .build()
+ .context("failed to build uri")
+ }
}
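As a side note for reviewers on the Content-MD5 handling in prepare() above, a
minimal sketch of how the header value is derived (same md5 and base64 crate
APIs as used by this patch):

    // The 16-byte MD5 digest of the payload is base64-encoded (not hex
    // encoded), which is the form the Content-MD5 header expects.
    fn content_md5(payload: &[u8]) -> String {
        let digest = md5::compute(payload);
        base64::encode(*digest)
    }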
diff --git a/pbs-s3-client/src/lib.rs b/pbs-s3-client/src/lib.rs
index b3e539bdd..b4e7eb497 100644
--- a/pbs-s3-client/src/lib.rs
+++ b/pbs-s3-client/src/lib.rs
@@ -3,6 +3,8 @@ mod client;
pub use client::{S3Client, S3ClientOptions};
mod object_key;
pub use object_key::{S3ObjectKey, S3_CONTENT_PREFIX};
+mod response_reader;
+pub use response_reader::PutObjectResponse;
use std::time::Duration;
diff --git a/pbs-s3-client/src/response_reader.rs b/pbs-s3-client/src/response_reader.rs
new file mode 100644
index 000000000..8c553ce8f
--- /dev/null
+++ b/pbs-s3-client/src/response_reader.rs
@@ -0,0 +1,343 @@
+use std::str::FromStr;
+
+use anyhow::{anyhow, bail, Context, Error};
+use hyper::body::HttpBody;
+use hyper::header::HeaderName;
+use hyper::http::header;
+use hyper::http::StatusCode;
+use hyper::{Body, HeaderMap, Response};
+use serde::Deserialize;
+
+use crate::{HttpDate, LastModifiedTimestamp};
+
+pub(crate) struct ResponseReader {
+ response: Response<Body>,
+}
+
+#[derive(Debug)]
+pub struct ListObjectsV2Response {
+ pub date: HttpDate,
+ pub name: String,
+ pub prefix: String,
+ pub key_count: u64,
+ pub max_keys: u64,
+ pub is_truncated: bool,
+ pub continuation_token: Option<String>,
+ pub next_continuation_token: Option<String>,
+ pub contents: Vec<ListObjectsV2Contents>,
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+struct ListObjectsV2ResponseBody {
+ pub name: String,
+ pub prefix: String,
+ pub key_count: u64,
+ pub max_keys: u64,
+ pub is_truncated: bool,
+ pub continuation_token: Option<String>,
+ pub next_continuation_token: Option<String>,
+ pub contents: Option<Vec<ListObjectsV2Contents>>,
+}
+
+impl ListObjectsV2ResponseBody {
+ fn with_date(self, date: HttpDate) -> ListObjectsV2Response {
+ ListObjectsV2Response {
+ date,
+ name: self.name,
+ prefix: self.prefix,
+ key_count: self.key_count,
+ max_keys: self.max_keys,
+ is_truncated: self.is_truncated,
+ continuation_token: self.continuation_token,
+ next_continuation_token: self.next_continuation_token,
+ contents: self.contents.unwrap_or_default(),
+ }
+ }
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListObjectsV2Contents {
+ pub key: String,
+ pub last_modified: LastModifiedTimestamp,
+ pub e_tag: String,
+ pub size: u64,
+ pub storage_class: String,
+}
+
+#[derive(Debug)]
+/// Subset of the head object response (headers only, there is no body)
+/// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
+pub struct HeadObjectResponse {
+ pub content_length: u64,
+ pub content_type: String,
+ pub date: HttpDate,
+ pub e_tag: String,
+ pub last_modified: HttpDate,
+}
+
+#[derive(Debug)]
+/// Subset of the get object response
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html#API_GetObject_ResponseSyntax
+pub struct GetObjectResponse {
+ pub content_length: u64,
+ pub content_type: String,
+ pub date: HttpDate,
+ pub e_tag: String,
+ pub last_modified: HttpDate,
+ pub content: Body,
+}
+
+#[derive(Debug)]
+pub struct CopyObjectResponse {
+ pub copy_object_result: CopyObjectResult,
+ pub x_amz_version_id: Option<String>,
+}
+
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct CopyObjectResult {
+ pub e_tag: String,
+ pub last_modified: LastModifiedTimestamp,
+}
+
+/// Subset of the put object response
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_ResponseSyntax
+#[derive(Debug)]
+pub enum PutObjectResponse {
+ NeedsRetry,
+ PreconditionFailed,
+ Success(String),
+}
+
+/// Subset of the delete objects response
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_ResponseElements
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeleteObjectsResponse {
+ pub deleted: Option<Vec<DeletedObject>>,
+ pub error: Option<Vec<DeleteObjectError>>,
+}
+
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeletedObject.html
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeletedObject {
+ pub delete_marker: Option<bool>,
+ pub delete_marker_version_id: Option<String>,
+ pub key: Option<String>,
+ pub version_id: Option<String>,
+}
+
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_Error.html
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeleteObjectError {
+ pub code: Option<String>,
+ pub key: Option<String>,
+ pub message: Option<String>,
+ pub version_id: Option<String>,
+}
+
+impl ResponseReader {
+ pub(crate) fn new(response: Response<Body>) -> Self {
+ Self { response }
+ }
+
+ pub(crate) async fn list_objects_v2_response(self) -> Result<ListObjectsV2Response, Error> {
+ let (parts, body) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::NOT_FOUND => bail!("bucket does not exist"),
+ status_code => bail!("unexpected status code {status_code}"),
+ }
+
+ let body = body.collect().await?.to_bytes();
+ let body = String::from_utf8(body.to_vec())?;
+
+ let date: HttpDate = Self::parse_header(header::DATE, &parts.headers)?;
+
+ let response: ListObjectsV2ResponseBody =
+ serde_xml_rs::from_str(&body).context("failed to parse response body")?;
+
+ Ok(response.with_date(date))
+ }
+
+ pub(crate) async fn head_object_response(self) -> Result<Option<HeadObjectResponse>, Error> {
+ let (parts, body) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::NOT_FOUND => return Ok(None),
+ status_code => bail!("unexpected status code {status_code}"),
+ }
+ let body = body.collect().await?.to_bytes();
+ if !body.is_empty() {
+ bail!("got unexpected non-empty response body");
+ }
+ println!("Headers {:?}", parts.headers);
+
+ let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?;
+ let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?;
+ let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+ let date = Self::parse_header(header::DATE, &parts.headers)?;
+ let last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?;
+
+ Ok(Some(HeadObjectResponse {
+ content_length,
+ content_type,
+ date,
+ e_tag,
+ last_modified,
+ }))
+ }
+
+ pub(crate) async fn get_object_response(self) -> Result<Option<GetObjectResponse>, Error> {
+ let (parts, content) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::NOT_FOUND => return Ok(None),
+ StatusCode::FORBIDDEN => bail!("object is archived and inaccessible until restored"),
+ status_code => bail!("unexpected status code {status_code}"),
+ }
+
+ let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?;
+ let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?;
+ let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+ let date = Self::parse_header(header::DATE, &parts.headers)?;
+ let last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?;
+
+ Ok(Some(GetObjectResponse {
+ content_length,
+ content_type,
+ date,
+ e_tag,
+ last_modified,
+ content,
+ }))
+ }
+
+ pub(crate) async fn copy_object_response(self) -> Result<CopyObjectResponse, Error> {
+ let (parts, content) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::NOT_FOUND => bail!("object not found"),
+ StatusCode::FORBIDDEN => bail!("the source object is not in the active tier"),
+ status_code => bail!("unexpected status code {status_code}"),
+ }
+
+ let body = content.collect().await?.to_bytes();
+ let body = String::from_utf8(body.to_vec())?;
+
+ let x_amz_version_id = match parts.headers.get("x-amz-version-id") {
+ Some(version_id) => Some(
+ version_id
+ .to_str()
+ .context("failed to parse version id header")?
+ .to_owned(),
+ ),
+ None => None,
+ };
+
+ let copy_object_result: CopyObjectResult =
+ serde_xml_rs::from_str(&body).context("failed to parse response body")?;
+
+ Ok(CopyObjectResponse {
+ copy_object_result,
+ x_amz_version_id,
+ })
+ }
+
+ pub(crate) async fn put_object_response(self) -> Result<PutObjectResponse, Error> {
+ let (parts, body) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ // If-None-Match precondition failed, an object with the same key is already present.
+ // FIXME: Should this be dropped in favor of re-uploading, relying on the local
+ // cache to detect duplicates, to increase data safety guarantees?
+ StatusCode::PRECONDITION_FAILED => return Ok(PutObjectResponse::PreconditionFailed),
+ StatusCode::CONFLICT => return Ok(PutObjectResponse::NeedsRetry),
+ StatusCode::BAD_REQUEST => bail!("invalid request: {body:?}"),
+ status_code => bail!("unexpected status code {status_code}"),
+ };
+
+ let body = body.collect().await?.to_bytes();
+ if !body.is_empty() {
+ bail!("got unexpected non-empty response body");
+ }
+
+ let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+
+ Ok(PutObjectResponse::Success(e_tag))
+ }
+
+ pub(crate) async fn delete_object_response(self) -> Result<(), Error> {
+ let (parts, _body) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::NO_CONTENT => (),
+ status_code => bail!("unexpected status code {status_code}"),
+ };
+
+ Ok(())
+ }
+
+ pub(crate) async fn delete_objects_response(self) -> Result<DeleteObjectsResponse, Error> {
+ let (parts, body) = self.response.into_parts();
+
+ match parts.status {
+ StatusCode::OK => (),
+ StatusCode::BAD_REQUEST => bail!("invalid request: {body:?}"),
+ status_code => bail!("unexpected status code {status_code}"),
+ };
+
+ let body = body.collect().await?.to_bytes();
+ let body = String::from_utf8(body.to_vec())?;
+
+ let delete_objects_response: DeleteObjectsResponse =
+ serde_xml_rs::from_str(&body).context("failed to parse response body")?;
+
+ Ok(delete_objects_response)
+ }
+
+ fn parse_header<T: FromStr>(name: HeaderName, headers: &HeaderMap) -> Result<T, Error>
+ where
+ <T as FromStr>::Err: Send + Sync + 'static,
+ Result<T, <T as FromStr>::Err>: Context<T, <T as FromStr>::Err>,
+ {
+ let header_value = headers
+ .get(&name)
+ .ok_or_else(|| anyhow!("missing header '{name}'"))?;
+ let header_str = header_value
+ .to_str()
+ .with_context(|| format!("non UTF-8 header '{name}'"))?;
+ let value = header_str
+ .parse()
+ .with_context(|| format!("failed to parse header '{name}'"))?;
+ Ok(value)
+ }
+
+ // TODO: Integrity checks via CRC32 or SHA256 currently cannot be performed, since they
+ // are not supported by all S3 object store providers.
+ // See also:
+ // https://tracker.ceph.com/issues/63951
+ // https://tracker.ceph.com/issues/69105
+ // https://www.backblaze.com/docs/cloud-storage-s3-compatible-api
+ fn parse_x_amz_checksum_crc32_header(headers: &HeaderMap) -> Result<Option<u32>, Error> {
+ let x_amz_checksum_crc32 = match headers.get("x-amz-checksum-crc32") {
+ Some(x_amz_checksum_crc32) => x_amz_checksum_crc32,
+ None => return Ok(None),
+ };
+ let x_amz_checksum_crc32 = base64::decode(x_amz_checksum_crc32.to_str()?)?;
+ let x_amz_checksum_crc32: [u8; 4] = x_amz_checksum_crc32
+ .try_into()
+ .map_err(|_e| anyhow!("failed to convert x-amz-checksum-crc32 header"))?;
+ let x_amz_checksum_crc32 = u32::from_be_bytes(x_amz_checksum_crc32);
+ Ok(Some(x_amz_checksum_crc32))
+ }
+}
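As a small illustration of the PascalCase serde mapping used above, a sketch of
deserializing a DeleteObjects response body (namespace attributes omitted for
brevity; assumes serde_xml_rs handles repeated elements the way the code above
relies on):

    let xml = r#"<DeleteResult>
        <Deleted><Key>foo/bar.blob</Key></Deleted>
    </DeleteResult>"#;
    let parsed: DeleteObjectsResponse = serde_xml_rs::from_str(xml)?;
    assert!(parsed.deleted.is_some());
    assert!(parsed.error.is_none());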
--
2.39.5