[pbs-devel] [PATCH proxmox v2 3/3] rest-server: propagate rate-limit tags from authenticated users

Christian Ebner c.ebner at proxmox.com
Mon Nov 10 09:55:12 CET 2025


On 11/7/25 2:23 PM, Hannes Laimer wrote:
> Tie REST connections' rate-limiter callbacks to a shared tag handle
> so we can push authenticated user IDs into the limiter, keeping
> tags in sync whenever we clear or update them.
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
>   proxmox-rest-server/src/connection.rs |  18 +++-
>   proxmox-rest-server/src/rest.rs       | 137 +++++++++++++++++++++++++-
>   2 files changed, 148 insertions(+), 7 deletions(-)
> 
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 9511b7cb..47750dc8 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -24,7 +24,7 @@ use tokio_openssl::SslStream;
>   use tokio_stream::wrappers::ReceiverStream;
>   
>   #[cfg(feature = "rate-limited-stream")]
> -use proxmox_http::{RateLimitedStream, ShareableRateLimit};
> +use proxmox_http::{RateLimitedStream, RateLimiterTag, RateLimiterTags, ShareableRateLimit};
>   
>   #[cfg(feature = "rate-limited-stream")]
>   pub type SharedRateLimit = Arc<dyn ShareableRateLimit>;
> @@ -165,7 +165,10 @@ type InsecureClientStreamResult = Pin<Box<InsecureClientStream>>;
>   type ClientStreamResult = Pin<Box<SslStream<InsecureClientStream>>>;
>   
>   #[cfg(feature = "rate-limited-stream")]
> -type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
> +type LookupRateLimiter = dyn Fn(
> +        std::net::SocketAddr,
> +        &[RateLimiterTag],
> +    ) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
>       + Send
>       + Sync
>       + 'static;
> @@ -369,7 +372,16 @@ impl AcceptBuilder {
>   
>           #[cfg(feature = "rate-limited-stream")]
>           let socket = match self.lookup_rate_limiter.clone() {
> -            Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)),
> +            Some(lookup) => {
> +                let tags: Arc<Mutex<RateLimiterTags>> = Arc::new(Mutex::new(Vec::new()));
> +                let tags_cb = Arc::clone(&tags);
> +                let mut s = RateLimitedStream::with_limiter_update_cb(socket, move || {
> +                    let tags = tags_cb.lock().unwrap().clone();
> +                    lookup(peer, &tags)
> +                });
> +                s.set_tag_handle(tags);
> +                s
> +            }

As mentioned on the previous patch, this can be greatly simplified if
the tag handle is instantiated within the helper and the tag list is
passed along as a parameter to the callback; see the suggested diff below.

>               None => RateLimitedStream::with_limiter(socket, None, None),
>           };
>   
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index b76c4bc9..6e9692ae 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -29,6 +29,10 @@ use tower_service::Service;
>   use url::form_urlencoded;
>   
>   use proxmox_http::Body;
> +#[cfg(feature = "rate-limited-stream")]
> +use proxmox_http::{RateLimiterTag, RateLimiterTags};
> +#[cfg(not(feature = "rate-limited-stream"))]
> +type RateLimiterTags = ();
>   use proxmox_router::{
>       check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
>       RpcEnvironmentType, UserInformation,
> @@ -86,10 +90,26 @@ impl RestServer {
>           }
>       }
>   
> -    pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
> +    #[cfg(not(feature = "rate-limited-stream"))]
> +    pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> +    where
> +        T: PeerAddress + ?Sized,
> +    {
> +        Ok(ApiService {
> +            peer: peer.peer_addr()?,
> +            api_config: Arc::clone(&self.api_config),
> +        })
> +    }
> +
> +    #[cfg(feature = "rate-limited-stream")]
> +    pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> +    where
> +        T: PeerAddress + PeerRateLimitTags + ?Sized,
> +    {
>           Ok(ApiService {
>               peer: peer.peer_addr()?,
>               api_config: Arc::clone(&self.api_config),
> +            rate_limit_tags: peer.rate_limiter_tag_handle(),
>           })
>       }
>   }
> @@ -185,6 +205,11 @@ pub trait PeerAddress {
>       fn peer_addr(&self) -> Result<std::net::SocketAddr, Error>;
>   }
>   
> +#[cfg(feature = "rate-limited-stream")]
> +pub trait PeerRateLimitTags {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>>;
> +}
> +
>   // tokio_openssl's SslStream requires the stream to be pinned in order to accept it, and we need to
>   // accept before the peer address is requested, so let's just generally implement this for
>   // Pin<Box<T>>
> @@ -221,6 +246,41 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>       }
>   }
>   
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerRateLimitTags> PeerRateLimitTags for Pin<Box<T>> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        T::rate_limiter_tag_handle(&**self)
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerRateLimitTags> PeerRateLimitTags for tokio_openssl::SslStream<T> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        self.get_ref().rate_limiter_tag_handle()
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerRateLimitTags for tokio::net::TcpStream {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        None
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerRateLimitTags for tokio::net::UnixStream {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        None
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T> PeerRateLimitTags for proxmox_http::RateLimitedStream<T> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        self.tag_handle()
> +    }
> +}
> +
>   // Helper [Service] containing the peer Address
>   //
>   // The lower level connection [Service] implementation on
> @@ -233,6 +293,8 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>   pub struct ApiService {
>       pub peer: std::net::SocketAddr,
>       pub api_config: Arc<ApiConfig>,
> +    #[cfg(feature = "rate-limited-stream")]
> +    pub rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>   }
>   
>   impl ApiService {
> @@ -354,6 +416,10 @@ impl Service<Request<Incoming>> for ApiService {
>               Some(proxied_peer) => proxied_peer,
>               None => self.peer,
>           };
> +        #[cfg(feature = "rate-limited-stream")]
> +        let rate_limit_tags = self.rate_limit_tags.clone();
> +        #[cfg(not(feature = "rate-limited-stream"))]
> +        let rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>> = None;
>   
>           let header = self.api_config
>               .auth_cookie_name
> @@ -368,7 +434,15 @@ impl Service<Request<Incoming>> for ApiService {
>                });
>   
>           async move {
> -            let mut response = match Arc::clone(&config).handle_request(req, &peer).await {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
> +
> +            let mut response = match Arc::clone(&config)
> +                .handle_request(req, &peer, rate_limit_tags.clone())
> +                .await
> +            {
>                   Ok(response) => response,
>                   Err(err) => {
>                       let (err, code) = match err.downcast_ref::<HttpError>() {
> @@ -860,6 +934,8 @@ impl ApiConfig {
>           self: Arc<ApiConfig>,
>           req: Request<Incoming>,
>           peer: &std::net::SocketAddr,
> +        #[cfg_attr(not(feature = "rate-limited-stream"), allow(unused_variables))]
> +        rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>       ) -> Result<Response<Body>, Error> {
>           let (parts, body) = req.into_parts();
>           let method = parts.method.clone();
> @@ -890,6 +966,8 @@ impl ApiConfig {
>                       full_path: &path,
>                       relative_path_components,
>                       rpcenv,
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    rate_limit_tags: rate_limit_tags.clone(),
>                   })
>                   .await;
>           }
> @@ -901,13 +979,29 @@ impl ApiConfig {
>           if components.is_empty() {
>               match self.check_auth(&parts.headers, &method).await {
>                   Ok((auth_id, _user_info)) => {
> -                    rpcenv.set_auth_id(Some(auth_id));
> +                    rpcenv.set_auth_id(Some(auth_id.clone()));
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(auth_id));
> +                    }
>                       return Ok(self.get_index(rpcenv, parts).await);
>                   }
>                   Err(AuthError::Generic(_)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await;
>                   }
> -                Err(AuthError::NoData) => {}
> +                Err(AuthError::NoData) =>
> +                {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
> +                }
>               }
>               Ok(self.get_index(rpcenv, parts).await)
>           } else {
> @@ -975,6 +1069,8 @@ pub struct ApiRequestData<'a> {
>       full_path: &'a str,
>       relative_path_components: &'a [&'a str],
>       rpcenv: RestEnvironment,
> +    #[cfg(feature = "rate-limited-stream")]
> +    rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>   }
>   
>   pub(crate) struct Formatted {
> @@ -992,6 +1088,8 @@ impl Formatted {
>               full_path,
>               relative_path_components,
>               mut rpcenv,
> +            #[cfg(feature = "rate-limited-stream")]
> +            rate_limit_tags,
>           }: ApiRequestData<'_>,
>       ) -> Result<Response<Body>, Error> {
>           if relative_path_components.is_empty() {
> @@ -1026,10 +1124,20 @@ impl Formatted {
>           if auth_required {
>               match config.check_auth(&parts.headers, &parts.method).await {
>                   Ok((authid, info)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(authid.clone()));
> +                    }
>                       rpcenv.set_auth_id(Some(authid));
>                       user_info = info;
>                   }
>                   Err(auth_err) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       let err = match auth_err {
>                           AuthError::Generic(err) => err,
>                           AuthError::NoData => {
> @@ -1045,6 +1153,11 @@ impl Formatted {
>                       return Err(err);
>                   }
>               }
> +        } else {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
>           }
>   
>           match api_method {
> @@ -1108,6 +1221,8 @@ impl Unformatted {
>               full_path,
>               relative_path_components,
>               mut rpcenv,
> +            #[cfg(feature = "rate-limited-stream")]
> +            rate_limit_tags,
>           }: ApiRequestData<'_>,
>       ) -> Result<Response<Body>, Error> {
>           if relative_path_components.is_empty() {
> @@ -1133,10 +1248,20 @@ impl Unformatted {
>           if auth_required {
>               match config.check_auth(&parts.headers, &parts.method).await {
>                   Ok((authid, info)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(authid.clone()));
> +                    }
>                       rpcenv.set_auth_id(Some(authid));
>                       user_info = info;
>                   }
>                   Err(auth_err) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       let err = match auth_err {
>                           AuthError::Generic(err) => err,
>                           AuthError::NoData => {
> @@ -1153,6 +1278,10 @@ impl Unformatted {
>                   }
>               }
>           } else {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
>               user_info = Box::new(EmptyUserInformation {});
>           }
>   


diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
index 47750dc8..e9f51828 100644
--- a/proxmox-rest-server/src/connection.rs
+++ b/proxmox-rest-server/src/connection.rs
@@ -372,16 +372,9 @@ impl AcceptBuilder {

          #[cfg(feature = "rate-limited-stream")]
          let socket = match self.lookup_rate_limiter.clone() {
-            Some(lookup) => {
-                let tags: Arc<Mutex<RateLimiterTags>> = Arc::new(Mutex::new(Vec::new()));
-                let tags_cb = Arc::clone(&tags);
-                let mut s = RateLimitedStream::with_limiter_update_cb(socket, move || {
-                    let tags = tags_cb.lock().unwrap().clone();
-                    lookup(peer, &tags)
-                });
-                s.set_tag_handle(tags);
-                s
-            }
+            Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move |tags| {
+                lookup(peer, tags)
+            }),
              None => RateLimitedStream::with_limiter(socket, None, None),
          };





More information about the pbs-devel mailing list