[pbs-devel] [PATCH proxmox v2 2/3] http: track user tag updates on rate-limited streams

Hannes Laimer h.laimer at proxmox.com
Fri Nov 7 14:23:25 CET 2025


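Introduce rate-limit tags (currently only a `User` variant) and let
rate-limited streams hold a shared handle to the tag list. Whenever
the tags behind that handle differ from the ones the stream saw last,
the stream invokes its limiter update callback right away instead of
waiting for the regular 5 second refresh interval.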

Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
 proxmox-http/src/lib.rs                 |  2 +-
 proxmox-http/src/rate_limited_stream.rs | 35 ++++++++++++++++++++++++-
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
index 8b6953b0..990f8f36 100644
--- a/proxmox-http/src/lib.rs
+++ b/proxmox-http/src/lib.rs
@@ -34,7 +34,7 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
 #[cfg(feature = "rate-limited-stream")]
 mod rate_limited_stream;
 #[cfg(feature = "rate-limited-stream")]
-pub use rate_limited_stream::RateLimitedStream;
+pub use rate_limited_stream::{RateLimitedStream, RateLimiterTag, RateLimiterTags};
 
 #[cfg(feature = "body")]
 mod body;
diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
index e24df7af..4bc5c414 100644
--- a/proxmox-http/src/rate_limited_stream.rs
+++ b/proxmox-http/src/rate_limited_stream.rs
@@ -15,6 +15,13 @@ use super::{RateLimiter, ShareableRateLimit};
 
 type SharedRateLimit = Arc<dyn ShareableRateLimit>;
 
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum RateLimiterTag {
+    User(String),
+}
+
+pub type RateLimiterTags = Vec<RateLimiterTag>;
+
 pub type RateLimiterCallback =
     dyn Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
 
@@ -26,6 +33,8 @@ pub struct RateLimitedStream<S> {
     write_delay: Option<Pin<Box<Sleep>>>,
     update_limiter_cb: Option<Box<RateLimiterCallback>>,
     last_limiter_update: Instant,
+    tag_handle: Option<Arc<Mutex<RateLimiterTags>>>,
+    last_tags: Option<RateLimiterTags>,
     stream: S,
 }
 
@@ -53,6 +62,8 @@ impl<S> RateLimitedStream<S> {
             write_delay: None,
             update_limiter_cb: None,
             last_limiter_update: Instant::now(),
+            tag_handle: None,
+            last_tags: None,
             stream,
         }
     }
@@ -77,13 +88,26 @@ impl<S> RateLimitedStream<S> {
             write_delay: None,
             update_limiter_cb: Some(Box::new(update_limiter_cb)),
             last_limiter_update: Instant::now(),
+            tag_handle: None,
+            last_tags: None,
             stream,
         }
     }
 
     fn update_limiters(&mut self) {
         if let Some(ref update_limiter_cb) = self.update_limiter_cb {
-            if self.last_limiter_update.elapsed().as_secs() >= 5 {
+            let mut force_update = false;
+            let current_tags = self
+                .tag_handle
+                .as_ref()
+                .map(|handle| handle.lock().unwrap().clone());
+
+            if self.last_tags != current_tags {
+                self.last_tags = current_tags;
+                force_update = true;
+            }
+
+            if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 {
                 self.last_limiter_update = Instant::now();
                 let (read_limiter, write_limiter) = update_limiter_cb();
                 self.read_limiter = read_limiter;
@@ -99,6 +123,15 @@ impl<S> RateLimitedStream<S> {
     pub fn inner_mut(&mut self) -> &mut S {
         &mut self.stream
     }
+
+    pub fn tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
+        self.tag_handle.as_ref().map(Arc::clone)
+    }
+
+    pub fn set_tag_handle(&mut self, handle: Arc<Mutex<RateLimiterTags>>) {
+        self.tag_handle = Some(handle);
+        self.last_tags = None;
+    }
 }
 
 fn register_traffic(limiter: &dyn ShareableRateLimit, count: usize) -> Option<Pin<Box<Sleep>>> {
-- 
2.47.3





More information about the pbs-devel mailing list