[pbs-devel] [PATCH proxmox 2/3] http: add user tag to rate-limited streams

Christian Ebner c.ebner at proxmox.com
Fri Nov 7 12:01:03 CET 2025


some comments inline

On 9/9/25 10:53 AM, Hannes Laimer wrote:
> This handle is initialized whenever a connection is accepted, and the
> user is filled in once this connection is authenticated. This tag is
> used for user-specific rate-limiting.
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
>   proxmox-http/src/rate_limited_stream.rs | 30 ++++++++++++++++++++++++-
>   1 file changed, 29 insertions(+), 1 deletion(-)
> 
> diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
> index e9308a47..dcd167c0 100644
> --- a/proxmox-http/src/rate_limited_stream.rs
> +++ b/proxmox-http/src/rate_limited_stream.rs
> @@ -26,6 +26,12 @@ pub struct RateLimitedStream<S> {
>       write_delay: Option<Pin<Box<Sleep>>>,
>       update_limiter_cb: Option<Box<RateLimiterCallback>>,
>       last_limiter_update: Instant,
> +    user_tag: Option<Arc<Mutex<Option<String>>>>,
> +    // Since the option holding the handle is set on accept, we have to keep track of when/if the
> +    // connection completes auth and the user tag was set. Without this we'd have to wait for normal
> +    // update, so ~5s but auth happens also immediately after accept. Like this user rate-limits
> +    // are applied as soons as the connection completes auth.
> +    user_set: bool,

As mentioned in the reply to the cover letter, this should probably be
agnostic to the concept of a user. Let's rather call this a generic tag,
maybe with a dedicated type.

E.g. this could be:

pub enum Tag {
    String(String),
    Untagged,
}
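
To make the suggestion a bit more concrete, here is a minimal sketch of
such a type. Only the enum and its two variants come from the proposal
above; the derives, the `is_tagged` helper and the `From<Option<String>>`
conversion are assumptions added for illustration, not part of the patch
or of proxmox-http.

```rust
/// Hypothetical generic connection tag, following the enum proposed above.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub enum Tag {
    /// Free-form label, e.g. the id of the authenticated user.
    String(String),
    /// No tag assigned (yet), e.g. before the connection completed auth.
    #[default]
    Untagged,
}

impl Tag {
    /// Assumed helper: true once a tag has been assigned.
    pub fn is_tagged(&self) -> bool {
        !matches!(self, Tag::Untagged)
    }
}

/// Assumed conversion from the `Option<String>` used in the patch.
impl From<Option<String>> for Tag {
    fn from(value: Option<String>) -> Self {
        match value {
            Some(s) => Tag::String(s),
            None => Tag::Untagged,
        }
    }
}
```

With something like this the stream only needs to know whether a tag is
set, not what it means.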

>       stream: S,
>   }
>   
> @@ -53,6 +59,8 @@ impl<S> RateLimitedStream<S> {
>               write_delay: None,
>               update_limiter_cb: None,
>               last_limiter_update: Instant::now(),
> +            user_tag: None,
> +            user_set: false,
>               stream,
>           }
>       }
> @@ -77,13 +85,25 @@ impl<S> RateLimitedStream<S> {
>               write_delay: None,
>               update_limiter_cb: Some(Box::new(update_limiter_cb)),
>               last_limiter_update: Instant::now(),
> +            user_tag: None,
> +            user_set: false,
>               stream,
>           }
>       }
>   
>       fn update_limiters(&mut self) {
>           if let Some(ref update_limiter_cb) = self.update_limiter_cb {
> -            if self.last_limiter_update.elapsed().as_secs() >= 5 {
> +            let mut force_update = false;
> +            if !self.user_set {
> +                let current_user = self
> +                    .user_tag
> +                    .as_ref()
> +                    .and_then(|h| h.lock().ok().and_then(|g| g.clone()));
> +                self.user_set = current_user.is_some();
> +                force_update = self.user_set;
> +            }
> +
> +            if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 {
>                   self.last_limiter_update = Instant::now();
>                   let (read_limiter, write_limiter) = update_limiter_cb();
>                   self.read_limiter = read_limiter;
> @@ -99,6 +119,14 @@ impl<S> RateLimitedStream<S> {
>       pub fn inner_mut(&mut self) -> &mut S {
>           &mut self.stream
>       }
> +
> +    pub fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> +        self.user_tag.as_ref().map(Arc::clone)
> +    }
> +
> +    pub fn set_user_tag_handle(&mut self, handle: Arc<Mutex<Option<String>>>) {
> +        self.user_tag = Some(handle);
> +    }

Could we extend the callback to take the tag as a parameter instead of
using these accessors (see also the next patch)?

F: Fn(Tag) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static,

or even something like

F: Fn(&[Tag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static,

As far as I can see, this callback never needs to modify the tag itself;
it is only used to determine the rate limits to be applied.

This would simplify the logic in the next patch.
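
A rough sketch of what the `&[Tag]` callback variant could look like,
reduced to the limiter bookkeeping. Everything here is illustrative:
`Limit` is a plain `u64` stand-in for `SharedRateLimit`, and
`TaggedLimits`, `set_tags` and `tags_changed` are hypothetical names, not
existing proxmox-http API. It keeps the behaviour of the patch in that a
tag change forces an update instead of waiting for the 5 second interval.

```rust
use std::time::{Duration, Instant};

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Tag {
    String(String),
    Untagged,
}

/// Stand-in for `SharedRateLimit`: just a rate in bytes per second.
pub type Limit = u64;

/// Callback signature as suggested above, with `Limit` substituted.
pub type LimiterCallback = dyn Fn(&[Tag]) -> (Option<Limit>, Option<Limit>) + Send + 'static;

/// Hypothetical holder for the limiter state of one stream.
pub struct TaggedLimits {
    tags: Vec<Tag>,
    tags_changed: bool,
    last_update: Instant,
    update_cb: Box<LimiterCallback>,
    pub read_limit: Option<Limit>,
    pub write_limit: Option<Limit>,
}

impl TaggedLimits {
    pub fn new<F>(update_cb: F) -> Self
    where
        F: Fn(&[Tag]) -> (Option<Limit>, Option<Limit>) + Send + 'static,
    {
        let tags = Vec::new();
        // Initial limits for a connection without any tags.
        let (read_limit, write_limit) = update_cb(&tags);
        Self {
            tags,
            tags_changed: false,
            last_update: Instant::now(),
            update_cb: Box::new(update_cb),
            read_limit,
            write_limit,
        }
    }

    /// Replace the tags, e.g. once the connection completed auth.
    pub fn set_tags(&mut self, tags: Vec<Tag>) {
        if self.tags != tags {
            self.tags = tags;
            self.tags_changed = true;
        }
    }

    /// Re-query the callback if the tags changed or 5 seconds elapsed.
    pub fn update_limiters(&mut self) {
        if self.tags_changed || self.last_update.elapsed() >= Duration::from_secs(5) {
            self.tags_changed = false;
            self.last_update = Instant::now();
            let (read_limit, write_limit) = (self.update_cb)(&self.tags);
            self.read_limit = read_limit;
            self.write_limit = write_limit;
        }
    }
}
```

The callback only ever gets a shared reference to the tags, so the
lookup of the matching limits stays entirely on the caller side.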

>   }
>   
>   fn register_traffic(limiter: &(dyn ShareableRateLimit), count: usize) -> Option<Pin<Box<Sleep>>> {




