[pbs-devel] [PATCH proxmox v4 2/3] http: track user tag updates on rate-limited streams

Hannes Laimer h.laimer at proxmox.com
Wed Nov 12 11:35:01 CET 2025


Introduce rate-limit tags, initially with only a user variant, and let
rate-limited streams hold a shared tag handle so that the limiter update
callback is re-run with the current tags whenever the tag set changes.

If we decide to implement something like [1] in the future, this could
potentially also include, for example, group rate limits.

[1] https://bugzilla.proxmox.com/show_bug.cgi?id=5867

Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
Reviewed-by: Christian Ebner <c.ebner at proxmox.com>
Tested-by: Christian Ebner <c.ebner at proxmox.com>
---
 proxmox-http/src/lib.rs                 |  2 +-
 proxmox-http/src/rate_limited_stream.rs | 46 ++++++++++++++++++++++---
 2 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
index 8b6953b0..990f8f36 100644
--- a/proxmox-http/src/lib.rs
+++ b/proxmox-http/src/lib.rs
@@ -34,7 +34,7 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
 #[cfg(feature = "rate-limited-stream")]
 mod rate_limited_stream;
 #[cfg(feature = "rate-limited-stream")]
-pub use rate_limited_stream::RateLimitedStream;
+pub use rate_limited_stream::{RateLimitedStream, RateLimiterTag, RateLimiterTags};
 
 #[cfg(feature = "body")]
 mod body;
diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
index e24df7af..f4d02eb0 100644
--- a/proxmox-http/src/rate_limited_stream.rs
+++ b/proxmox-http/src/rate_limited_stream.rs
@@ -15,8 +15,15 @@ use super::{RateLimiter, ShareableRateLimit};
 
 type SharedRateLimit = Arc<dyn ShareableRateLimit>;
 
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum RateLimiterTag {
+    User(String),
+}
+
+pub type RateLimiterTags = Vec<RateLimiterTag>;
+
 pub type RateLimiterCallback =
-    dyn Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
+    dyn Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
 
 /// A rate limited stream using [RateLimiter]
 pub struct RateLimitedStream<S> {
@@ -26,6 +33,8 @@ pub struct RateLimitedStream<S> {
     write_delay: Option<Pin<Box<Sleep>>>,
     update_limiter_cb: Option<Box<RateLimiterCallback>>,
     last_limiter_update: Instant,
+    tag_handle: Option<Arc<Mutex<RateLimiterTags>>>,
+    last_tags: Option<RateLimiterTags>,
     stream: S,
 }
 
@@ -53,6 +62,8 @@ impl<S> RateLimitedStream<S> {
             write_delay: None,
             update_limiter_cb: None,
             last_limiter_update: Instant::now(),
+            tag_handle: None,
+            last_tags: None,
             stream,
         }
     }
@@ -64,12 +75,15 @@ impl<S> RateLimitedStream<S> {
     /// Note: This function is called within an async context, so it
     /// should be fast and must not block.
     pub fn with_limiter_update_cb<
-        F: Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static,
+        F: Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
+            + Send
+            + 'static,
     >(
         stream: S,
         update_limiter_cb: F,
     ) -> Self {
-        let (read_limiter, write_limiter) = update_limiter_cb();
+        let tag_handle = Some(Arc::new(Mutex::new(Vec::new())));
+        let (read_limiter, write_limiter) = update_limiter_cb(&[]);
         Self {
             read_limiter,
             read_delay: None,
@@ -77,15 +91,33 @@ impl<S> RateLimitedStream<S> {
             write_delay: None,
             update_limiter_cb: Some(Box::new(update_limiter_cb)),
             last_limiter_update: Instant::now(),
+            tag_handle,
+            last_tags: None,
             stream,
         }
     }
 
     fn update_limiters(&mut self) {
         if let Some(ref update_limiter_cb) = self.update_limiter_cb {
-            if self.last_limiter_update.elapsed().as_secs() >= 5 {
+            let mut force_update = false;
+            let current_tags = self
+                .tag_handle
+                .as_ref()
+                .map(|handle| handle.lock().unwrap().clone());
+
+            if self.last_tags != current_tags {
+                self.last_tags = current_tags.clone();
+                force_update = true;
+            }
+
+            if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 {
                 self.last_limiter_update = Instant::now();
-                let (read_limiter, write_limiter) = update_limiter_cb();
+                let tags = self
+                    .last_tags
+                    .as_ref()
+                    .map(|tags| tags.as_slice())
+                    .unwrap_or(&[]);
+                let (read_limiter, write_limiter) = update_limiter_cb(tags);
                 self.read_limiter = read_limiter;
                 self.write_limiter = write_limiter;
             }
@@ -99,6 +131,10 @@ impl<S> RateLimitedStream<S> {
     pub fn inner_mut(&mut self) -> &mut S {
         &mut self.stream
     }
+
+    pub fn tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
+        self.tag_handle.as_ref().map(Arc::clone)
+    }
 }
 
 fn register_traffic(limiter: &dyn ShareableRateLimit, count: usize) -> Option<Pin<Box<Sleep>>> {
-- 
2.47.3





More information about the pbs-devel mailing list