[pdm-devel] [PATCH datacenter-manager 1/7] server: generic multi-client wrapper

Wolfgang Bumiller w.bumiller at proxmox.com
Tue Feb 4 10:55:48 CET 2025


We'll use this to instantiate multiple clients for a PVE cluster.
Unfortunately, we cannot just transparently "connect to different
nodes" on the connection layer, since different remotes may have
different certificate fingerprints and may (e.g. if a reverse proxy is
used) require different `Host:` headers. Therefore, the entire request
needs to be recreated when we need to switch servers.

Signed-off-by: Wolfgang Bumiller <w.bumiller at proxmox.com>
---
 server/src/connection.rs | 284 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 270 insertions(+), 14 deletions(-)

diff --git a/server/src/connection.rs b/server/src/connection.rs
index 0adeba2..767a2f9 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -3,14 +3,21 @@
 //! Make sure to call [`init`] to inject a concrete [`ClientFactory`]
 //! instance before calling any of the provided functions.
 
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::sync::Mutex as StdMutex;
 use std::sync::OnceLock;
+use std::time::Duration;
 
 use anyhow::{bail, format_err, Error};
 use http::uri::Authority;
+use http::Method;
+use serde::Serialize;
 
-use proxmox_client::{Client, TlsOptions};
+use proxmox_client::{Client, HttpApiClient, HttpApiResponse, HttpApiResponseStream, TlsOptions};
 
-use pdm_api_types::remotes::{Remote, RemoteType};
+use pdm_api_types::remotes::{NodeUrl, Remote, RemoteType};
 use pve_api_types::client::{PveClient, PveClientImpl};
 
 use crate::pbs_client::PbsClient;
@@ -41,6 +48,26 @@ fn prepare_connect_client(
             Some(endpoint) => format_err!("{endpoint} not configured for remote"),
             None => format_err!("no nodes configured for remote"),
         })?;
+
+    let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
+        RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
+        RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+    };
+
+    let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
+
+    Ok(ConnectInfo {
+        client,
+        prefix,
+        perl_compat,
+    })
+}
+
+fn prepare_connect_client_to_node(
+    node: &NodeUrl,
+    default_port: u16,
+    pve_compat: bool,
+) -> Result<Client, Error> {
     let mut options = TlsOptions::default();
 
     if let Some(fp) = &node.fingerprint {
@@ -49,11 +76,6 @@ fn prepare_connect_client(
 
     let host_port: Authority = node.hostname.parse()?;
 
-    let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
-        RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
-        RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
-    };
-
     let uri: http::uri::Uri = format!(
         "https://{}:{}",
         host_port.host(),
@@ -64,12 +86,7 @@ fn prepare_connect_client(
     let mut client =
         proxmox_client::Client::with_options(uri.clone(), options, Default::default())?;
     client.set_pve_compatibility(pve_compat);
-
-    Ok(ConnectInfo {
-        client,
-        prefix,
-        perl_compat,
-    })
+    Ok(client)
 }
 
 /// Constructs a [`Client`] for the given [`Remote`] for an API token
@@ -92,6 +109,33 @@ fn connect(remote: &Remote, target_endpoint: Option<&str>) -> Result<Client, any
     Ok(client)
 }
 
+/// Like [`connect()`], but for remotes which have multiple clients.
+fn multi_connect(remote: &Remote) -> Result<MultiClient, anyhow::Error> {
+    let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
+        RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
+        RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+    };
+
+    let mut clients = Vec::new();
+
+    for node in &remote.nodes {
+        let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
+        client.set_authentication(proxmox_client::Token {
+            userid: remote.authid.to_string(),
+            prefix: prefix.clone(),
+            value: remote.token.to_string(),
+            perl_compat,
+        });
+        clients.push(Arc::new(client));
+    }
+
+    if clients.is_empty() {
+        bail!("no nodes configured for remote");
+    }
+
+    Ok(MultiClient::new(clients))
+}
+
 /// Constructs a [`Client`] for the given [`Remote`] for an API token or user
 ///
 /// In case the remote has a user configured (instead of an API token), it will connect and get a
@@ -183,7 +227,7 @@ pub struct DefaultClientFactory;
 #[async_trait::async_trait]
 impl ClientFactory for DefaultClientFactory {
     fn make_pve_client(&self, remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
-        let client = crate::connection::connect(remote, None)?;
+        let client = crate::connection::multi_connect(remote)?;
         Ok(Box::new(PveClientImpl(client)))
     }
 
@@ -279,3 +323,215 @@ pub fn init(instance: Box<dyn ClientFactory + Send + Sync>) {
         panic!("connection factory instance already set");
     }
 }
+
+/// This is another wrapper around the actual HTTP client responsible for dealing with connection
+/// problems: if we cannot reach a node of a cluster, this will attempt to retry a request on
+/// another node.
+///
+/// # Possible improvements
+///
+/// - For `GET` requests we could also start a 2nd request after a shorter timeout (e.g. 10s).
+/// - We could use RRD data for a "best guess" where to start, e.g. if we know a node was offline
+///   on the last RRD polling we'd start with a different one.
+///   For this, we still need to include the node names in the clients here somehow so that we can
+///   actually manage/re-shuffle them from the outside after this struct is already created.
+struct MultiClient {
+    state: StdMutex<MultiClientState>,
+    timeout: Duration,
+}
+
+impl MultiClient {
+    fn new(clients: Vec<Arc<Client>>) -> Self {
+        Self {
+            state: StdMutex::new(MultiClientState::new(clients)),
+            timeout: Duration::from_secs(60),
+        }
+    }
+}
+
+/// Keeps track of which client (i.e. which specific node of a remote) we're supposed to be using
+/// right now.
+struct MultiClientState {
+    /// The current index *not* modulo the client count.
+    current: usize,
+    clients: Vec<Arc<Client>>,
+}
+
+impl MultiClientState {
+    fn new(clients: Vec<Arc<Client>>) -> Self {
+        Self {
+            current: 0,
+            clients,
+        }
+    }
+
+    /// Whenever a request fails with the *current* client we move the current entry forward.
+    ///
+    /// # Note:
+    ///
+    /// With our current strategy `failed_index` is never greater than `current`, but if we change
+    /// the strategy, we may want to change this to something like `1 + max(current, failed)`.
+    fn failed(&mut self, failed_index: usize) {
+        if self.current == failed_index {
+            self.current = self.current.wrapping_add(1);
+        }
+    }
+
+    /// Get `current` as an *index* (i.e. modulo `clients.len()`).
+    fn index(&self) -> usize {
+        self.current % self.clients.len()
+    }
+
+    /// Get the current client and its index which can be passed to `failed()` if the client fails
+    /// to connect.
+    fn get(&self) -> (Arc<Client>, usize) {
+        let index = self.index();
+        (Arc::clone(&self.clients[index]), self.current)
+    }
+
+    /// Check if we already tried all clients since a specific starting index.
+    ///
+    /// When an API request is made we loop through the possible clients.
+    /// Since multiple requests might be running simultaneously, it's possible that multiple tasks
+    /// mark the same *or* *multiple* clients as failed already.
+    ///
+    /// We don't want to try clients which we know are currently non-functional, so a
+    /// request-retry-loop gives up as soon as the *number* of clients marked as faulty since its
+    /// starting point reaches the total client count, even if it did not try them all itself.
+    fn tried_all_since(&self, start: usize) -> bool {
+        self.tried_clients(start) >= self.clients.len()
+    }
+
+    /// We store the current index continuously without wrapping it modulo the client count (and
+    /// only do that when indexing the `clients` array), so that we can easily check if "all
+    /// currently running tasks taken together" have already tried all clients by comparing our
+    /// starting point to the current index.
+    fn tried_clients(&self, start: usize) -> usize {
+        self.current.wrapping_sub(start)
+    }
+}
+
+impl MultiClient {
+    /// This is the client usage strategy.
+    ///
+    /// This is basically a "generator" for clients to try.
+    ///
+    /// We share the "state" with other tasks. When a client fails, it is "marked" as failed and
+    /// the state "rotates" through the clients.
+    /// We might be skipping clients if other tasks already tried "more" clients, but that's fine,
+    /// since there's no point in trying the same remote twice simultaneously if it is currently
+    /// offline...
+    fn try_clients(&self) -> impl Iterator<Item = Arc<Client>> + '_ {
+        let mut start_current = None;
+        let state = &self.state;
+        std::iter::from_fn(move || {
+            let mut state = state.lock().unwrap();
+            match start_current {
+                None => {
+                    // first attempt, just use the current client and remember the starting index
+                    let (client, index) = state.get();
+                    start_current = Some((index, index));
+                    Some(client)
+                }
+                Some((start, current)) => {
+                    // If our last request failed, the retry-loop asks for another client, mark the
+                    // one we just used as failed and check if all clients have gone through a
+                    // retry loop...
+                    state.failed(current);
+                    if state.tried_all_since(start) {
+                        // This iterator (and therefore this retry-loop) has tried all clients.
+                        // Give up.
+                        return None;
+                    }
+                    // finally just get the new current client and update `current` for the later
+                    // call to `failed()`
+                    let (client, current) = state.get();
+                    start_current = Some((start, current));
+                    Some(client)
+                }
+            }
+        })
+        .fuse()
+    }
+}
+
+// doing this via a generic method is currently tedious as it requires an extra helper trait to
+// declare the flow of the lifetime in the `self.request` vs `self.streaming_request` function from
+// its input to its generic output future... and then you run into borrow-checker limitations...
+macro_rules! try_request {
+    ($self:expr, $method:expr, $path_and_query:expr, $params:expr, $how:ident) => {
+        let params = $params.map(serde_json::to_value);
+        Box::pin(async move {
+            let params = params
+                .transpose()
+                .map_err(|err| proxmox_client::Error::Anyhow(err.into()))?;
+
+            let mut last_err = None;
+            let mut timed_out = false;
+            // The iterator in use here will automatically mark a client as faulty if we move on to
+            // the `next()` one.
+            for client in $self.try_clients() {
+                if let Some(err) = last_err.take() {
+                    log::error!("API client error, trying another remote - {err:?}");
+                }
+                if timed_out {
+                    timed_out = false;
+                    log::error!("API client timed out, trying another remote");
+                }
+
+                let request = client.$how($method.clone(), $path_and_query, params.as_ref());
+                match tokio::time::timeout($self.timeout, request).await {
+                    Ok(Err(proxmox_client::Error::Client(err))) => {
+                        last_err = Some(err);
+                    }
+                    Ok(result) => return result,
+                    Err(_) => {
+                        timed_out = true;
+                    }
+                }
+            }
+
+            Err(proxmox_client::Error::Other(
+                "failed to perform API request",
+            ))
+        })
+    };
+}
+
+impl HttpApiClient for MultiClient {
+    type ResponseFuture<'a> =
+        Pin<Box<dyn Future<Output = Result<HttpApiResponse, proxmox_client::Error>> + Send + 'a>>;
+
+    type ResponseStreamFuture<'a> = Pin<
+        Box<
+            dyn Future<Output = Result<HttpApiResponseStream<Self::Body>, proxmox_client::Error>>
+                + Send
+                + 'a,
+        >,
+    >;
+    type Body = hyper::Body;
+
+    fn request<'a, T>(
+        &'a self,
+        method: Method,
+        path_and_query: &'a str,
+        params: Option<T>,
+    ) -> Self::ResponseFuture<'a>
+    where
+        T: Serialize + 'a,
+    {
+        try_request! { self, method, path_and_query, params, request }
+    }
+
+    fn streaming_request<'a, T>(
+        &'a self,
+        method: Method,
+        path_and_query: &'a str,
+        params: Option<T>,
+    ) -> Self::ResponseStreamFuture<'a>
+    where
+        T: Serialize + 'a,
+    {
+        try_request! { self, method, path_and_query, params, streaming_request }
+    }
+}
-- 
2.39.5





More information about the pdm-devel mailing list