[pbs-devel] [PATCH proxmox 3/3] rest-server: add use tag field to RateLimitedStreams

Christian Ebner c.ebner at proxmox.com
Fri Nov 7 12:12:12 CET 2025


comments inline

On 9/9/25 10:53 AM, Hannes Laimer wrote:
> Similarly to how the IP is attached we also attach the user that is
> authenticated on this connections. Since this is only used for rate
> limiting this is behind the "rate-limited-stream" feature.
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
>   proxmox-rest-server/src/connection.rs | 16 +++++-
>   proxmox-rest-server/src/rest.rs       | 72 ++++++++++++++++++++++++++-
>   2 files changed, 85 insertions(+), 3 deletions(-)
> 
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 9511b7cb..a9c3ccb3 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -165,7 +165,10 @@ type InsecureClientStreamResult = Pin<Box<InsecureClientStream>>;
>   type ClientStreamResult = Pin<Box<SslStream<InsecureClientStream>>>;
>   
>   #[cfg(feature = "rate-limited-stream")]
> -type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
> +type LookupRateLimiter = dyn Fn(
> +        std::net::SocketAddr,
> +        Option<String>,

This could be a simple `Tag` or a slice thereof.

> +    ) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
>       + Send
>       + Sync
>       + 'static;
> @@ -369,7 +372,16 @@ impl AcceptBuilder {
>   
>           #[cfg(feature = "rate-limited-stream")]
>           let socket = match self.lookup_rate_limiter.clone() {
> -            Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)),
> +            Some(lookup) => {
> +                let user_tag: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
> +                let user_tag_cb = Arc::clone(&user_tag);
> +                let mut s = RateLimitedStream::with_limiter_update_cb(socket, move || {
> +                    let user = user_tag_cb.lock().unwrap().clone();
> +                    lookup(peer, user)
> +                });
> +                s.set_user_tag_handle(user_tag);
> +                s

If the tag is passed along as a parameter to the callback function, as
suggested in the previous patch, this would simplify to:

```
Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move |tag| {
    lookup(peer, tag)
}),
```

and set_user_tag_handle() becomes obsolete, since the tag will now
always be set in the constructor or when updated on the rate limited
stream.

> +            }
>               None => RateLimitedStream::with_limiter(socket, None, None),
>           };
>   
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index 035a9537..c7d833a2 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -86,10 +86,26 @@ impl RestServer {
>           }
>       }
>   
> -    pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
> +    #[cfg(not(feature = "rate-limited-stream"))]
> +    pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> +    where
> +        T: PeerAddress + ?Sized,
> +    {
> +        Ok(ApiService {
> +            peer: peer.peer_addr()?,
> +            api_config: Arc::clone(&self.api_config),
> +        })
> +    }
> +
> +    #[cfg(feature = "rate-limited-stream")]
> +    pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> +    where
> +        T: PeerAddress + PeerUser + ?Sized,
> +    {
>           Ok(ApiService {
>               peer: peer.peer_addr()?,
>               api_config: Arc::clone(&self.api_config),
> +            user_tag: peer.user_tag_handle(),
>           })
>       }
>   }
> @@ -185,6 +201,11 @@ pub trait PeerAddress {
>       fn peer_addr(&self) -> Result<std::net::SocketAddr, Error>;
>   }
>   
> +#[cfg(feature = "rate-limited-stream")]
> +pub trait PeerUser {
> +    fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>>;
> +}
> +
>   // tokio_openssl's SslStream requires the stream to be pinned in order to accept it, and we need to
>   // accept before the peer address is requested, so let's just generally implement this for
>   // Pin<Box<T>>
> @@ -221,6 +242,41 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>       }
>   }
>   
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerUser> PeerUser for Pin<Box<T>> {
> +    fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> +        T::user_tag_handle(&**self)
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerUser> PeerUser for tokio_openssl::SslStream<T> {
> +    fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> +        self.get_ref().user_tag_handle()
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerUser for tokio::net::TcpStream {
> +    fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> +        None
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerUser for tokio::net::UnixStream {
> +    fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> +        None
> +    }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T> PeerUser for proxmox_http::RateLimitedStream<T> {
> +    fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> +        self.user_tag_handle()
> +    }
> +}
> +
>   // Helper [Service] containing the peer Address
>   //
>   // The lower level connection [Service] implementation on
> @@ -233,6 +289,8 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
>   pub struct ApiService {
>       pub peer: std::net::SocketAddr,
>       pub api_config: Arc<ApiConfig>,
> +    #[cfg(feature = "rate-limited-stream")]
> +    pub user_tag: Option<Arc<Mutex<Option<String>>>>,
>   }
>   
>   impl ApiService {
> @@ -357,6 +415,8 @@ impl Service<Request<Incoming>> for ApiService {
>               Some(proxied_peer) => proxied_peer,
>               None => self.peer,
>           };
> +        #[cfg(feature = "rate-limited-stream")]
> +        let user_tag = self.user_tag.clone();
>   
>           let header = self.api_config
>               .auth_cookie_name
> @@ -394,6 +454,16 @@ impl Service<Request<Incoming>> for ApiService {
>                   }
>               }
>   
> +            #[cfg(feature = "rate-limited-stream")]
> +            {
> +                if let Some(handle) = user_tag {
> +                    if let Some(ext) = response.extensions().get::<AuthStringExtension>() {
> +                        let mut guard = handle.lock().unwrap();
> +                        *guard = Some(ext.0.clone());
> +                    }

So this is where the tag is set. However, this happens only after the
response has been generated. Maybe we could pass this as a callback to
`handle_request`, which internally also does the auth, so the tag can
be set there already?

> +                }
> +            }
> +
>               let logger = config.get_access_log();
>               log_response(logger, &peer, method, &path, &response, user_agent);
>               Ok(response)





More information about the pbs-devel mailing list