[pbs-devel] [PATCH proxmox 2/2] s3-client: add shared rate limiter via https connector

Christian Ebner c.ebner at proxmox.com
Thu Aug 28 12:26:00 CEST 2025


Allow configuring a shared rate limiter for the s3 client to limit
upload and download bandwidth. This helps users who suffer from
network congestion caused by otherwise unlimited s3 client data
transfers.

Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
---
 proxmox-s3-client/Cargo.toml            |  5 +-
 proxmox-s3-client/debian/control        |  4 ++
 proxmox-s3-client/examples/s3_client.rs |  1 +
 proxmox-s3-client/src/api_types.rs      | 29 ++++++++++++
 proxmox-s3-client/src/client.rs         | 62 +++++++++++++++++++++++--
 proxmox-s3-client/src/lib.rs            |  4 +-
 6 files changed, 100 insertions(+), 5 deletions(-)

diff --git a/proxmox-s3-client/Cargo.toml b/proxmox-s3-client/Cargo.toml
index a4a947c1..371921f1 100644
--- a/proxmox-s3-client/Cargo.toml
+++ b/proxmox-s3-client/Cargo.toml
@@ -22,6 +22,7 @@ hyper-util = { workspace = true, features = [ "client-legacy", "tokio", "http1"
 hyper = { workspace = true, optional = true }
 iso8601 = { workspace = true, optional = true }
 md5 = { workspace = true, optional = true }
+nix = { workspace = true, optional = true }
 openssl = { workspace = true, optional = true }
 quick-xml = { workspace = true, features = [ "async-tokio" ], optional = true }
 regex.workspace = true
@@ -34,7 +35,8 @@ tracing = { workspace = true, optional = true }
 url = {workspace = true, optional = true }
 
 proxmox-base64 = { workspace = true, optional = true }
-proxmox-http = { workspace = true, features = [ "body", "client", "client-trait", "rate-limiter" ], optional = true }
+proxmox-http = { workspace = true, features = [ "body", "client", "client-trait", "rate-limiter", "shared-rate-limiter" ], optional = true }
+proxmox-human-byte.workspace = true
 proxmox-schema = { workspace = true, features = [ "api-macro", "api-types" ] }
 proxmox-serde.workspace = true
 proxmox-time = {workspace = true, optional = true }
@@ -51,6 +53,7 @@ impl = [
     "dep:hyper",
     "dep:iso8601",
     "dep:md5",
+    "dep:nix",
     "dep:openssl",
     "dep:quick-xml",
     "dep:serde-xml-rs",
diff --git a/proxmox-s3-client/debian/control b/proxmox-s3-client/debian/control
index b9a25246..817c248b 100644
--- a/proxmox-s3-client/debian/control
+++ b/proxmox-s3-client/debian/control
@@ -8,6 +8,7 @@ Build-Depends-Arch: cargo:native <!nocheck>,
  libstd-rust-dev <!nocheck>,
  librust-anyhow-1+default-dev <!nocheck>,
  librust-const-format-0.2+default-dev <!nocheck>,
+ librust-proxmox-human-byte-1+default-dev <!nocheck>,
  librust-proxmox-schema-4+api-macro-dev (>= 4.1.0-~~) <!nocheck>,
  librust-proxmox-schema-4+api-types-dev (>= 4.1.0-~~) <!nocheck>,
  librust-proxmox-schema-4+default-dev (>= 4.1.0-~~) <!nocheck>,
@@ -31,6 +32,7 @@ Depends:
  ${misc:Depends},
  librust-anyhow-1+default-dev,
  librust-const-format-0.2+default-dev,
+ librust-proxmox-human-byte-1+default-dev,
  librust-proxmox-schema-4+api-macro-dev (>= 4.1.0-~~),
  librust-proxmox-schema-4+api-types-dev (>= 4.1.0-~~),
  librust-proxmox-schema-4+default-dev (>= 4.1.0-~~),
@@ -74,6 +76,7 @@ Depends:
  librust-hyper-util-0.1+tokio-dev (>= 0.1.12-~~),
  librust-iso8601-0.6+default-dev (>= 0.6.1-~~),
  librust-md5-0.7+default-dev,
+ librust-nix-0.29+default-dev,
  librust-openssl-0.10+default-dev,
  librust-proxmox-base64-1+default-dev,
  librust-proxmox-http-1+body-dev (>= 1.0.2-~~),
@@ -81,6 +84,7 @@ Depends:
  librust-proxmox-http-1+client-trait-dev (>= 1.0.2-~~),
  librust-proxmox-http-1+default-dev (>= 1.0.2-~~),
  librust-proxmox-http-1+rate-limiter-dev (>= 1.0.2-~~),
+ librust-proxmox-http-1+shared-rate-limiter-dev (>= 1.0.2-~~),
  librust-proxmox-time-2+default-dev (>= 2.1.0-~~),
  librust-quick-xml-0.36+async-tokio-dev (>= 0.36.1-~~),
  librust-quick-xml-0.36+default-dev (>= 0.36.1-~~),
diff --git a/proxmox-s3-client/examples/s3_client.rs b/proxmox-s3-client/examples/s3_client.rs
index 67baf467..dd3885d7 100644
--- a/proxmox-s3-client/examples/s3_client.rs
+++ b/proxmox-s3-client/examples/s3_client.rs
@@ -39,6 +39,7 @@ async fn run() -> Result<(), anyhow::Error> {
         fingerprint: Some("<s3-api-fingerprint>".to_string()),
         put_rate_limit: None,
         provider_quirks: Vec::new(),
+        rate_limiter_config: None,
     };
 
     // Creating a client instance and connect to api endpoint
diff --git a/proxmox-s3-client/src/api_types.rs b/proxmox-s3-client/src/api_types.rs
index 115b1d2d..9071868a 100644
--- a/proxmox-s3-client/src/api_types.rs
+++ b/proxmox-s3-client/src/api_types.rs
@@ -2,6 +2,7 @@ use anyhow::bail;
 use const_format::concatcp;
 use serde::{Deserialize, Serialize};
 
+use proxmox_human_byte::HumanByte;
 use proxmox_schema::api_types::{
     CERT_FINGERPRINT_SHA256_SCHEMA, DNS_LABEL_STR, IPRE_STR, SAFE_ID_FORMAT,
 };
@@ -126,6 +127,22 @@ serde_plain::derive_fromstr_from_deserialize!(ProviderQuirks);
                 type: ProviderQuirks,
             },
         },
+        "rate-in": {
+            type: HumanByte,
+            optional: true,
+        },
+        "burst-in": {
+            type: HumanByte,
+            optional: true,
+        },
+        "rate-out": {
+            type: HumanByte,
+            optional: true,
+        },
+        "burst-out": {
+            type: HumanByte,
+            optional: true,
+        },
     },
 )]
 #[derive(Serialize, Deserialize, Updater, Clone, PartialEq)]
@@ -154,6 +171,18 @@ pub struct S3ClientConfig {
     /// List of provider specific feature implementation quirks.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub provider_quirks: Option<Vec<ProviderQuirks>>,
+    /// Download rate limit.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub rate_in: Option<HumanByte>,
+    /// Download burst.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub burst_in: Option<HumanByte>,
+    /// Upload rate limit.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub rate_out: Option<HumanByte>,
+    /// Upload burst.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub burst_out: Option<HumanByte>,
 }
 
 impl S3ClientConfig {
diff --git a/proxmox-s3-client/src/client.rs b/proxmox-s3-client/src/client.rs
index ec29d95a..4e3872a0 100644
--- a/proxmox-s3-client/src/client.rs
+++ b/proxmox-s3-client/src/client.rs
@@ -1,4 +1,4 @@
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use std::str::FromStr;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
@@ -12,6 +12,7 @@ use hyper::{Request, Response};
 use hyper_util::client::legacy::connect::HttpConnector;
 use hyper_util::client::legacy::Client;
 use hyper_util::rt::TokioExecutor;
+use nix::unistd::User;
 use openssl::hash::MessageDigest;
 use openssl::sha::Sha256;
 use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
@@ -19,7 +20,7 @@ use openssl::x509::X509StoreContextRef;
 use tracing::error;
 
 use proxmox_http::client::HttpsConnector;
-use proxmox_http::{Body, RateLimit, RateLimiter};
+use proxmox_http::{Body, RateLimit, RateLimiter, SharedRateLimiter};
 use proxmox_schema::api_types::CERT_FINGERPRINT_SHA256_SCHEMA;
 
 use crate::api_types::{ProviderQuirks, S3ClientConfig};
@@ -51,6 +52,25 @@ pub enum S3PathPrefix {
     None,
 }
 
+/// Options for the https connector's rate limiter
+pub struct S3RateLimiterOptions {
+    /// ID for the shared rate limiter.
+    pub id: String,
+    /// Base path for the shared memory mapped file
+    pub base_path: PathBuf,
+    /// User for the shared memory mapped file and folders to be created
+    pub user: User,
+}
+
+/// Configuration for the https connector's rate limiter
+pub struct S3RateLimiterConfig {
+    options: S3RateLimiterOptions,
+    rate_in: Option<u64>,
+    burst_in: Option<u64>,
+    rate_out: Option<u64>,
+    burst_out: Option<u64>,
+}
+
 /// Configuration options for client
 pub struct S3ClientOptions {
     /// Endpoint to access S3 object store.
@@ -75,6 +95,8 @@ pub struct S3ClientOptions {
     pub put_rate_limit: Option<u64>,
     /// Provider implementation specific features and limitations
     pub provider_quirks: Vec<ProviderQuirks>,
+    /// Configuration options for the shared rate limiter.
+    pub rate_limiter_config: Option<S3RateLimiterConfig>,
 }
 
 impl S3ClientOptions {
@@ -84,7 +106,15 @@ impl S3ClientOptions {
         secret_key: String,
         bucket: Option<String>,
         common_prefix: String,
+        rate_limiter_options: Option<S3RateLimiterOptions>,
     ) -> Self {
+        let rate_limiter_config = rate_limiter_options.map(|options| S3RateLimiterConfig {
+            options,
+            rate_in: config.rate_in.map(|human_bytes| human_bytes.as_u64()),
+            burst_in: config.burst_in.map(|human_bytes| human_bytes.as_u64()),
+            rate_out: config.rate_out.map(|human_bytes| human_bytes.as_u64()),
+            burst_out: config.burst_out.map(|human_bytes| human_bytes.as_u64()),
+        });
         Self {
             endpoint: config.endpoint,
             port: config.port,
@@ -97,6 +127,7 @@ impl S3ClientOptions {
             secret_key,
             put_rate_limit: config.put_rate_limit,
             provider_quirks: config.provider_quirks.unwrap_or_default(),
+            rate_limiter_config,
         }
     }
 }
@@ -149,11 +180,36 @@ impl S3Client {
         // want communication to object store backend api to always use https
         http_connector.enforce_http(false);
         http_connector.set_connect_timeout(Some(S3_HTTP_CONNECT_TIMEOUT));
-        let https_connector = HttpsConnector::with_connector(
+        let mut https_connector = HttpsConnector::with_connector(
             http_connector,
             ssl_connector_builder.build(),
             S3_TCP_KEEPALIVE_TIME,
         );
+
+        if let Some(limiter_config) = &options.rate_limiter_config {
+            if let Some(limit) = limiter_config.rate_in {
+                let limiter = SharedRateLimiter::mmap_shmem(
+                    &format!("{}.in", limiter_config.options.id),
+                    limit,
+                    limiter_config.burst_in.unwrap_or(limit),
+                    limiter_config.options.user.clone(),
+                    limiter_config.options.base_path.clone(),
+                )?;
+                https_connector.set_read_limiter(Some(Arc::new(limiter)));
+            }
+
+            if let Some(limit) = limiter_config.rate_out {
+                let limiter = SharedRateLimiter::mmap_shmem(
+                    &format!("{}.out", limiter_config.options.id),
+                    limit,
+                    limiter_config.burst_out.unwrap_or(limit),
+                    limiter_config.options.user.clone(),
+                    limiter_config.options.base_path.clone(),
+                )?;
+                https_connector.set_write_limiter(Some(Arc::new(limiter)));
+            }
+        }
+
         let client = Client::builder(TokioExecutor::new()).build::<_, Body>(https_connector);
 
         let authority_template = if let Some(port) = options.port {
diff --git a/proxmox-s3-client/src/lib.rs b/proxmox-s3-client/src/lib.rs
index 26e7032b..d02fd0dc 100644
--- a/proxmox-s3-client/src/lib.rs
+++ b/proxmox-s3-client/src/lib.rs
@@ -20,7 +20,9 @@ pub use aws_sign_v4::uri_decode;
 #[cfg(feature = "impl")]
 mod client;
 #[cfg(feature = "impl")]
-pub use client::{S3Client, S3ClientOptions, S3PathPrefix, S3_HTTP_REQUEST_TIMEOUT};
+pub use client::{
+    S3Client, S3ClientOptions, S3PathPrefix, S3RateLimiterOptions, S3_HTTP_REQUEST_TIMEOUT,
+};
 #[cfg(feature = "impl")]
 mod timestamps;
 #[cfg(feature = "impl")]
-- 
2.47.2





More information about the pbs-devel mailing list