[pbs-devel] [PATCH proxmox v3 3/3] rest-server: propagate rate-limit tags from authenticated users

Christian Ebner c.ebner at proxmox.com
Wed Nov 12 10:55:10 CET 2025


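nit: this patch needs reformatting via `cargo fmt` (e.g. the opening brace of
the `Err(AuthError::NoData)` match arm in rest.rs belongs on the same line as
the `=>`).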

On 11/10/25 2:43 PM, Hannes Laimer wrote:
> Tie the rate-limiter callbacks of REST connections to a shared tag
> handle so that authenticated user IDs can be pushed into the limiter,
> and keep the tags in sync whenever they are cleared or updated.
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
>   proxmox-rest-server/src/connection.rs |  11 ++-
>   proxmox-rest-server/src/rest.rs       | 137 +++++++++++++++++++++++++-
>   2 files changed, 141 insertions(+), 7 deletions(-)
> 
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 9511b7cb..ff2ee139 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -24,7 +24,7 @@ use tokio_openssl::SslStream;
>   use tokio_stream::wrappers::ReceiverStream;
>   
>   #[cfg(feature = "rate-limited-stream")]
> -use proxmox_http::{RateLimitedStream, ShareableRateLimit};
> +use proxmox_http::{RateLimitedStream, RateLimiterTag, ShareableRateLimit};
>   
>   #[cfg(feature = "rate-limited-stream")]
>   pub type SharedRateLimit = Arc<dyn ShareableRateLimit>;
> @@ -165,7 +165,10 @@ type InsecureClientStreamResult = Pin<Box<InsecureClientStream>>;
>   type ClientStreamResult = Pin<Box<SslStream<InsecureClientStream>>>;
>   
>   #[cfg(feature = "rate-limited-stream")]
> -type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
> +type LookupRateLimiter = dyn Fn(
> +        std::net::SocketAddr,
> +        &[RateLimiterTag],
> +    ) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
>       + Send
>       + Sync
>       + 'static;
> @@ -369,7 +372,9 @@ impl AcceptBuilder {
>   
>           #[cfg(feature = "rate-limited-stream")]
>           let socket = match self.lookup_rate_limiter.clone() {
> -            Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)),
> +            Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move |tags| {
> +                lookup(peer, tags)
> +            }),
>               None => RateLimitedStream::with_limiter(socket, None, None),
>           };
>   
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index b76c4bc9..6e9692ae 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -29,6 +29,10 @@ use tower_service::Service;
>   use url::form_urlencoded;
>   
>   use proxmox_http::Body;
> +#[cfg(feature = "rate-limited-stream")]
> +use proxmox_http::{RateLimiterTag, RateLimiterTags};
> +#[cfg(not(feature = "rate-limited-stream"))]
> +type RateLimiterTags = ();
>   use proxmox_router::{
>       check_api_permission, ApiHandler, ApiMethod, HttpError, Permission, RpcEnvironment,
>       RpcEnvironmentType, UserInformation,
> @@ -86,10 +90,26 @@ impl RestServer {
>           }
>       }
>   
> -    pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
> +    #[cfg(not(feature = "rate-limited-stream"))]
> +    pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> +    where
> +        T: PeerAddress + ?Sized,
> +    {
> +        Ok(ApiService {
> +            peer: peer.peer_addr()?,
> +            api_config: Arc::clone(&self.api_config),
> +        })
> +    }
> +
> +    #[cfg(feature = "rate-limited-stream")]
> +    pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> +    where
> +        T: PeerAddress + PeerRateLimitTags + ?Sized,
> +    {
>           Ok(ApiService {
>               peer: peer.peer_addr()?,
>               api_config: Arc::clone(&self.api_config),
> +            rate_limit_tags: peer.rate_limiter_tag_handle(),
>           })
>       }
>   }
> @@ -185,6 +205,11 @@ pub trait PeerAddress {
>       fn peer_addr(&self) -> Result<std::net::SocketAddr, Error>;
>   }
>   
> +#[cfg(feature = "rate-limited-stream")]
> +pub trait PeerRateLimitTags {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>>;
> +}
> +
>   // tokio_openssl's SslStream requires the stream to be pinned in order to accept it, and we need to
>   // accept before the peer address is requested, so let's just generally implement this for
>   // Pin<Box<T>>
> @@ -221,6 +246,41 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>       }
>   }
>   
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerRateLimitTags> PeerRateLimitTags for Pin<Box<T>> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        T::rate_limiter_tag_handle(&**self)
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerRateLimitTags> PeerRateLimitTags for tokio_openssl::SslStream<T> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        self.get_ref().rate_limiter_tag_handle()
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerRateLimitTags for tokio::net::TcpStream {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        None
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerRateLimitTags for tokio::net::UnixStream {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        None
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T> PeerRateLimitTags for proxmox_http::RateLimitedStream<T> {
> +    fn rate_limiter_tag_handle(&self) -> Option<Arc<Mutex<RateLimiterTags>>> {
> +        self.tag_handle()
> +    }
> +}
> +
>   // Helper [Service] containing the peer Address
>   //
>   // The lower level connection [Service] implementation on
> @@ -233,6 +293,8 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>   pub struct ApiService {
>       pub peer: std::net::SocketAddr,
>       pub api_config: Arc<ApiConfig>,
> +    #[cfg(feature = "rate-limited-stream")]
> +    pub rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>   }
>   
>   impl ApiService {
> @@ -354,6 +416,10 @@ impl Service<Request<Incoming>> for ApiService {
>               Some(proxied_peer) => proxied_peer,
>               None => self.peer,
>           };
> +        #[cfg(feature = "rate-limited-stream")]
> +        let rate_limit_tags = self.rate_limit_tags.clone();
> +        #[cfg(not(feature = "rate-limited-stream"))]
> +        let rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>> = None;
>   
>           let header = self.api_config
>               .auth_cookie_name
> @@ -368,7 +434,15 @@ impl Service<Request<Incoming>> for ApiService {
>                });
>   
>           async move {
> -            let mut response = match Arc::clone(&config).handle_request(req, &peer).await {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
> +
> +            let mut response = match Arc::clone(&config)
> +                .handle_request(req, &peer, rate_limit_tags.clone())
> +                .await
> +            {
>                   Ok(response) => response,
>                   Err(err) => {
>                       let (err, code) = match err.downcast_ref::<HttpError>() {
> @@ -860,6 +934,8 @@ impl ApiConfig {
>           self: Arc<ApiConfig>,
>           req: Request<Incoming>,
>           peer: &std::net::SocketAddr,
> +        #[cfg_attr(not(feature = "rate-limited-stream"), allow(unused_variables))]
> +        rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>       ) -> Result<Response<Body>, Error> {
>           let (parts, body) = req.into_parts();
>           let method = parts.method.clone();
> @@ -890,6 +966,8 @@ impl ApiConfig {
>                       full_path: &path,
>                       relative_path_components,
>                       rpcenv,
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    rate_limit_tags: rate_limit_tags.clone(),
>                   })
>                   .await;
>           }
> @@ -901,13 +979,29 @@ impl ApiConfig {
>           if components.is_empty() {
>               match self.check_auth(&parts.headers, &method).await {
>                   Ok((auth_id, _user_info)) => {
> -                    rpcenv.set_auth_id(Some(auth_id));
> +                    rpcenv.set_auth_id(Some(auth_id.clone()));
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(auth_id));
> +                    }
>                       return Ok(self.get_index(rpcenv, parts).await);
>                   }
>                   Err(AuthError::Generic(_)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       tokio::time::sleep_until(Instant::from_std(delay_unauth_time())).await;
>                   }
> -                Err(AuthError::NoData) => {}
> +                Err(AuthError::NoData) =>
> +                {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
> +                }
>               }
>               Ok(self.get_index(rpcenv, parts).await)
>           } else {
> @@ -975,6 +1069,8 @@ pub struct ApiRequestData<'a> {
>       full_path: &'a str,
>       relative_path_components: &'a [&'a str],
>       rpcenv: RestEnvironment,
> +    #[cfg(feature = "rate-limited-stream")]
> +    rate_limit_tags: Option<Arc<Mutex<RateLimiterTags>>>,
>   }
>   
>   pub(crate) struct Formatted {
> @@ -992,6 +1088,8 @@ impl Formatted {
>               full_path,
>               relative_path_components,
>               mut rpcenv,
> +            #[cfg(feature = "rate-limited-stream")]
> +            rate_limit_tags,
>           }: ApiRequestData<'_>,
>       ) -> Result<Response<Body>, Error> {
>           if relative_path_components.is_empty() {
> @@ -1026,10 +1124,20 @@ impl Formatted {
>           if auth_required {
>               match config.check_auth(&parts.headers, &parts.method).await {
>                   Ok((authid, info)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(authid.clone()));
> +                    }
>                       rpcenv.set_auth_id(Some(authid));
>                       user_info = info;
>                   }
>                   Err(auth_err) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       let err = match auth_err {
>                           AuthError::Generic(err) => err,
>                           AuthError::NoData => {
> @@ -1045,6 +1153,11 @@ impl Formatted {
>                       return Err(err);
>                   }
>               }
> +        } else {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
>           }
>   
>           match api_method {
> @@ -1108,6 +1221,8 @@ impl Unformatted {
>               full_path,
>               relative_path_components,
>               mut rpcenv,
> +            #[cfg(feature = "rate-limited-stream")]
> +            rate_limit_tags,
>           }: ApiRequestData<'_>,
>       ) -> Result<Response<Body>, Error> {
>           if relative_path_components.is_empty() {
> @@ -1133,10 +1248,20 @@ impl Unformatted {
>           if auth_required {
>               match config.check_auth(&parts.headers, &parts.method).await {
>                   Ok((authid, info)) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        let mut guard = handle.lock().unwrap();
> +                        guard.clear();
> +                        guard.push(RateLimiterTag::User(authid.clone()));
> +                    }
>                       rpcenv.set_auth_id(Some(authid));
>                       user_info = info;
>                   }
>                   Err(auth_err) => {
> +                    #[cfg(feature = "rate-limited-stream")]
> +                    if let Some(handle) = rate_limit_tags.as_ref() {
> +                        handle.lock().unwrap().clear();
> +                    }
>                       let err = match auth_err {
>                           AuthError::Generic(err) => err,
>                           AuthError::NoData => {
> @@ -1153,6 +1278,10 @@ impl Unformatted {
>                   }
>               }
>           } else {
> +            #[cfg(feature = "rate-limited-stream")]
> +            if let Some(handle) = rate_limit_tags.as_ref() {
> +                handle.lock().unwrap().clear();
> +            }
>               user_info = Box::new(EmptyUserInformation {});
>           }
>   





More information about the pbs-devel mailing list