[pbs-devel] [PATCH proxmox v5 2/3] http: track user tag updates on rate-limited streams

Hannes Laimer h.laimer at proxmox.com
Fri Nov 21 14:50:39 CET 2025


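Introduce rate-limit tags, initially with only a user variant, and let
rate-limited streams hold a shared tags handle. The limiter update
callback now receives the current tags, and replacing the tags through
the handle forces a refresh on the next limiter update instead of
waiting for the periodic 5 second interval.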

If we decide to implement something like [1] in the future, this could
potentially also cover group rate limits, for example.

[1] https://bugzilla.proxmox.com/show_bug.cgi?id=5867

Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
 proxmox-http/src/lib.rs                 |  4 +-
 proxmox-http/src/rate_limited_stream.rs | 71 +++++++++++++++++++++++--
 2 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
index 8b6953b0..2c7bb750 100644
--- a/proxmox-http/src/lib.rs
+++ b/proxmox-http/src/lib.rs
@@ -34,7 +34,9 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
 #[cfg(feature = "rate-limited-stream")]
 mod rate_limited_stream;
 #[cfg(feature = "rate-limited-stream")]
-pub use rate_limited_stream::RateLimitedStream;
+pub use rate_limited_stream::{
+    RateLimitedStream, RateLimiterTag, RateLimiterTags, RateLimiterTagsHandle,
+};
 
 #[cfg(feature = "body")]
 mod body;
diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
index e24df7af..27f1d1f9 100644
--- a/proxmox-http/src/rate_limited_stream.rs
+++ b/proxmox-http/src/rate_limited_stream.rs
@@ -2,6 +2,7 @@ use std::future::Future;
 use std::io::IoSlice;
 use std::marker::Unpin;
 use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
 
@@ -15,8 +16,39 @@ use super::{RateLimiter, ShareableRateLimit};
 
 type SharedRateLimit = Arc<dyn ShareableRateLimit>;
 
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum RateLimiterTag {
+    User(String),
+}
+
+pub type RateLimiterTags = Vec<RateLimiterTag>;
+
+#[derive(Clone, Debug)]
+pub struct RateLimiterTagsHandle {
+    tags: Arc<Mutex<RateLimiterTags>>,
+    dirty: Arc<AtomicBool>,
+}
+
+impl RateLimiterTagsHandle {
+    fn new() -> Self {
+        Self {
+            tags: Arc::new(Mutex::new(Vec::new())),
+            dirty: Arc::new(AtomicBool::new(false)),
+        }
+    }
+
+    pub fn lock(&self) -> std::sync::MutexGuard<'_, RateLimiterTags> {
+        self.tags.lock().unwrap()
+    }
+
+    pub fn set_tags(&self, tags: RateLimiterTags) {
+        *self.tags.lock().unwrap() = tags;
+        self.dirty.store(true, Ordering::Release);
+    }
+}
+
 pub type RateLimiterCallback =
-    dyn Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
+    dyn Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
 
 /// A rate limited stream using [RateLimiter]
 pub struct RateLimitedStream<S> {
@@ -26,6 +58,7 @@ pub struct RateLimitedStream<S> {
     write_delay: Option<Pin<Box<Sleep>>>,
     update_limiter_cb: Option<Box<RateLimiterCallback>>,
     last_limiter_update: Instant,
+    tag_handle: Option<RateLimiterTagsHandle>,
     stream: S,
 }
 
@@ -53,6 +86,7 @@ impl<S> RateLimitedStream<S> {
             write_delay: None,
             update_limiter_cb: None,
             last_limiter_update: Instant::now(),
+            tag_handle: None,
             stream,
         }
     }
@@ -64,12 +98,15 @@ impl<S> RateLimitedStream<S> {
     /// Note: This function is called within an async context, so it
     /// should be fast and must not block.
     pub fn with_limiter_update_cb<
-        F: Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static,
+        F: Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
+            + Send
+            + 'static,
     >(
         stream: S,
         update_limiter_cb: F,
     ) -> Self {
-        let (read_limiter, write_limiter) = update_limiter_cb();
+        let tag_handle = Some(RateLimiterTagsHandle::new());
+        let (read_limiter, write_limiter) = update_limiter_cb(&[]);
         Self {
             read_limiter,
             read_delay: None,
@@ -77,15 +114,29 @@ impl<S> RateLimitedStream<S> {
             write_delay: None,
             update_limiter_cb: Some(Box::new(update_limiter_cb)),
             last_limiter_update: Instant::now(),
+            tag_handle,
             stream,
         }
     }
 
     fn update_limiters(&mut self) {
         if let Some(ref update_limiter_cb) = self.update_limiter_cb {
-            if self.last_limiter_update.elapsed().as_secs() >= 5 {
+            let mut force_update = false;
+
+            if let Some(ref handle) = self.tag_handle {
+                if handle.dirty.swap(false, Ordering::Acquire) {
+                    force_update = true;
+                }
+            }
+
+            if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 {
                 self.last_limiter_update = Instant::now();
-                let (read_limiter, write_limiter) = update_limiter_cb();
+                let (read_limiter, write_limiter) = if let Some(ref handle) = self.tag_handle {
+                    let tags = handle.lock();
+                    update_limiter_cb(&tags)
+                } else {
+                    update_limiter_cb(&[])
+                };
                 self.read_limiter = read_limiter;
                 self.write_limiter = write_limiter;
             }
@@ -99,6 +150,16 @@ impl<S> RateLimitedStream<S> {
     pub fn inner_mut(&mut self) -> &mut S {
         &mut self.stream
     }
+
+    pub fn tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
+        self.tag_handle
+            .as_ref()
+            .map(|handle| Arc::clone(&handle.tags))
+    }
+
+    pub fn rate_limiter_tags_handle(&self) -> Option<&RateLimiterTagsHandle> {
+        self.tag_handle.as_ref()
+    }
 }
 
 fn register_traffic(limiter: &dyn ShareableRateLimit, count: usize) -> Option<Pin<Box<Sleep>>> {
-- 
2.47.3





More information about the pbs-devel mailing list