[pbs-devel] [PATCH proxmox 3/3] rest-server: add use tag field to RateLimitedStreams
Christian Ebner
c.ebner at proxmox.com
Fri Nov 7 12:12:12 CET 2025
comments inline
On 9/9/25 10:53 AM, Hannes Laimer wrote:
> Similarly to how the IP is attached we also attach the user that is
> authenticated on this connections. Since this is only used for rate
> limiting this is behind the "rate-limited-stream" feature.
>
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
> proxmox-rest-server/src/connection.rs | 16 +++++-
> proxmox-rest-server/src/rest.rs | 72 ++++++++++++++++++++++++++-
> 2 files changed, 85 insertions(+), 3 deletions(-)
>
> diff --git a/proxmox-rest-server/src/connection.rs b/proxmox-rest-server/src/connection.rs
> index 9511b7cb..a9c3ccb3 100644
> --- a/proxmox-rest-server/src/connection.rs
> +++ b/proxmox-rest-server/src/connection.rs
> @@ -165,7 +165,10 @@ type InsecureClientStreamResult = Pin<Box<InsecureClientStream>>;
> type ClientStreamResult = Pin<Box<SslStream<InsecureClientStream>>>;
>
> #[cfg(feature = "rate-limited-stream")]
> -type LookupRateLimiter = dyn Fn(std::net::SocketAddr) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
> +type LookupRateLimiter = dyn Fn(
> + std::net::SocketAddr,
> + Option<String>,
This could be a simple `Tag`, or a slice thereof.
> + ) -> (Option<SharedRateLimit>, Option<SharedRateLimit>)
> + Send
> + Sync
> + 'static;
> @@ -369,7 +372,16 @@ impl AcceptBuilder {
>
> #[cfg(feature = "rate-limited-stream")]
> let socket = match self.lookup_rate_limiter.clone() {
> - Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move || lookup(peer)),
> + Some(lookup) => {
> + let user_tag: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
> + let user_tag_cb = Arc::clone(&user_tag);
> + let mut s = RateLimitedStream::with_limiter_update_cb(socket, move || {
> + let user = user_tag_cb.lock().unwrap().clone();
> + lookup(peer, user)
> + });
> + s.set_user_tag_handle(user_tag);
> + s
If the tag is passed along as a parameter to the callback function, as
suggested in the previous patch, this would simplify to
```
Some(lookup) => RateLimitedStream::with_limiter_update_cb(socket, move |tag| {
    lookup(peer, tag)
}),
```
and set_user_tag_handle() would become obsolete: setting the tag would then
always be done in the constructor, or when it is updated on the rate limited
stream.
> + }
> None => RateLimitedStream::with_limiter(socket, None, None),
> };
>
> diff --git a/proxmox-rest-server/src/rest.rs b/proxmox-rest-server/src/rest.rs
> index 035a9537..c7d833a2 100644
> --- a/proxmox-rest-server/src/rest.rs
> +++ b/proxmox-rest-server/src/rest.rs
> @@ -86,10 +86,26 @@ impl RestServer {
> }
> }
>
> - pub fn api_service(&self, peer: &dyn PeerAddress) -> Result<ApiService, Error> {
> + #[cfg(not(feature = "rate-limited-stream"))]
> + pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> + where
> + T: PeerAddress + ?Sized,
> + {
> + Ok(ApiService {
> + peer: peer.peer_addr()?,
> + api_config: Arc::clone(&self.api_config),
> + })
> + }
> +
> + #[cfg(feature = "rate-limited-stream")]
> + pub fn api_service<T>(&self, peer: &T) -> Result<ApiService, Error>
> + where
> + T: PeerAddress + PeerUser + ?Sized,
> + {
> Ok(ApiService {
> peer: peer.peer_addr()?,
> api_config: Arc::clone(&self.api_config),
> + user_tag: peer.user_tag_handle(),
> })
> }
> }
> @@ -185,6 +201,11 @@ pub trait PeerAddress {
> fn peer_addr(&self) -> Result<std::net::SocketAddr, Error>;
> }
>
> +#[cfg(feature = "rate-limited-stream")]
> +pub trait PeerUser {
> + fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>>;
> +}
> +
> // tokio_openssl's SslStream requires the stream to be pinned in order to accept it, and we need to
> // accept before the peer address is requested, so let's just generally implement this for
> // Pin<Box<T>>
> @@ -221,6 +242,41 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
> }
> }
>
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerUser> PeerUser for Pin<Box<T>> {
> + fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> + T::user_tag_handle(&**self)
> + }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T: PeerUser> PeerUser for tokio_openssl::SslStream<T> {
> + fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> + self.get_ref().user_tag_handle()
> + }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerUser for tokio::net::TcpStream {
> + fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> + None
> + }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl PeerUser for tokio::net::UnixStream {
> + fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> + None
> + }
> +}
> +
> +#[cfg(feature = "rate-limited-stream")]
> +impl<T> PeerUser for proxmox_http::RateLimitedStream<T> {
> + fn user_tag_handle(&self) -> Option<Arc<Mutex<Option<String>>>> {
> + self.user_tag_handle()
> + }
> +}
> +
> // Helper [Service] containing the peer Address
> //
> // The lower level connection [Service] implementation on
> @@ -233,6 +289,8 @@ impl<T: PeerAddress> PeerAddress for proxmox_http::RateLimitedStream<T> {
> pub struct ApiService {
> pub peer: std::net::SocketAddr,
> pub api_config: Arc<ApiConfig>,
> + #[cfg(feature = "rate-limited-stream")]
> + pub user_tag: Option<Arc<Mutex<Option<String>>>>,
> }
>
> impl ApiService {
> @@ -357,6 +415,8 @@ impl Service<Request<Incoming>> for ApiService {
> Some(proxied_peer) => proxied_peer,
> None => self.peer,
> };
> + #[cfg(feature = "rate-limited-stream")]
> + let user_tag = self.user_tag.clone();
>
> let header = self.api_config
> .auth_cookie_name
> @@ -394,6 +454,16 @@ impl Service<Request<Incoming>> for ApiService {
> }
> }
>
> + #[cfg(feature = "rate-limited-stream")]
> + {
> + if let Some(handle) = user_tag {
> + if let Some(ext) = response.extensions().get::<AuthStringExtension>() {
> + let mut guard = handle.lock().unwrap();
> + *guard = Some(ext.0.clone());
> + }
So this is where the tag is set. However, this happens only after the
response is generated. Maybe we could pass this as a callback to
`handle_request`, which internally also does the auth, so that the tag can
be set there already?
> + }
> + }
> +
> let logger = config.get_access_log();
> log_response(logger, &peer, method, &path, &response, user_agent);
> Ok(response)
More information about the pbs-devel
mailing list