[pbs-devel] [PATCH proxmox v4 2/3] http: track user tag updates on rate-limited streams
Hannes Laimer
h.laimer at proxmox.com
Wed Nov 12 11:35:01 CET 2025
Introduce rate-limit tags with a user variant and let rate-limited
streams hold a shared handle so callbacks can refresh limits
whenever the tag set changes.
If we decide to implement something like [1] in the future, this could,
for example, be extended to also include group rate limits.
[1] https://bugzilla.proxmox.com/show_bug.cgi?id=5867
Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
Reviewed-by: Christian Ebner <c.ebner at proxmox.com>
Tested-by: Christian Ebner <c.ebner at proxmox.com>
---
proxmox-http/src/lib.rs | 2 +-
proxmox-http/src/rate_limited_stream.rs | 46 ++++++++++++++++++++++---
2 files changed, 42 insertions(+), 6 deletions(-)
diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
index 8b6953b0..990f8f36 100644
--- a/proxmox-http/src/lib.rs
+++ b/proxmox-http/src/lib.rs
@@ -34,7 +34,7 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
#[cfg(feature = "rate-limited-stream")]
mod rate_limited_stream;
#[cfg(feature = "rate-limited-stream")]
-pub use rate_limited_stream::RateLimitedStream;
+pub use rate_limited_stream::{RateLimitedStream, RateLimiterTag, RateLimiterTags};
#[cfg(feature = "body")]
mod body;
diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
index e24df7af..f4d02eb0 100644
--- a/proxmox-http/src/rate_limited_stream.rs
+++ b/proxmox-http/src/rate_limited_stream.rs
@@ -15,8 +15,15 @@ use super::{RateLimiter, ShareableRateLimit};
type SharedRateLimit = Arc<dyn ShareableRateLimit>;
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum RateLimiterTag {
+ User(String),
+}
+
+pub type RateLimiterTags = Vec<RateLimiterTag>;
+
pub type RateLimiterCallback =
- dyn Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
+ dyn Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
/// A rate limited stream using [RateLimiter]
pub struct RateLimitedStream<S> {
@@ -26,6 +33,8 @@ pub struct RateLimitedStream<S> {
write_delay: Option<Pin<Box<Sleep>>>,
update_limiter_cb: Option<Box<RateLimiterCallback>>,
last_limiter_update: Instant,
+ tag_handle: Option<Arc<Mutex<RateLimiterTags>>>,
+ last_tags: Option<RateLimiterTags>,
stream: S,
}
@@ -53,6 +62,8 @@ impl<S> RateLimitedStream<S> {
write_delay: None,
update_limiter_cb: None,
last_limiter_update: Instant::now(),
+ tag_handle: None,
+ last_tags: None,
stream,
}
}
@@ -64,12 +75,15 @@ impl<S> RateLimitedStream<S> {
/// Note: This function is called within an async context, so it
/// should be fast and must not block.
pub fn with_limiter_update_cb<
- F: Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static,
+ F: Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
+ + Send
+ + 'static,
>(
stream: S,
update_limiter_cb: F,
) -> Self {
- let (read_limiter, write_limiter) = update_limiter_cb();
+ let tag_handle = Some(Arc::new(Mutex::new(Vec::new())));
+ let (read_limiter, write_limiter) = update_limiter_cb(&[]);
Self {
read_limiter,
read_delay: None,
@@ -77,15 +91,33 @@ impl<S> RateLimitedStream<S> {
write_delay: None,
update_limiter_cb: Some(Box::new(update_limiter_cb)),
last_limiter_update: Instant::now(),
+ tag_handle,
+ last_tags: None,
stream,
}
}
fn update_limiters(&mut self) {
if let Some(ref update_limiter_cb) = self.update_limiter_cb {
- if self.last_limiter_update.elapsed().as_secs() >= 5 {
+ let mut force_update = false;
+ let current_tags = self
+ .tag_handle
+ .as_ref()
+ .map(|handle| handle.lock().unwrap().clone());
+
+ if self.last_tags != current_tags {
+ self.last_tags = current_tags.clone();
+ force_update = true;
+ }
+
+ if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 {
self.last_limiter_update = Instant::now();
- let (read_limiter, write_limiter) = update_limiter_cb();
+ let tags = self
+ .last_tags
+ .as_ref()
+ .map(|tags| tags.as_slice())
+ .unwrap_or(&[]);
+ let (read_limiter, write_limiter) = update_limiter_cb(tags);
self.read_limiter = read_limiter;
self.write_limiter = write_limiter;
}
@@ -99,6 +131,10 @@ impl<S> RateLimitedStream<S> {
pub fn inner_mut(&mut self) -> &mut S {
&mut self.stream
}
+
+ pub fn tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
+ self.tag_handle.as_ref().map(Arc::clone)
+ }
}
fn register_traffic(limiter: &dyn ShareableRateLimit, count: usize) -> Option<Pin<Box<Sleep>>> {
--
2.47.3
More information about the pbs-devel mailing list