[pbs-devel] [PATCH proxmox-backup 8/9] implement a traffic control cache for fast rate control limiter lockups

Dietmar Maurer dietmar at proxmox.com
Tue Nov 9 07:52:52 CET 2021


Signed-off-by: Dietmar Maurer <dietmar at proxmox.com>
---
 Cargo.toml                    |   1 +
 src/cached_traffic_control.rs | 240 ++++++++++++++++++++++++++++++++++
 src/lib.rs                    |   3 +
 3 files changed, 244 insertions(+)
 create mode 100644 src/cached_traffic_control.rs

diff --git a/Cargo.toml b/Cargo.toml
index 0f163d65..ac0983b9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -48,6 +48,7 @@ apt-pkg-native = "0.3.2"
 base64 = "0.12"
 bitflags = "1.2.1"
 bytes = "1.0"
+cidr = "0.2.1"
 crc32fast = "1"
 endian_trait = { version = "0.6", features = ["arrays"] }
 env_logger = "0.7"
diff --git a/src/cached_traffic_control.rs b/src/cached_traffic_control.rs
new file mode 100644
index 00000000..5a7f46da
--- /dev/null
+++ b/src/cached_traffic_control.rs
@@ -0,0 +1,240 @@
+//! Cached traffic control configuration
+use std::sync::{Arc, Mutex};
+use std::collections::HashMap;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+
+use anyhow::Error;
+use cidr::IpInet;
+
+use proxmox_http::client::RateLimiter;
+use proxmox_time::epoch_i64;
+
+use proxmox_systemd::daily_duration::parse_daily_duration;
+
+use pbs_api_types::TrafficControlRule;
+
+use pbs_config::memcom::Memcom;
+
+pub struct TrafficControlCache {
+    last_update: i64,
+    last_traffic_control_generation: usize,
+    rules: Vec<(TrafficControlRule, Vec<IpInet>)>,
+    limiter_map: HashMap<String, (Arc<Mutex<RateLimiter>>, Arc<Mutex<RateLimiter>>)>,
+}
+
+fn timeframe_match(
+    duration_list: &[String],
+    now: i64,
+) -> Result<bool, Error> {
+
+    for duration_str in duration_list.iter() {
+        let duration = parse_daily_duration(duration_str)?;
+        if duration.time_match(now, false)? {
+            return Ok(true);
+        }
+    }
+
+    Ok(false)
+}
+
+fn network_match_len(
+    networks: &[IpInet],
+    ip: &IpAddr,
+) -> Option<u8> {
+
+    let mut match_len = None;
+
+    for cidr in networks.iter() {
+        if cidr.contains(ip) {
+            let network_length = cidr.network_length();
+            match match_len {
+                Some(len) => {
+                    if network_length > len {
+                        match_len = Some(network_length);
+                    }
+                }
+                None => match_len = Some(network_length),
+            }
+        }
+    }
+    match_len
+}
+
+fn cannonical_ip(ip: IpAddr) -> IpAddr {
+    // TODO: use std::net::IpAddr::to_cananical once stable
+    match ip {
+        IpAddr::V4(addr) => IpAddr::V4(addr),
+        IpAddr::V6(addr) => {
+            match addr.octets() {
+                [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
+                    IpAddr::V4(Ipv4Addr::new(a, b, c, d))
+                }
+                _ => IpAddr::V6(addr),
+            }
+        }
+    }
+}
+
+impl TrafficControlCache {
+
+    pub fn new() -> Self {
+        Self {
+            rules: Vec::new(),
+            limiter_map: HashMap::new(),
+            last_traffic_control_generation: 0,
+            last_update: 0,
+        }
+    }
+
+    pub fn reload(&mut self) {
+        let now = epoch_i64();
+
+        let memcom = match Memcom::new() {
+            Ok(memcom) => memcom,
+            Err(err) => {
+                log::error!("TrafficControlCache::reload failed in Memcom::new: {}", err);
+                return;
+            }
+        };
+        
+        let traffic_control_generation = memcom.traffic_control_generation();
+
+        if (self.last_update != 0) &&
+            (traffic_control_generation == self.last_traffic_control_generation) &&
+            ((now - self.last_update) < 60) { return; }
+
+        log::debug!("reload traffic control rules");
+
+        self.last_traffic_control_generation = traffic_control_generation;
+        self.last_update = now;
+
+        match self.reload_impl() {
+            Ok(()) => (),
+            Err(err) => {
+                log::error!("TrafficControlCache::reload failed -> {}", err);
+            }
+        }
+    }
+    
+    fn reload_impl(&mut self) -> Result<(), Error> {
+        let (config, _) = pbs_config::traffic_control::config()?;
+
+        self.limiter_map.retain(|key, _value| config.sections.contains_key(key));
+
+        let rules: Vec<TrafficControlRule> =
+            config.convert_to_typed_array("rule")?;
+
+        let now = proxmox_time::epoch_i64();
+
+        let mut active_rules = Vec::new();
+
+        for rule in rules {
+            if let Some(ref timeframe) = rule.config.timeframe {
+                if timeframe_match(timeframe, now)? {
+                    self.limiter_map.remove(&rule.name);
+                    continue;
+                }
+            }
+
+            let rate = rule.config.rate;
+            let burst = rule.config.burst.unwrap_or(rate);
+
+            if let Some(limiter) = self.limiter_map.get(&rule.name) {
+                limiter.0.lock().unwrap().update_rate(rate, burst);
+                limiter.1.lock().unwrap().update_rate(rate, burst);
+            } else {
+
+                let read_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst)));
+                let write_limiter = Arc::new(Mutex::new(RateLimiter::new(rate, burst)));
+
+                self.limiter_map.insert(
+                    rule.name.clone(),
+                    (read_limiter, write_limiter),
+                );
+            }
+
+            let mut networks = Vec::new();
+
+            for network in rule.config.network.iter() {
+                let cidr = match network.parse() {
+                    Ok(cidr) => cidr,
+                    Err(err) => {
+                        log::error!("unable to parse network '{}' - {}", network, err);
+                        continue;
+                    }
+                };
+                networks.push(cidr);
+            }
+            
+            active_rules.push((rule, networks));
+        }
+
+        self.rules = active_rules;
+
+        Ok(())
+    }
+
+    pub fn lookup_rate_limiter(
+        &self,
+        peer: Option<SocketAddr>,
+    ) -> (Option<Arc<Mutex<RateLimiter>>>, Option<Arc<Mutex<RateLimiter>>>) {
+
+        let peer = match peer {
+            None => return (None, None),
+            Some(peer) => peer,
+        };
+
+        let peer_ip = cannonical_ip(peer.ip());
+
+        log::debug!("lookup_rate_limiter {} {:?}", peer_ip.is_ipv4(),  peer_ip);
+
+        let mut last_rule_match = None;
+
+        for (rule, networks) in self.rules.iter() {
+            if let Some(match_len) = network_match_len(networks, &peer_ip) {
+                match last_rule_match {
+                    None => last_rule_match = Some((rule, match_len)),
+                    Some((_, last_len)) => {
+                        if match_len > last_len {
+                            last_rule_match = Some((rule, match_len));
+                        }
+                    }
+                }
+            }
+        }
+       
+        match last_rule_match {
+            Some((rule, _)) => {
+                match self.limiter_map.get(&rule.name) {
+                    Some((read_limiter, write_limiter)) => {
+                        (Some(Arc::clone(read_limiter)), Some(Arc::clone(write_limiter)))
+                    }
+                    None => (None, None), // should never happen
+                }
+            }
+            None => (None, None),
+        }
+    }
+}
+
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn testnetwork_match() -> Result<(), Error> {
+
+        let networks = ["192.168.2.1/24", "127.0.0.0/8"];
+        let networks: Vec<IpInet> = networks.iter().map(|n| n.parse().unwrap()).collect();
+
+        assert_eq!(network_match_len(&networks, &"192.168.2.1".parse()?), Some(24));
+        assert_eq!(network_match_len(&networks, &"192.168.2.254".parse()?), Some(24));
+        assert_eq!(network_match_len(&networks, &"192.168.3.1".parse()?), None);
+        assert_eq!(network_match_len(&networks, &"127.1.1.0".parse()?), Some(8));
+        assert_eq!(network_match_len(&networks, &"128.1.1.0".parse()?), None);
+
+        Ok(())
+        
+    }    
+}
diff --git a/src/lib.rs b/src/lib.rs
index 5f6d5e7e..8f5ed245 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -33,6 +33,9 @@ pub mod client_helpers;
 
 pub mod rrd_cache;
 
+mod cached_traffic_control;
+pub use cached_traffic_control::TrafficControlCache;
+
 /// Get the server's certificate info (from `proxy.pem`).
 pub fn cert_info() -> Result<CertInfo, anyhow::Error> {
     CertInfo::from_path(PathBuf::from(configdir!("/proxy.pem")))
-- 
2.30.2






More information about the pbs-devel mailing list