[pbs-devel] [PATCH proxmox 2/3] http: add user tag to rate-limited streams
Christian Ebner
c.ebner at proxmox.com
Fri Nov 7 12:01:03 CET 2025
some comments inline
On 9/9/25 10:53 AM, Hannes Laimer wrote:
> This handle is initialized whenever a connection is accepted, and the
> user is filled in once this connection is authenticated. This tag is
> used for user-specific rate-limiting.
>
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
> proxmox-http/src/rate_limited_stream.rs | 30 ++++++++++++++++++++++++-
> 1 file changed, 29 insertions(+), 1 deletion(-)
>
> diff --git a/proxmox-http/src/rate_limited_stream.rs b/proxmox-http/src/rate_limited_stream.rs
> index e9308a47..dcd167c0 100644
> --- a/proxmox-http/src/rate_limited_stream.rs
> +++ b/proxmox-http/src/rate_limited_stream.rs
> @@ -26,6 +26,12 @@ pub struct RateLimitedStream<S> {
>      write_delay: Option<Pin<Box<Sleep>>>,
>      update_limiter_cb: Option<Box<RateLimiterCallback>>,
>      last_limiter_update: Instant,
> +    user_tag: Option<Arc<Mutex<Option<String>>>>,
> +    // Since the option holding the handle is set on accept, we have to keep track of when/if the
> +    // connection completes auth and the user tag was set. Without this we'd have to wait for normal
> +    // update, so ~5s but auth happens also immediately after accept. Like this user rate-limits
> +    // are applied as soon as the connection completes auth.
> +    user_set: bool,
As mentioned in the reply to the cover letter, this should probably be
agnostic to the concept of a user. Let's rather call this a generic tag,
maybe with a dedicated type, e.g. an enum:

    pub enum Tag {
        String(String),
        Untagged,
    }
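
A minimal sketch of how such a tag type might be fleshed out, assuming only
the standard library. The derives, the `Default` impl, the `is_tagged` helper
and the `From<Option<String>>` conversion are hypothetical additions for
illustration and are not part of the patch or of proxmox-http:

```rust
/// Generic tag attached to a connection; not tied to the concept of a user.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Tag {
    /// Free-form tag, e.g. the id of an authenticated user.
    String(String),
    /// No tag assigned (yet), e.g. the connection has not completed auth.
    Untagged,
}

impl Default for Tag {
    /// A freshly accepted connection starts out untagged.
    fn default() -> Self {
        Tag::Untagged
    }
}

impl Tag {
    /// Hypothetical helper: true once a tag has been assigned.
    pub fn is_tagged(&self) -> bool {
        !matches!(self, Tag::Untagged)
    }
}

impl From<Option<String>> for Tag {
    /// Hypothetical conversion from the `Option<String>` used in the patch.
    fn from(value: Option<String>) -> Self {
        match value {
            Some(s) => Tag::String(s),
            None => Tag::Untagged,
        }
    }
}
```

With something like this, the `user_set` bookkeeping could probably be
expressed as a check on whether the tag is still `Tag::Untagged`.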
>      stream: S,
>  }
>
> @@ -53,6 +59,8 @@ impl<S> RateLimitedStream<S> {
>              write_delay: None,
>              update_limiter_cb: None,
>              last_limiter_update: Instant::now(),
> +            user_tag: None,
> +            user_set: false,
>              stream,
>          }
>      }
> @@ -77,13 +85,25 @@ impl<S> RateLimitedStream<S> {
>              write_delay: None,
>              update_limiter_cb: Some(Box::new(update_limiter_cb)),
>              last_limiter_update: Instant::now(),
> +            user_tag: None,
> +            user_set: false,
>              stream,
>          }
>      }
>
>      fn update_limiters(&mut self) {
>          if let Some(ref update_limiter_cb) = self.update_limiter_cb {
> -            if self.last_limiter_update.elapsed().as_secs() >= 5 {
> +            let mut force_update = false;
> +            if !self.user_set {
> +                let current_user = self
> +                    .user_tag
> +                    .as_ref()
> +                    .and_then(|h| h.lock().ok().and_then(|g| g.clone()));
> +                self.user_set = current_user.is_some();
> +                force_update = self.user_set;
> +            }
> +
> +            if force_update || self.last_limiter_update.elapsed().as_secs() >= 5 {
>                  self.last_limiter_update = Instant::now();
>                  let (read_limiter, write_limiter) = update_limiter_cb();
>                  self.read_limiter = read_limiter;
> @@ -99,6 +119,14 @@ impl<S> RateLimitedStream<S> {
>      pub fn inner_mut(&mut self) -> &mut S {
>          &mut self.stream
>      }
> +
> +    pub fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> +        self.user_tag.as_ref().map(Arc::clone)
> +    }
> +
> +    pub fn set_user_tag_handle(&mut self, handle: Arc<Mutex<Option<String>>>) {
> +        self.user_tag = Some(handle);
> +    }
Could we extend the callback to take the tag as a parameter instead of
using these getter/setter methods (see also the next patch)?

    F: Fn(Tag) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static,

or even something like

    F: Fn(&[Tag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static,

As far as I can see, this callback never needs to modify the tag itself; it
only uses it to determine the rate limits to apply. This would simplify
the logic in the next patch.
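
To illustrate the callback variant, here is a rough, self-contained sketch.
It is not the actual proxmox-http code: `RateLimit` and this
`SharedRateLimit` alias are stand-ins for the crate's real limiter types,
and `TaggedLimits`, `lookup` and `example_callback` are hypothetical names
made up for the example. The point is only that the tags are passed to the
callback by shared reference, so no handle has to be exposed on the stream:

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for the crate's shared limiter type, so the sketch compiles on its own.
#[derive(Debug)]
pub struct RateLimit {
    pub bytes_per_second: u64,
}
pub type SharedRateLimit = Arc<Mutex<RateLimit>>;

/// Generic tag as suggested above.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Tag {
    String(String),
    Untagged,
}

/// Callback type that receives the connection's tags read-only.
pub type RateLimiterCallback =
    dyn Fn(&[Tag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) + Send + 'static;

/// Hypothetical, stripped-down holder of the tags and the callback.
pub struct TaggedLimits {
    pub tags: Vec<Tag>,
    pub update_limiter_cb: Box<RateLimiterCallback>,
}

impl TaggedLimits {
    /// Ask the callback for the (read, write) limiters matching the current tags.
    pub fn lookup(&self) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) {
        (self.update_limiter_cb)(&self.tags)
    }
}

/// Made-up policy: tagged connections get a read limit, untagged ones get none.
pub fn example_callback(tags: &[Tag]) -> (Option<SharedRateLimit>, Option<SharedRateLimit>) {
    let tagged = tags.iter().any(|t| matches!(t, Tag::String(_)));
    if tagged {
        let limit = Arc::new(Mutex::new(RateLimit {
            bytes_per_second: 1_000_000,
        }));
        (Some(limit), None)
    } else {
        (None, None)
    }
}
```

The stream would then only need to store its tags and hand them to the
callback whenever the limiters are refreshed.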
>  }
>
>  fn register_traffic(limiter: &(dyn ShareableRateLimit), count: usize) -> Option<Pin<Box<Sleep>>> {