[pbs-devel] [PATCH proxmox 2/3] http: add user tag to rate-limited streams
Hannes Laimer
h.laimer at proxmox.com
Tue Sep 9 10:52:41 CEST 2025
This handle is initialized whenever a connection is accepted, and the
user is filled in once this connection is authenticated. This tag is
used for user-specific rate-limiting.
Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
---
proxmox-http/src/rate_limited_stream.rs | 30 ++++++++++++++++++++++++-
1 file changed, 29 insertions(+), 1 deletion(-)
diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
index e9308a47..dcd167c0 100644
--- a/proxmox-http/src/rate_limited_stream.rs
+++ b/proxmox-http/src/rate_limited_stream.rs
@@ -26,6 +26,12 @@ pub struct RateLimitedStream<S> {
write_delay: Option<Pin<Box<Sleep>>>,
update_limiter_cb: Option<Box<RateLimiterCallback>>,
last_limiter_update: Instant,
+ user_tag: Option<Arc<Mutex<Option<String>>>>,
+ // Since the option holding the handle is set on accept, we have to keep track of when/if the
+ // connection completes auth and the user tag was set. Without this we'd have to wait for normal
+ // update, so ~5s but auth happens also immediately after accept. Like this user rate-limits
+ // are applied as soon as the connection completes auth.
+ user_set: bool,
stream: S,
}
@@ -53,6 +59,8 @@ impl<S> RateLimitedStream<S> {
write_delay: None,
update_limiter_cb: None,
last_limiter_update: Instant::now(),
+ user_tag: None,
+ user_set: false,
stream,
}
}
@@ -77,13 +85,25 @@ impl<S> RateLimitedStream<S> {
write_delay: None,
update_limiter_cb: Some(Box::new(update_limiter_cb)),
last_limiter_update: Instant::now(),
+ user_tag: None,
+ user_set: false,
stream,
}
}
fn update_limiters(&mut self) {
if let Some(ref update_limiter_cb) = self.update_limiter_cb {
- if self.last_limiter_update.elapsed().as_secs() >= 5 {
+ let mut force_update = false;
+ if !self.user_set {
+ let current_user = self
+ .user_tag
+ .as_ref()
+ .and_then(|h| h.lock().ok().and_then(|g| g.clone()));
+ self.user_set = current_user.is_some();
+ force_update = self.user_set;
+ }
+
+ if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 {
self.last_limiter_update = Instant::now();
let (read_limiter, write_limiter) = update_limiter_cb();
self.read_limiter = read_limiter;
@@ -99,6 +119,14 @@ impl<S> RateLimitedStream<S> {
pub fn inner_mut(&mut self) -> &mut S {
&mut self.stream
}
+
+ pub fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
+ self.user_tag.as_ref().map(Arc::clone)
+ }
+
+ pub fn set_user_tag_handle(&mut self, handle: Arc<Mutex<Option<String>>>) {
+ self.user_tag = Some(handle);
+ }
}
fn register_traffic(limiter: &(dyn ShareableRateLimit), count: usize) -> Option<Pin<Box<Sleep>>> {
--
2.47.2
More information about the pbs-devel
mailing list