[pbs-devel] [PATCH proxmox v3 2/3] http: track user tag updates on rate-limited streams

Christian Ebner c.ebner at proxmox.com
Wed Nov 12 10:46:17 CET 2025


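nit: needs reformatting via cargo fmt (e.g. the new `F: Fn(&[RateLimiterTag]) -> ...` bound in `with_limiter_update_cb` exceeds the 100-column limit)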

On 11/10/25 2:42 PM, Hannes Laimer wrote:
> Introduce rate-limit tags with a user variant and let rate-limited
> streams hold a shared handle so callbacks can refresh limits
> whenever the tag set changes.
> 
> If we decide to implement something like [1] in the future, this could
> potentially include group rate-limits, for example.
> 
> [1] https://bugzilla.proxmox.com/show_bug.cgi?id=5867
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
>   proxmox-http/src/lib.rs                 |  2 +-
>   proxmox-http/src/rate_limited_stream.rs | 40 +++++++++++++++++++++----
>   2 files changed, 36 insertions(+), 6 deletions(-)
> 
> diff --git a/proxmox-http/src/lib.rs b/proxmox-http/src/lib.rs
> index 8b6953b0..990f8f36 100644
> --- a/proxmox-http/src/lib.rs
> +++ b/proxmox-http/src/lib.rs
> @@ -34,7 +34,7 @@ pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimi
>   #[cfg(feature = "rate-limited-stream")]
>   mod rate_limited_stream;
>   #[cfg(feature = "rate-limited-stream")]
> -pub use rate_limited_stream::RateLimitedStream;
> +pub use rate_limited_stream::{RateLimitedStream, RateLimiterTag, RateLimiterTags};
>   
>   #[cfg(feature = "body")]
>   mod body;
> diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
> index e24df7af..6b525591 100644
> --- a/proxmox-http/src/rate_limited_stream.rs
> +++ b/proxmox-http/src/rate_limited_stream.rs
> @@ -15,8 +15,15 @@ use super::{RateLimiter, ShareableRateLimit};
>   
>   type SharedRateLimit = Arc<dyn ShareableRateLimit>;
>   
> +#[derive(Clone, Debug, PartialEq, Eq)]
> +pub enum RateLimiterTag {
> +    User(String),
> +}
> +
> +pub type RateLimiterTags = Vec<RateLimiterTag>;
> +
>   pub type RateLimiterCallback =
> -    dyn Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
> +    dyn Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send;
>   
>   /// A rate limited stream using [RateLimiter]
>   pub struct RateLimitedStream<S> {
> @@ -26,6 +33,8 @@ pub struct RateLimitedStream<S> {
>       write_delay: Option<Pin<Box<Sleep>>>,
>       update_limiter_cb: Option<Box<RateLimiterCallback>>,
>       last_limiter_update: Instant,
> +    tag_handle: Option<Arc<Mutex<RateLimiterTags>>>,
> +    last_tags: Option<RateLimiterTags>,
>       stream: S,
>   }
>   
> @@ -53,6 +62,8 @@ impl<S> RateLimitedStream<S> {
>               write_delay: None,
>               update_limiter_cb: None,
>               last_limiter_update: Instant::now(),
> +            tag_handle: None,
> +            last_tags: None,
>               stream,
>           }
>       }
> @@ -64,12 +75,13 @@ impl<S> RateLimitedStream<S> {
>       /// Note: This function is called within an async context, so it
>       /// should be fast and must not block.
>       pub fn with_limiter_update_cb<
> -        F: Fn() -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static,
> +        F: Fn(&[RateLimiterTag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static,
>       >(
>           stream: S,
>           update_limiter_cb: F,
>       ) -> Self {
> -        let (read_limiter, write_limiter) = update_limiter_cb();
> +        let tag_handle = Some(Arc::new(Mutex::new(Vec::new())));
> +        let (read_limiter, write_limiter) = update_limiter_cb(&[]);
>           Self {
>               read_limiter,
>               read_delay: None,
> @@ -77,15 +89,29 @@ impl<S> RateLimitedStream<S> {
>               write_delay: None,
>               update_limiter_cb: Some(Box::new(update_limiter_cb)),
>               last_limiter_update: Instant::now(),
> +            tag_handle,
> +            last_tags: None,
>               stream,
>           }
>       }
>   
>       fn update_limiters(&mut self) {
>           if let Some(ref update_limiter_cb) = self.update_limiter_cb {
> -            if self.last_limiter_update.elapsed().as_secs() >= 5 {
> +            let mut force_update = false;
> +            let current_tags = self
> +                .tag_handle
> +                .as_ref()
> +                .map(|handle| handle.lock().unwrap().clone());
> +
> +            if self.last_tags != current_tags {
> +                self.last_tags = current_tags.clone();
> +                force_update = true;
> +            }
> +
> +            if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 {
>                   self.last_limiter_update = Instant::now();
> -                let (read_limiter, write_limiter) = update_limiter_cb();
> +                let tags = self.last_tags.as_ref().map(|tags| tags.as_slice()).unwrap_or(&[]);
> +                let (read_limiter, write_limiter) = update_limiter_cb(tags);
>                   self.read_limiter = read_limiter;
>                   self.write_limiter = write_limiter;
>               }
> @@ -99,6 +125,10 @@ impl<S> RateLimitedStream<S> {
>       pub fn inner_mut(&mut self) -> &mut S {
>           &mut self.stream
>       }
> +
> +    pub fn tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        self.tag_handle.as_ref().map(Arc::clone)
> +    }
>   }
>   
>   fn register_traffic(limiter: &dyn ShareableRateLimit, count: usize) -> Option<Pin<Box<Sleep>>> {





More information about the pbs-devel mailing list