[pbs-devel] [PATCH proxmox 3/3] HttpsConnector: use RateLimitedStream

Dietmar Maurer dietmar at proxmox.com
Wed Nov 3 13:42:47 CET 2021


So that we can limit used bandwidth.

Signed-off-by: Dietmar Maurer <dietmar at proxmox.com>
---
 proxmox-http/src/client/connector.rs          | 51 ++++++++++++++++---
 .../src/client/rate_limited_stream.rs         |  8 +++
 2 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/proxmox-http/src/client/connector.rs b/proxmox-http/src/client/connector.rs
index acbb992..71704d5 100644
--- a/proxmox-http/src/client/connector.rs
+++ b/proxmox-http/src/client/connector.rs
@@ -1,14 +1,14 @@
 use anyhow::{bail, format_err, Error};
 use std::os::unix::io::AsRawFd;
 use std::pin::Pin;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 
 use futures::*;
 use http::Uri;
 use hyper::client::HttpConnector;
 use openssl::ssl::SslConnector;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio::net::TcpStream;
 use tokio_openssl::SslStream;
 
@@ -18,12 +18,16 @@ use crate::proxy_config::ProxyConfig;
 use crate::tls::MaybeTlsStream;
 use crate::uri::build_authority;
 
+use super::{RateLimiter, RateLimitedStream};
+
 #[derive(Clone)]
 pub struct HttpsConnector {
     connector: HttpConnector,
     ssl_connector: Arc<SslConnector>,
     proxy: Option<ProxyConfig>,
     tcp_keepalive: u32,
+    read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+    write_limiter: Option<Arc<Mutex<RateLimiter>>>,
 }
 
 impl HttpsConnector {
@@ -38,6 +42,8 @@ impl HttpsConnector {
             ssl_connector: Arc::new(ssl_connector),
             proxy: None,
             tcp_keepalive,
+            read_limiter: None,
+            write_limiter: None,
         }
     }
 
@@ -45,13 +51,21 @@ impl HttpsConnector {
         self.proxy = Some(proxy);
     }
 
-    async fn secure_stream(
-        tcp_stream: TcpStream,
+    pub fn set_read_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+        self.read_limiter = limiter;
+    }
+
+    pub fn set_write_limiter(&mut self, limiter: Option<Arc<Mutex<RateLimiter>>>) {
+        self.write_limiter = limiter;
+    }
+
+    async fn secure_stream<S: AsyncRead + AsyncWrite + Unpin>(
+        tcp_stream: S,
         ssl_connector: &SslConnector,
         host: &str,
-    ) -> Result<MaybeTlsStream<TcpStream>, Error> {
+    ) -> Result<MaybeTlsStream<S>, Error> {
         let config = ssl_connector.configure()?;
-        let mut conn: SslStream<TcpStream> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
+        let mut conn: SslStream<S> = SslStream::new(config.into_ssl(host)?, tcp_stream)?;
         Pin::new(&mut conn).connect().await?;
         Ok(MaybeTlsStream::Secured(conn))
     }
@@ -107,7 +121,7 @@ impl HttpsConnector {
 }
 
 impl hyper::service::Service<Uri> for HttpsConnector {
-    type Response = MaybeTlsStream<TcpStream>;
+    type Response = MaybeTlsStream<RateLimitedStream<TcpStream>>;
     type Error = Error;
     #[allow(clippy::type_complexity)]
     type Future =
@@ -129,6 +143,9 @@ impl hyper::service::Service<Uri> for HttpsConnector {
         };
         let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 });
         let keepalive = self.tcp_keepalive;
+        let read_limiter = self.read_limiter.clone();
+        let write_limiter = self.write_limiter.clone();
+
 
         if let Some(ref proxy) = self.proxy {
             let use_connect = is_https || proxy.force_connect;
@@ -152,12 +169,18 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
             if use_connect {
                 async move {
-                    let mut tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
+                    let tcp_stream = connector.call(proxy_uri).await.map_err(|err| {
                         format_err!("error connecting to {} - {}", proxy_authority, err)
                     })?;
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                    let mut tcp_stream = RateLimitedStream::with_limiter(
+                        tcp_stream,
+                        read_limiter,
+                        write_limiter,
+                    );
+
                     let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port);
                     if let Some(authorization) = authorization {
                         connect_request
@@ -185,6 +208,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
                     let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                    let tcp_stream = RateLimitedStream::with_limiter(
+                        tcp_stream,
+                        read_limiter,
+                        write_limiter,
+                    );
+
                     Ok(MaybeTlsStream::Proxied(tcp_stream))
                 }
                 .boxed()
@@ -199,6 +228,12 @@ impl hyper::service::Service<Uri> for HttpsConnector {
 
                 let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive);
 
+                let tcp_stream = RateLimitedStream::with_limiter(
+                    tcp_stream,
+                    read_limiter,
+                    write_limiter,
+                );
+
                 if is_https {
                     Self::secure_stream(tcp_stream, &ssl_connector, &host).await
                 } else {
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
index 8b4123f..d21f55c 100644
--- a/proxmox-http/src/client/rate_limited_stream.rs
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -7,6 +7,7 @@ use std::io::IoSlice;
 use futures::Future;
 use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
 use tokio::time::Sleep;
+use hyper::client::connect::{Connection, Connected};
 
 use std::task::{Context, Poll};
 
@@ -174,3 +175,10 @@ impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
     }
 
 }
+
+// we need this for the hyper http client
+impl<S: Connection + AsyncRead + AsyncWrite + Unpin> Connection for RateLimitedStream<S> {
+    fn connected(&self) -> Connected {
+        self.stream.connected()
+    }
+}
-- 
2.30.2






More information about the pbs-devel mailing list