[pbs-devel] [PATCH proxmox 1/7] Implement a rate limiting stream (AsyncRead, AsyncWrite)
Dietmar Maurer
dietmar at proxmox.com
Tue Nov 9 07:52:38 CET 2021
Signed-off-by: Dietmar Maurer <dietmar at proxmox.com>
---
proxmox-http/src/client/mod.rs | 6 +
.../src/client/rate_limited_stream.rs | 144 ++++++++++++++++++
proxmox-http/src/client/rate_limiter.rs | 75 +++++++++
3 files changed, 225 insertions(+)
create mode 100644 proxmox-http/src/client/rate_limited_stream.rs
create mode 100644 proxmox-http/src/client/rate_limiter.rs
diff --git a/proxmox-http/src/client/mod.rs b/proxmox-http/src/client/mod.rs
index b6ee4b0..30e66d5 100644
--- a/proxmox-http/src/client/mod.rs
+++ b/proxmox-http/src/client/mod.rs
@@ -2,6 +2,12 @@
//!
//! Contains a lightweight wrapper around `hyper` with support for TLS connections.
+mod rate_limiter;
+pub use rate_limiter::RateLimiter;
+
+mod rate_limited_stream;
+pub use rate_limited_stream::RateLimitedStream;
+
mod connector;
pub use connector::HttpsConnector;
diff --git a/proxmox-http/src/client/rate_limited_stream.rs b/proxmox-http/src/client/rate_limited_stream.rs
new file mode 100644
index 0000000..a11b59e
--- /dev/null
+++ b/proxmox-http/src/client/rate_limited_stream.rs
@@ -0,0 +1,144 @@
+use std::pin::Pin;
+use std::marker::Unpin;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::Future;
+use tokio::io::{ReadBuf, AsyncRead, AsyncWrite};
+use tokio::time::Sleep;
+
+use std::task::{Context, Poll};
+
+use super::RateLimiter;
+
+/// A rate limited stream using [RateLimiter]
+pub struct RateLimitedStream<S> {
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ read_delay: Option<Pin<Box<Sleep>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_delay: Option<Pin<Box<Sleep>>>,
+ stream: S,
+}
+
+impl <S> RateLimitedStream<S> {
+
+ const MIN_DELAY: Duration = Duration::from_millis(20);
+
+ /// Creates a new instance with reads and writes limited to the same `rate`.
+ pub fn new(stream: S, rate: u64, bucket_size: u64) -> Self {
+ let now = Instant::now();
+ let read_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now)));
+ let write_limiter = Arc::new(Mutex::new(RateLimiter::with_start_time(rate, bucket_size, now)));
+ Self::with_limiter(stream, Some(read_limiter), Some(write_limiter))
+ }
+
+ /// Creates a new instance with specified [RateLimiters] for reads and writes.
+ pub fn with_limiter(
+ stream: S,
+ read_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ write_limiter: Option<Arc<Mutex<RateLimiter>>>,
+ ) -> Self {
+ Self {
+ read_limiter,
+ read_delay: None,
+ write_limiter,
+ write_delay: None,
+ stream,
+ }
+ }
+}
+
+impl <S: AsyncWrite + Unpin> AsyncWrite for RateLimitedStream<S> {
+
+ fn poll_write(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ buf: &[u8]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = match this.write_delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ };
+
+ if !is_ready { return Poll::Pending; }
+
+ this.write_delay = None;
+
+ let result = Pin::new(&mut this.stream).poll_write(ctx, buf);
+
+ if let Some(ref write_limiter) = this.write_limiter {
+ if let Poll::Ready(Ok(count)) = &result {
+ let now = Instant::now();
+ let delay = write_limiter.lock().unwrap()
+ .register_traffic(now, *count as u64);
+ if delay >= Self::MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ this.write_delay = Some(Box::pin(sleep));
+ }
+ }
+ }
+
+ result
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.stream).poll_flush(ctx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+ Pin::new(&mut this.stream).poll_shutdown(ctx)
+ }
+}
+
+impl <S: AsyncRead + Unpin> AsyncRead for RateLimitedStream<S> {
+
+ fn poll_read(
+ self: Pin<&mut Self>,
+ ctx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ let this = self.get_mut();
+
+ let is_ready = match this.read_delay {
+ Some(ref mut future) => {
+ future.as_mut().poll(ctx).is_ready()
+ }
+ None => true,
+ };
+
+ if !is_ready { return Poll::Pending; }
+
+ this.read_delay = None;
+
+ let filled_len = buf.filled().len();
+ let result = Pin::new(&mut this.stream).poll_read(ctx, buf);
+
+ if let Some(ref read_limiter) = this.read_limiter {
+ if let Poll::Ready(Ok(())) = &result {
+ let count = buf.filled().len() - filled_len;
+ let now = Instant::now();
+ let delay = read_limiter.lock().unwrap()
+ .register_traffic(now, count as u64);
+ if delay >= Self::MIN_DELAY {
+ let sleep = tokio::time::sleep(delay);
+ this.read_delay = Some(Box::pin(sleep));
+ }
+ }
+ }
+
+ result
+ }
+
+}
diff --git a/proxmox-http/src/client/rate_limiter.rs b/proxmox-http/src/client/rate_limiter.rs
new file mode 100644
index 0000000..4742387
--- /dev/null
+++ b/proxmox-http/src/client/rate_limiter.rs
@@ -0,0 +1,75 @@
+use std::time::{Duration, Instant};
+use std::convert::TryInto;
+
+/// Token bucket based rate limiter
+pub struct RateLimiter {
+ rate: u64, // tokens/second
+ start_time: Instant,
+ traffic: u64, // overall traffic
+ bucket_size: u64,
+ last_update: Instant,
+ consumed_tokens: u64,
+}
+
+impl RateLimiter {
+
+ const NO_DELAY: Duration = Duration::from_millis(0);
+
+ /// Creates a new instance, using [Instant::now] as start time.
+ pub fn new(rate: u64, bucket_size: u64) -> Self {
+ let start_time = Instant::now();
+ Self::with_start_time(rate, bucket_size, start_time)
+ }
+
+ /// Creates a new instance with specified `rate`, `bucket_size` and `start_time`.
+ pub fn with_start_time(rate: u64, bucket_size: u64, start_time: Instant) -> Self {
+ Self {
+ rate,
+ start_time,
+ traffic: 0,
+ bucket_size,
+ last_update: start_time,
+ // start with empty bucket (all tokens consumed)
+ consumed_tokens: bucket_size,
+ }
+ }
+
+ /// Returns the average rate (since `start_time`)
+ pub fn average_rate(&self, current_time: Instant) -> f64 {
+ let time_diff = (current_time - self.start_time).as_secs_f64();
+ if time_diff <= 0.0 {
+ 0.0
+ } else {
+ (self.traffic as f64) / time_diff
+ }
+ }
+
+ fn refill_bucket(&mut self, current_time: Instant) {
+ let time_diff = (current_time - self.last_update).as_nanos();
+
+ if time_diff <= 0 {
+ //log::error!("update_time: got negative time diff");
+ return;
+ }
+
+ self.last_update = current_time;
+
+ let allowed_traffic = ((time_diff.saturating_mul(self.rate as u128)) / 1_000_000_000)
+ .try_into().unwrap_or(u64::MAX);
+
+ self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
+ }
+
+ /// Register traffic, returning a proposed delay to reach the expected rate.
+ pub fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
+ self.refill_bucket(current_time);
+
+ self.traffic += data_len;
+ self.consumed_tokens += data_len;
+
+ if self.consumed_tokens <= self.bucket_size {
+ return Self::NO_DELAY;
+ }
+ Duration::from_nanos((self.consumed_tokens - self.bucket_size).saturating_mul(1_000_000_000)/ self.rate)
+ }
+}
--
2.30.2
More information about the pbs-devel
mailing list