[pbs-devel] [PATCH proxmox-backup v4 10/45] s3 client: implement methods to operate on s3 objects in bucket
Christian Ebner
c.ebner at proxmox.com
Mon Jun 23 11:40:31 CEST 2025
Adds the basic implementation of the client to use s3 object stores
as backend for PBS datastores.
This implements the basic client actions on a bucket and on the objects
stored within the given bucket.
This is not feature complete and is intended to be extended on
demand rather than by implementing the whole client at once.
Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
Cargo.toml | 3 +
pbs-s3-client/Cargo.toml | 7 +
pbs-s3-client/src/client.rs | 379 ++++++++++++++++++++++++++-
pbs-s3-client/src/lib.rs | 4 +-
pbs-s3-client/src/response_reader.rs | 279 ++++++++++++++++++++
5 files changed, 670 insertions(+), 2 deletions(-)
create mode 100644 pbs-s3-client/src/response_reader.rs
diff --git a/Cargo.toml b/Cargo.toml
index 6c617989b..80635b2b8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -139,15 +139,18 @@ log = "0.4.17"
nix = "0.29"
nom = "7"
num-traits = "0.2"
+md5 = "0.7.0"
once_cell = "1.3.1"
openssl = "0.10.40"
percent-encoding = "2.1"
pin-project-lite = "0.2"
+quick-xml = "0.36.1"
regex = "1.5.5"
rustyline = "14"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_plain = "1.0"
+serde-xml-rs = "0.5"
siphasher = "0.3"
syslog = "6"
tar = "0.4"
diff --git a/pbs-s3-client/Cargo.toml b/pbs-s3-client/Cargo.toml
index d41cd8870..2c30730a2 100644
--- a/pbs-s3-client/Cargo.toml
+++ b/pbs-s3-client/Cargo.toml
@@ -8,14 +8,21 @@ rust-version.workspace = true
[dependencies]
anyhow.workspace = true
+bytes.workspace = true
+futures.workspace = true
hex = { workspace = true, features = [ "serde" ] }
http-body-util.workspace = true
hyper-util = { workspace = true, features = ["client-legacy", "tokio", "http1"] }
hyper.workspace = true
iso8601.workspace = true
+md5.workspace = true
openssl.workspace = true
+quick-xml = { workspace = true, features = ["async-tokio"] }
serde.workspace = true
serde_plain.workspace = true
+serde-xml-rs.workspace = true
+tokio = { workspace = true, features = [] }
+tokio-util = { workspace = true, features = ["compat"] }
tracing.workspace = true
url.workspace = true
diff --git a/pbs-s3-client/src/client.rs b/pbs-s3-client/src/client.rs
index 82cbe944b..198ef710b 100644
--- a/pbs-s3-client/src/client.rs
+++ b/pbs-s3-client/src/client.rs
@@ -1,12 +1,19 @@
+use std::path::Path;
+use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::{bail, format_err, Context, Error};
-use hyper::http::uri::Authority;
+use hyper::body::Incoming;
+use hyper::http::method::Method;
+use hyper::http::uri::{Authority, Parts, PathAndQuery, Scheme};
+use hyper::http::{header, HeaderValue, StatusCode, Uri};
+use hyper::{Request, Response};
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use openssl::hash::MessageDigest;
+use openssl::sha::Sha256;
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use openssl::x509::X509StoreContextRef;
use tracing::error;
@@ -15,6 +22,14 @@ use pbs_api_types::{S3ClientConfig, S3ClientSecretsConfig};
use proxmox_http::client::HttpsConnector;
use proxmox_http::Body;
+use crate::aws_sign_v4::aws_sign_v4_signature;
+use crate::aws_sign_v4::AWS_SIGN_V4_DATETIME_FORMAT;
+use crate::object_key::{RelS3ObjectKey, S3ObjectKey};
+use crate::response_reader::{
+ DeleteObjectsResponse, GetObjectResponse, HeadObjectResponse, ListObjectsV2Response,
+ PutObjectResponse, ResponseReader,
+};
+
const S3_HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const S3_TCP_KEEPALIVE_TIME: u32 = 120;
@@ -169,4 +184,366 @@ impl S3Client {
"unexpected certificate fingerprint {certificate_fingerprint}"
))
}
+
+    /// Finalize a request for sending by adding the headers used for AWS signature v4.
+    ///
+    /// Sets `x-amz-date`, `host`, `x-amz-content-sha256`, `Content-Length` and, for non-empty
+    /// payloads, `Content-MD5`. Afterwards the request is signed and the signature is stored in
+    /// the `Authorization` header.
+    ///
+    /// Fails if the request URI has no authority or if the body contents are not available as
+    /// bytes (streaming body).
+    async fn prepare(&self, mut request: Request<Body>) -> Result<Request<Body>, Error> {
+        let authority = match request.uri().authority() {
+            Some(authority) => authority.to_string(),
+            None => return Err(format_err!("request missing authority")),
+        };
+
+        let payload = match request.body().as_bytes() {
+            Some(payload) => payload,
+            None => return Err(format_err!("cannot prepare request with streaming body")),
+        };
+
+        // Content verification for aws s3 signature
+        let mut sha256 = Sha256::new();
+        sha256.update(payload);
+        let sha256_hex = hex::encode(sha256.finish());
+        // Use MD5 as upload integrity check, as other methods are not supported by all S3 object
+        // store providers and might be ignored and this is recommended by AWS as described in
+        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_RequestSyntax
+        let md5_sum = md5::compute(payload);
+        let content_length = payload.len();
+
+        let epoch = proxmox_time::epoch_i64();
+        let datetime = proxmox_time::strftime_utc(AWS_SIGN_V4_DATETIME_FORMAT, epoch)?;
+
+        let headers = request.headers_mut();
+        headers.insert("x-amz-date", HeaderValue::from_str(&datetime)?);
+        headers.insert("host", HeaderValue::from_str(&authority)?);
+        headers.insert("x-amz-content-sha256", HeaderValue::from_str(&sha256_hex)?);
+        headers.insert(
+            header::CONTENT_LENGTH,
+            HeaderValue::from_str(&content_length.to_string())?,
+        );
+        if content_length > 0 {
+            let md5_base64 = proxmox_base64::encode(*md5_sum);
+            headers.insert("Content-MD5", HeaderValue::from_str(&md5_base64)?);
+        }
+
+        // The signature has to be calculated last, as it covers the headers set above.
+        let signature = aws_sign_v4_signature(&request, &self.options, epoch, &sha256_hex)?;
+        request
+            .headers_mut()
+            .insert(header::AUTHORIZATION, HeaderValue::from_str(&signature)?);
+
+        Ok(request)
+    }
+
+    /// Sign the given request and send it to the object store.
+    ///
+    /// Waiting for the response is limited by `S3_HTTP_CONNECT_TIMEOUT`, a timeout is reported
+    /// as "request timeout" error.
+    pub async fn send(&self, request: Request<Body>) -> Result<Response<Incoming>, Error> {
+        let signed_request = self.prepare(request).await?;
+        let pending = self.client.request(signed_request);
+        let outcome = tokio::time::timeout(S3_HTTP_CONNECT_TIMEOUT, pending)
+            .await
+            .context("request timeout")?;
+        Ok(outcome?)
+    }
+
+    /// Check that the bucket exists and that the permissions allow accessing it.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
+    pub async fn head_bucket(&self) -> Result<(), Error> {
+        let uri = self.build_uri("/", &[])?;
+        let request = Request::builder()
+            .method(Method::HEAD)
+            .uri(uri)
+            .body(Body::empty())?;
+        // Only the status code is of interest, there is no response body to evaluate.
+        let status = self.send(request).await?.status();
+
+        match status {
+            StatusCode::OK => Ok(()),
+            StatusCode::BAD_REQUEST | StatusCode::FORBIDDEN | StatusCode::NOT_FOUND => {
+                bail!("bucket does not exist or no permission to access it")
+            }
+            status_code => bail!("unexpected status code {status_code}"),
+        }
+    }
+
+    /// Fetch the metadata of an object without fetching the object contents.
+    ///
+    /// The given key is relative to the store prefix of this client.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html
+    pub async fn head_object(
+        &self,
+        object_key: RelS3ObjectKey,
+    ) -> Result<Option<HeadObjectResponse>, Error> {
+        let full_key = object_key.to_full_key(&self.options.store_prefix);
+        let uri = self.build_uri(&full_key, &[])?;
+        let request = Request::builder()
+            .method(Method::HEAD)
+            .uri(uri)
+            .body(Body::empty())?;
+
+        let response = self.send(request).await?;
+        ResponseReader::new(response).head_object_response().await
+    }
+
+    /// Fetch an object from the object store.
+    ///
+    /// The given key is relative to the store prefix of this client.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
+    pub async fn get_object(
+        &self,
+        object_key: RelS3ObjectKey,
+    ) -> Result<Option<GetObjectResponse>, Error> {
+        let full_key = object_key.to_full_key(&self.options.store_prefix);
+        let uri = self.build_uri(&full_key, &[])?;
+        let request = Request::builder()
+            .method(Method::GET)
+            .uri(uri)
+            .body(Body::empty())?;
+
+        let response = self.send(request).await?;
+        ResponseReader::new(response).get_object_response().await
+    }
+
+    /// Returns some or all (up to 1,000) of the objects in a bucket with each request.
+    ///
+    /// If a path prefix is given, it is combined with the store prefix of this client and sent
+    /// as `prefix` query parameter. The `continuation_token` of a previous, truncated response
+    /// allows to fetch the next part of the listing.
+    ///
+    /// NOTE(review): without a path prefix no `prefix` query parameter is sent at all, so the
+    /// listing is not limited to the store prefix in that case - confirm this is intended.
+    ///
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
+    pub async fn list_objects_v2(
+        &self,
+        prefix: &S3PathPrefix,
+        continuation_token: Option<&str>,
+    ) -> Result<ListObjectsV2Response, Error> {
+        let mut query = vec![("list-type", "2")];
+        // Declared outside of the conditional, as the query borrows from it.
+        let abs_prefix: String;
+        if let S3PathPrefix::Some(prefix) = prefix {
+            abs_prefix = if prefix.starts_with("/") {
+                format!("{}{prefix}", self.options.store_prefix)
+            } else {
+                format!("{}/{prefix}", self.options.store_prefix)
+            };
+            query.push(("prefix", &abs_prefix));
+        }
+        if let Some(token) = continuation_token {
+            query.push(("continuation-token", token));
+        }
+        let request = Request::builder()
+            .method(Method::GET)
+            .uri(self.build_uri("/", &query)?)
+            .body(Body::empty())?;
+
+        let response = self.send(request).await?;
+        let response_reader = ResponseReader::new(response);
+        response_reader.list_objects_v2_response().await
+    }
+
+    /// Add a new object to a bucket.
+    ///
+    /// The given key is relative to the store prefix of this client. The upload is performed
+    /// unconditionally: the `If-None-Match` precondition, which would avoid re-uploading an
+    /// object with already existing key, is currently not sent.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
+    pub async fn put_object(
+        &self,
+        object_key: RelS3ObjectKey,
+        object_data: Body,
+    ) -> Result<PutObjectResponse, Error> {
+        let object_key = object_key.to_full_key(&self.options.store_prefix);
+        let request = Request::builder()
+            .method(Method::PUT)
+            .uri(self.build_uri(&object_key, &[])?)
+            .header(header::CONTENT_TYPE, "binary/octet")
+            // Disabled for now, sending this header would request to never overwrite
+            // pre-existing objects with the same key:
+            //.header(header::IF_NONE_MATCH, "*")
+            .body(object_data)?;
+
+        let response = self.send(request).await?;
+        let response_reader = ResponseReader::new(response);
+        response_reader.put_object_response().await
+    }
+
+    /// Remove a single object from the bucket.
+    ///
+    /// The given key is relative to the store prefix of this client.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html
+    pub async fn delete_object(&self, object_key: RelS3ObjectKey) -> Result<(), Error> {
+        let full_key = object_key.to_full_key(&self.options.store_prefix);
+        let uri = self.build_uri(&full_key, &[])?;
+        let request = Request::builder()
+            .method(Method::DELETE)
+            .uri(uri)
+            .body(Body::empty())?;
+
+        let response = self.send(request).await?;
+        ResponseReader::new(response).delete_object_response().await
+    }
+
+    /// Delete multiple objects from a bucket using a single HTTP request.
+    ///
+    /// The given keys are sent as is (no store prefix is added) within an XML request body.
+    /// XML special characters in the keys are escaped, as the request would be malformed
+    /// otherwise.
+    /// See reference docs: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
+    pub async fn delete_objects(
+        &self,
+        object_keys: &[S3ObjectKey],
+    ) -> Result<DeleteObjectsResponse, Error> {
+        /// Append `raw` to `out`, replacing XML special characters by their entities.
+        fn push_xml_escaped(out: &mut String, raw: &str) {
+            for c in raw.chars() {
+                match c {
+                    '&' => out.push_str("&amp;"),
+                    '<' => out.push_str("&lt;"),
+                    '>' => out.push_str("&gt;"),
+                    '"' => out.push_str("&quot;"),
+                    '\'' => out.push_str("&apos;"),
+                    _ => out.push(c),
+                }
+            }
+        }
+
+        let mut body = String::from(r#"<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"#);
+        for object_key in object_keys {
+            body.push_str("<Object><Key>");
+            push_xml_escaped(&mut body, object_key);
+            body.push_str("</Key></Object>");
+        }
+        body.push_str("</Delete>");
+        let request = Request::builder()
+            .method(Method::POST)
+            .uri(self.build_uri("/", &[("delete", "")])?)
+            .body(Body::from(body))?;
+
+        let response = self.send(request).await?;
+        let response_reader = ResponseReader::new(response);
+        response_reader.delete_objects_response().await
+    }
+
+    /// Delete objects by given key prefix.
+    ///
+    /// S3 API does not provide a convenient way to delete objects by key prefix. Therefore, the
+    /// objects matching the prefix are listed page by page and the keys of each non-empty page
+    /// are deleted by a delete objects request.
+    ///
+    /// Returns `true` if at least one delete response reported errors.
+    pub async fn delete_objects_by_prefix(&self, prefix: &S3PathPrefix) -> Result<bool, Error> {
+        let mut next_continuation_token: Option<String> = None;
+        let mut delete_errors = false;
+        loop {
+            let list_objects_result = self
+                .list_objects_v2(prefix, next_continuation_token.as_deref())
+                .await?;
+
+            let objects_to_delete: Vec<S3ObjectKey> = list_objects_result
+                .contents
+                .into_iter()
+                .map(|item| item.key)
+                .collect();
+
+            // Do not send a delete request without any key: such a request body is malformed
+            // and would let the whole operation fail although there is nothing to delete.
+            if !objects_to_delete.is_empty() {
+                let response = self.delete_objects(&objects_to_delete).await?;
+                delete_errors |= response.error.is_some();
+            }
+
+            if !list_objects_result.is_truncated {
+                break;
+            }
+            next_continuation_token = list_objects_result.next_continuation_token;
+        }
+        Ok(delete_errors)
+    }
+
+    /// Delete objects by given key prefix, but spare protected items.
+    ///
+    /// All objects matching `prefix` are listed first. Each key ending in `suffix` (e.g. the
+    /// protected file marker within a snapshot directory) is never deleted itself and
+    /// additionally excludes from deletion:
+    /// - all keys starting with the matched key stripped of `suffix`, and
+    /// - all keys starting with one of the `exclude_from_parent` entries joined onto the parent
+    ///   path component of that stripped key (e.g. owner and notes for a group, if one of the
+    ///   group's snapshots was matched by a protected marker).
+    ///
+    /// The remaining keys are deleted in batches of at most 1000 keys per delete request.
+    ///
+    /// Returns `true` if at least one delete response reported errors.
+    pub async fn delete_objects_by_prefix_with_suffix_filter(
+        &self,
+        prefix: &S3PathPrefix,
+        suffix: &str,
+        exclude_from_parent: &[&str],
+    ) -> Result<bool, Error> {
+        // S3 API does not provide a convenient way to delete objects by key prefix.
+        // List all objects with given group prefix first, filtering and deletion happens once
+        // the listing is complete, as protected prefixes may show up on any page.
+        let mut next_continuation_token: Option<String> = None;
+        // Key prefixes excluded from deletion.
+        let mut protected_prefixes: Vec<String> = Vec::new();
+        // Deletion candidates, all listed keys not ending in `suffix`.
+        let mut candidates: Vec<S3ObjectKey> = Vec::new();
+        loop {
+            let list_objects_result = self
+                .list_objects_v2(prefix, next_continuation_token.as_deref())
+                .await?;
+
+            for item in list_objects_result.contents {
+                if let Some(protected) = item.key.strip_suffix(suffix) {
+                    if let Some(parent) = Path::new(protected).parent() {
+                        protected_prefixes.extend(exclude_from_parent.iter().map(|name| {
+                            // valid utf-8 as combined from `str` values
+                            parent.join(name).to_string_lossy().into_owned()
+                        }));
+                    }
+                    protected_prefixes.push(protected.to_string());
+                } else {
+                    candidates.push(item.key);
+                }
+            }
+
+            if !list_objects_result.is_truncated {
+                break;
+            }
+            next_continuation_token = list_objects_result.next_continuation_token;
+        }
+
+        let objects_to_delete: Vec<S3ObjectKey> = candidates
+            .into_iter()
+            .filter(|key| {
+                !protected_prefixes
+                    .iter()
+                    .any(|protected| key.starts_with(protected.as_str()))
+            })
+            .collect();
+
+        let mut delete_errors = false;
+        for objects in objects_to_delete.chunks(1000) {
+            let result = self.delete_objects(objects).await?;
+            delete_errors |= result.error.is_some();
+        }
+
+        Ok(delete_errors)
+    }
+
+    /// Helper to generate [`Uri`] instance with common properties based on given path and query.
+    ///
+    /// A single leading slash of `path` is ignored. With path style addressing, the bucket name
+    /// is used as first path component. Query parameters with empty value are added as key only.
+    /// The scheme is always https, the authority is taken from the client.
+    #[inline(always)]
+    fn build_uri(&self, mut path: &str, query: &[(&str, &str)]) -> Result<Uri, Error> {
+        path = path.strip_prefix('/').unwrap_or(path);
+
+        let mut target = if self.options.path_style {
+            format!("/{bucket}/{path}", bucket = self.options.bucket)
+        } else {
+            format!("/{path}")
+        };
+
+        // No further input validation as http::uri::Builder will check path and query
+        for (index, (key, value)) in query.iter().enumerate() {
+            target.push(if index == 0 { '?' } else { '&' });
+            target.push_str(key);
+            if !value.is_empty() {
+                target.push('=');
+                target.push_str(value);
+            }
+        }
+
+        let path_and_query =
+            PathAndQuery::from_str(&target).context("failed to parse path and query")?;
+
+        let mut parts = Parts::default();
+        parts.scheme = Some(Scheme::HTTPS);
+        parts.authority = Some(self.authority.clone());
+        parts.path_and_query = Some(path_and_query);
+
+        Uri::from_parts(parts).context("failed to build uri")
+    }
}
diff --git a/pbs-s3-client/src/lib.rs b/pbs-s3-client/src/lib.rs
index 9c4d11e43..5ee103e25 100644
--- a/pbs-s3-client/src/lib.rs
+++ b/pbs-s3-client/src/lib.rs
@@ -3,7 +3,9 @@ pub use aws_sign_v4::uri_decode;
mod client;
pub use client::{S3Client, S3ClientOptions, S3PathPrefix};
mod object_key;
-pub use object_key::{S3ObjectKey, S3_CONTENT_PREFIX};
+pub use object_key::{RelS3ObjectKey, S3ObjectKey, S3_CONTENT_PREFIX};
+mod response_reader;
+pub use response_reader::PutObjectResponse;
use std::time::Duration;
diff --git a/pbs-s3-client/src/response_reader.rs b/pbs-s3-client/src/response_reader.rs
new file mode 100644
index 000000000..85ff0f23a
--- /dev/null
+++ b/pbs-s3-client/src/response_reader.rs
@@ -0,0 +1,279 @@
+use std::str::FromStr;
+
+use anyhow::{anyhow, bail, Context, Error};
+use http_body_util::BodyExt;
+use hyper::body::Incoming;
+use hyper::header::HeaderName;
+use hyper::http::header;
+use hyper::http::StatusCode;
+use hyper::{HeaderMap, Response};
+use serde::Deserialize;
+
+use crate::S3ObjectKey;
+use crate::{HttpDate, LastModifiedTimestamp};
+
+/// Wrapper around a received HTTP response, which is consumed by one of the request specific
+/// methods parsing it into the matching response type.
+pub(crate) struct ResponseReader {
+ response: Response<Incoming>,
+}
+
+/// Parsed list objects v2 response: the contents of the deserialized response body combined
+/// with the `Date` header of the response.
+#[derive(Debug)]
+pub struct ListObjectsV2Response {
+ // Value of the `Date` response header.
+ pub date: HttpDate,
+ pub name: String,
+ pub prefix: String,
+ pub key_count: u64,
+ pub max_keys: u64,
+ // If set, callers pass `next_continuation_token` to a follow-up request to continue listing.
+ pub is_truncated: bool,
+ pub continuation_token: Option<String>,
+ pub next_continuation_token: Option<String>,
+ // Empty if the response body did not contain any `Contents` element.
+ pub contents: Vec<ListObjectsV2Contents>,
+}
+
+/// Deserialization target for the XML body of a list objects v2 response, the field names are
+/// mapped to PascalCase element names.
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+struct ListObjectsV2ResponseBody {
+ pub name: String,
+ pub prefix: String,
+ pub key_count: u64,
+ pub max_keys: u64,
+ pub is_truncated: bool,
+ pub continuation_token: Option<String>,
+ pub next_continuation_token: Option<String>,
+ // Optional, converted to an empty list by `with_date` if not present.
+ pub contents: Option<Vec<ListObjectsV2Contents>>,
+}
+
+impl ListObjectsV2ResponseBody {
+    /// Convert the deserialized body into the response type handed to callers, adding the given
+    /// `date` and replacing missing `contents` by an empty list.
+    fn with_date(self, date: HttpDate) -> ListObjectsV2Response {
+        let Self {
+            name,
+            prefix,
+            key_count,
+            max_keys,
+            is_truncated,
+            continuation_token,
+            next_continuation_token,
+            contents,
+        } = self;
+
+        ListObjectsV2Response {
+            date,
+            name,
+            prefix,
+            key_count,
+            max_keys,
+            is_truncated,
+            continuation_token,
+            next_continuation_token,
+            contents: contents.unwrap_or_default(),
+        }
+    }
+}
+
+/// Single entry of the `contents` list of a list objects v2 response, deserialized from the
+/// response body with field names mapped to PascalCase element names.
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListObjectsV2Contents {
+ pub key: S3ObjectKey,
+ pub last_modified: LastModifiedTimestamp,
+ pub e_tag: String,
+ pub size: u64,
+ pub storage_class: String,
+}
+
+#[derive(Debug)]
+/// Subset of the head object response (headers only, there is no body)
+/// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
+pub struct HeadObjectResponse {
+ // Parsed from the `Content-Length` header.
+ pub content_length: u64,
+ // Parsed from the `Content-Type` header.
+ pub content_type: String,
+ // Parsed from the `Date` header.
+ pub date: HttpDate,
+ // Parsed from the `ETag` header.
+ pub e_tag: String,
+ // Parsed from the `Last-Modified` header.
+ pub last_modified: HttpDate,
+}
+
+/// Subset of the get object response
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html#API_GetObject_ResponseSyntax
+pub struct GetObjectResponse {
+ // Parsed from the `Content-Length` header.
+ pub content_length: u64,
+ // Parsed from the `Content-Type` header.
+ pub content_type: String,
+ // Parsed from the `Date` header.
+ pub date: HttpDate,
+ // Parsed from the `ETag` header.
+ pub e_tag: String,
+ // Parsed from the `Last-Modified` header.
+ pub last_modified: HttpDate,
+ // Response body, not read yet when the response is returned.
+ pub content: Incoming,
+}
+
+/// Subset of the put object response
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_ResponseSyntax
+#[derive(Debug)]
+pub enum PutObjectResponse {
+ // Request was answered with status code 409 (conflict).
+ NeedsRetry,
+ // Request was answered with status code 412 (precondition failed).
+ PreconditionFailed,
+ // Request was answered with status code 200, contains the value of the `ETag` header.
+ Success(String),
+}
+
+/// Subset of the delete objects response
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_ResponseElements
+///
+/// Deserialized from the XML response body with field names mapped to PascalCase element names.
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeleteObjectsResponse {
+ pub deleted: Option<Vec<DeletedObject>>,
+ // Checked by the delete by prefix helpers of the client to detect failed deletions.
+ pub error: Option<Vec<DeleteObjectError>>,
+}
+
+/// Entry of the `deleted` list of a delete objects response, all fields are optional.
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeletedObject.html
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeletedObject {
+ pub delete_marker: Option<bool>,
+ pub delete_marker_version_id: Option<String>,
+ pub key: Option<S3ObjectKey>,
+ pub version_id: Option<String>,
+}
+
+/// Entry of the `error` list of a delete objects response, all fields are optional.
+/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_Error.html
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct DeleteObjectError {
+ pub code: Option<String>,
+ pub key: Option<S3ObjectKey>,
+ pub message: Option<String>,
+ pub version_id: Option<String>,
+}
+
+impl ResponseReader {
+    /// Wrap the given response for parsing.
+    pub(crate) fn new(response: Response<Incoming>) -> Self {
+        Self { response }
+    }
+
+    /// Parse the response to a list objects v2 request.
+    ///
+    /// Reads the whole response body and expects status code 200. The `Date` header and the XML
+    /// body are parsed into the returned response.
+    pub(crate) async fn list_objects_v2_response(self) -> Result<ListObjectsV2Response, Error> {
+        let (parts, body) = self.response.into_parts();
+        let body = body.collect().await?.to_bytes();
+
+        match parts.status {
+            StatusCode::OK => (),
+            StatusCode::NOT_FOUND => bail!("bucket does not exist"),
+            status_code => bail!("unexpected status code {status_code}"),
+        }
+
+        // Validate in place, there is no need to copy the body into an owned string.
+        let body = std::str::from_utf8(&body)?;
+
+        let date: HttpDate = Self::parse_header(header::DATE, &parts.headers)?;
+
+        let response: ListObjectsV2ResponseBody =
+            serde_xml_rs::from_str(body).context("failed to parse response body")?;
+
+        Ok(response.with_date(date))
+    }
+
+    /// Parse the response to a head object request.
+    ///
+    /// Returns `None` for status code 404, the parsed headers for status code 200. Any other
+    /// status code, a non-empty body or a missing or invalid header is an error.
+    pub(crate) async fn head_object_response(self) -> Result<Option<HeadObjectResponse>, Error> {
+        let (parts, body) = self.response.into_parts();
+        let body = body.collect().await?.to_bytes();
+
+        match parts.status {
+            StatusCode::OK => (),
+            StatusCode::NOT_FOUND => return Ok(None),
+            status_code => bail!("unexpected status code {status_code}"),
+        }
+        if !body.is_empty() {
+            bail!("got unexpected non-empty response body");
+        }
+
+        let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?;
+        let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?;
+        let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+        let date = Self::parse_header(header::DATE, &parts.headers)?;
+        let last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?;
+
+        Ok(Some(HeadObjectResponse {
+            content_length,
+            content_type,
+            date,
+            e_tag,
+            last_modified,
+        }))
+    }
+
+    /// Parse the response to a get object request.
+    ///
+    /// Returns `None` for status code 404. For status code 200 the headers are parsed, while the
+    /// body is not read but handed to the caller as `content`.
+    pub(crate) async fn get_object_response(self) -> Result<Option<GetObjectResponse>, Error> {
+        let (parts, content) = self.response.into_parts();
+
+        match parts.status {
+            StatusCode::OK => (),
+            StatusCode::NOT_FOUND => return Ok(None),
+            StatusCode::FORBIDDEN => bail!("object is archived and inaccessible until restored"),
+            status_code => bail!("unexpected status code {status_code}"),
+        }
+
+        let content_length: u64 = Self::parse_header(header::CONTENT_LENGTH, &parts.headers)?;
+        let content_type = Self::parse_header(header::CONTENT_TYPE, &parts.headers)?;
+        let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+        let date = Self::parse_header(header::DATE, &parts.headers)?;
+        let last_modified = Self::parse_header(header::LAST_MODIFIED, &parts.headers)?;
+
+        Ok(Some(GetObjectResponse {
+            content_length,
+            content_type,
+            date,
+            e_tag,
+            last_modified,
+            content,
+        }))
+    }
+
+    /// Parse the response to a put object request.
+    ///
+    /// Status codes 412 and 409 are mapped to the respective non-error variants, for status code
+    /// 200 the body has to be empty and the `ETag` header is returned.
+    pub(crate) async fn put_object_response(self) -> Result<PutObjectResponse, Error> {
+        let (parts, body) = self.response.into_parts();
+
+        match parts.status {
+            StatusCode::OK => (),
+            // If-None-Match precondition failed, an object with same key already present.
+            // FIXME: Should this be dropped in favor of re-uploading and rely on the local
+            // cache to detect duplicates to increase data safety guarantees?
+            StatusCode::PRECONDITION_FAILED => return Ok(PutObjectResponse::PreconditionFailed),
+            StatusCode::CONFLICT => return Ok(PutObjectResponse::NeedsRetry),
+            StatusCode::BAD_REQUEST => bail!("invalid request"),
+            status_code => bail!("unexpected status code {status_code}"),
+        };
+
+        let body = body.collect().await?.to_bytes();
+        if !body.is_empty() {
+            bail!("got unexpected non-empty response body");
+        }
+
+        let e_tag = Self::parse_header(header::ETAG, &parts.headers)?;
+
+        Ok(PutObjectResponse::Success(e_tag))
+    }
+
+    /// Parse the response to a delete object request, only status code 204 is accepted.
+    pub(crate) async fn delete_object_response(self) -> Result<(), Error> {
+        let (parts, _body) = self.response.into_parts();
+
+        match parts.status {
+            StatusCode::NO_CONTENT => (),
+            status_code => bail!("unexpected status code {status_code}"),
+        };
+
+        Ok(())
+    }
+
+    /// Parse the response to a delete objects request.
+    ///
+    /// Expects status code 200 and deserializes the XML response body.
+    pub(crate) async fn delete_objects_response(self) -> Result<DeleteObjectsResponse, Error> {
+        let (parts, body) = self.response.into_parts();
+
+        match parts.status {
+            StatusCode::OK => (),
+            StatusCode::BAD_REQUEST => bail!("invalid request"),
+            status_code => bail!("unexpected status code {status_code}"),
+        };
+
+        let body = body.collect().await?.to_bytes();
+        // Validate in place, there is no need to copy the body into an owned string.
+        let body = std::str::from_utf8(&body)?;
+
+        let delete_objects_response: DeleteObjectsResponse =
+            serde_xml_rs::from_str(body).context("failed to parse response body")?;
+
+        Ok(delete_objects_response)
+    }
+
+    /// Look up the header `name` in `headers` and parse its value into `T`.
+    ///
+    /// Fails if the header is missing, its value cannot be represented as string or the string
+    /// cannot be parsed into `T`.
+    fn parse_header<T: FromStr>(name: HeaderName, headers: &HeaderMap) -> Result<T, Error>
+    where
+        <T as FromStr>::Err: Send + Sync + 'static,
+        Result<T, <T as FromStr>::Err>: Context<T, <T as FromStr>::Err>,
+    {
+        let header_value = headers
+            .get(&name)
+            .ok_or_else(|| anyhow!("missing header '{name}'"))?;
+        let header_str = header_value
+            .to_str()
+            .with_context(|| format!("non UTF-8 header '{name}'"))?;
+        let value = header_str
+            .parse()
+            .with_context(|| format!("failed to parse header '{name}'"))?;
+        Ok(value)
+    }
+}
--
2.47.2
More information about the pbs-devel
mailing list