[pbs-devel] [PATCH proxmox 2/3] http: add user tag to rate-limited streams

Hannes Laimer h.laimer at proxmox.com
Tue Sep 9 10:52:41 CEST 2025


This handle is initialized whenever a connection in accepted, and the
user is filled in once this conenction is authenticated. This tag is
used for user specific rate-limiting.

Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
 proxmox-http/src/rate_limited_stream.rs | 30 ++++++++++++++++++++++++-
 1 file changed, 29 insertions(+), 1 deletion(-)

diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
index e9308a47..dcd167c0 100644
--- a/proxmox-http/src/rate_limited_stream.rs
+++ b/proxmox-http/src/rate_limited_stream.rs
@@ -26,6 +26,12 @@ pub struct RateLimitedStream<S> {
     write_delay: Option<Pin<Box<Sleep>>>,
     update_limiter_cb: Option<Box<RateLimiterCallback>>,
     last_limiter_update: Instant,
+    user_tag: Option<Arc<Mutex<Option<String>>>>,
+    // Since the option holding the handle is set on accept, we have to keep track of when/if the
+    // connection completes auth and the user tag was set. Without this we'd have to wait for normal
+    // update, so ~5s but auth happens also immediately after accept. Like this user rate-limits
+    // are applied as soons as the connection completes auth.
+    user_set: bool,
     stream: S,
 }
 
@@ -53,6 +59,8 @@ impl<S> RateLimitedStream<S> {
             write_delay: None,
             update_limiter_cb: None,
             last_limiter_update: Instant::now(),
+            user_tag: None,
+            user_set: false,
             stream,
         }
     }
@@ -77,13 +85,25 @@ impl<S> RateLimitedStream<S> {
             write_delay: None,
             update_limiter_cb: Some(Box::new(update_limiter_cb)),
             last_limiter_update: Instant::now(),
+            user_tag: None,
+            user_set: false,
             stream,
         }
     }
 
     fn update_limiters(&mut self) {
         if let Some(ref update_limiter_cb) = self.update_limiter_cb {
-            if self.last_limiter_update.elapsed().as_secs() >= 5 {
+            let mut force_update = false;
+            if !self.user_set {
+                let current_user = self
+                    .user_tag
+                    .as_ref()
+                    .and_then(|h| h.lock().ok().and_then(|g| g.clone()));
+                self.user_set = current_user.is_some();
+                force_update = self.user_set;
+            }
+
+            if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 {
                 self.last_limiter_update = Instant::now();
                 let (read_limiter, write_limiter) = update_limiter_cb();
                 self.read_limiter = read_limiter;
@@ -99,6 +119,14 @@ impl<S> RateLimitedStream<S> {
     pub fn inner_mut(&mut self) -> &mut S {
         &mut self.stream
     }
+
+    pub fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
+        self.user_tag.as_ref().map(Arc::clone)
+    }
+
+    pub fn set_user_tag_handle(&mut self, handle: Arc<Mutex<Option<String>>>) {
+        self.user_tag = Some(handle);
+    }
 }
 
 fn register_traffic(limiter: &(dyn ShareableRateLimit), count: usize) -> Option<Pin<Box<Sleep>>> {
-- 
2.47.2





More information about the pbs-devel mailing list