[pdm-devel] [PATCH v2 datacenter-manager 1/7] server: generic multi-client wrapper
Wolfgang Bumiller
w.bumiller at proxmox.com
Wed Mar 5 16:01:02 CET 2025
We'll use this to instantiate multiple clients for a pve cluster.
Unfortunately we cannot just transparently "connect to different
nodes" on the connection layer, since different nodes may have
different certificate fingerprints and may (e.g. if a reverse proxy is
used) require different `Host:` headers. Therefore the entire request
needs to be recreated whenever we need to switch servers.
Signed-off-by: Wolfgang Bumiller <w.bumiller at proxmox.com>
Changes to v1:
Also log the final client's error if all clients fail. (Previously
this was silently dropped.) Noted by Lukas.
server/src/connection.rs | 290 +++++++++++++++++++++++++++++++++++++--
1 file changed, 276 insertions(+), 14 deletions(-)
diff --git a/server/src/connection.rs b/server/src/connection.rs
index 0adeba2..9f93eb3 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -3,14 +3,21 @@
//! Make sure to call [`init`] to inject a concrete [`ClientFactory`]
//! instance before calling any of the provided functions.
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::sync::Mutex as StdMutex;
use std::sync::OnceLock;
+use std::time::Duration;
use anyhow::{bail, format_err, Error};
use http::uri::Authority;
+use http::Method;
+use serde::Serialize;
-use proxmox_client::{Client, TlsOptions};
+use proxmox_client::{Client, HttpApiClient, HttpApiResponse, HttpApiResponseStream, TlsOptions};
-use pdm_api_types::remotes::{Remote, RemoteType};
+use pdm_api_types::remotes::{NodeUrl, Remote, RemoteType};
use pve_api_types::client::{PveClient, PveClientImpl};
use crate::pbs_client::PbsClient;
@@ -41,6 +48,26 @@ fn prepare_connect_client(
Some(endpoint) => format_err!("{endpoint} not configured for remote"),
None => format_err!("no nodes configured for remote"),
+ let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
+ RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
+ RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+ };
+ let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
+ Ok(ConnectInfo {
+ client,
+ prefix,
+ perl_compat,
+ })
+fn prepare_connect_client_to_node(
+ node: &NodeUrl,
+ default_port: u16,
+ pve_compat: bool,
+) -> Result<Client, Error> {
let mut options = TlsOptions::default();
if let Some(fp) = &node.fingerprint {
@@ -49,11 +76,6 @@ fn prepare_connect_client(
let host_port: Authority = node.hostname.parse()?;
- let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
- RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
- RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
- };
let uri: http::uri::Uri = format!(
@@ -64,12 +86,7 @@ fn prepare_connect_client(
let mut client =
proxmox_client::Client::with_options(uri.clone(), options, Default::default())?;
- Ok(ConnectInfo {
- client,
- prefix,
- perl_compat,
- })
+ Ok(client)
/// Constructs a [`Client`] for the given [`Remote`] for an API token
@@ -92,6 +109,33 @@ fn connect(remote: &Remote, target_endpoint: Option<&str>) -> Result<Client, any
+/// Like [`connect()`], but sets up one client for each node configured for the remote.
+fn multi_connect(remote: &Remote) -> Result<MultiClient, anyhow::Error> {
+ let (default_port, prefix, perl_compat, pve_compat) = match remote.ty {
+ RemoteType::Pve => (8006, "PVEAPIToken".to_string(), true, true),
+ RemoteType::Pbs => (8007, "PBSAPIToken".to_string(), false, false),
+ };
+ let mut clients = Vec::new();
+ for node in &remote.nodes {
+ let client = prepare_connect_client_to_node(node, default_port, pve_compat)?;
+ client.set_authentication(proxmox_client::Token {
+ userid: remote.authid.to_string(),
+ prefix: prefix.clone(),
+ value: remote.token.to_string(),
+ perl_compat,
+ });
+ clients.push(Arc::new(client));
+ }
+ if clients.is_empty() {
+ bail!("no nodes configured for remote");
+ }
+ Ok(MultiClient::new(clients))
/// Constructs a [`Client`] for the given [`Remote`] for an API token or user
/// In case the remote has a user configured (instead of an API token), it will connect and get a
@@ -183,7 +227,7 @@ pub struct DefaultClientFactory;
impl ClientFactory for DefaultClientFactory {
fn make_pve_client(&self, remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
- let client = crate::connection::connect(remote, None)?;
+ let client = crate::connection::multi_connect(remote)?;
@@ -279,3 +323,221 @@ pub fn init(instance: Box<dyn ClientFactory + Send + Sync>) {
panic!("connection factory instance already set");
+/// This is another wrapper around the actual HTTP client responsible for dealing with connection
+/// problems: if we cannot reach a node of a cluster, this will attempt to retry a request on
+/// another node.
+/// # Possible improvements
+/// - For `GET` requests we could also start a 2nd request after a shorter timeout (e.g. 10s).
+/// - We could use RRD data for a "best guess" where to start eg. if we know a node was offline on
+/// the last rrd polling we'd start with a different one.
+/// For this, we still need to include the node names in the clients here somehow so that we can
+/// actually manage/re-shuffle them from the outside after this struct is already created.
+struct MultiClient {
+ state: StdMutex<MultiClientState>,
+ timeout: Duration,
+impl MultiClient {
+ fn new(clients: Vec<Arc<Client>>) -> Self {
+ Self {
+ state: StdMutex::new(MultiClientState::new(clients)),
+ timeout: Duration::from_secs(60),
+ }
+ }
+/// Keeps track of which client (iow. which specific node of a remote) we're supposed to be using
+/// right now.
+struct MultiClientState {
+ /// The current index *not* modulo the client count.
+ current: usize,
+ clients: Vec<Arc<Client>>,
+impl MultiClientState {
+ fn new(clients: Vec<Arc<Client>>) -> Self {
+ Self {
+ current: 0,
+ clients,
+ }
+ }
+ /// Whenever a request fails with the *current* client we move the current entry forward.
+ ///
+ /// # Note:
+ ///
+    /// With our current strategy `failed_index` is never greater than `current`, but if we change
+ /// the strategy, we may want to change this to something like `1 + max(current, failed)`.
+ fn failed(&mut self, failed_index: usize) {
+ if self.current == failed_index {
+ self.current = self.current.wrapping_add(1);
+ }
+ }
+ /// Get `current` as an *index* (i.e. modulo `clients.len()`).
+ fn index(&self) -> usize {
+ self.current % self.clients.len()
+ }
+ /// Get the current client and its index which can be passed to `failed()` if the client fails
+ /// to connect.
+ fn get(&self) -> (Arc<Client>, usize) {
+ let index = self.index();
+ (Arc::clone(&self.clients[index]), self.current)
+ }
+ /// Check if we already tried all clients since a specific starting index.
+ ///
+ /// When an API request is made we loop through the possible clients.
+ /// Since multiple requests might be running simultaneously, it's possible that multiple tasks
+ /// mark the same *or* *multiple* clients as failed already.
+ ///
+ /// We don't want to try clients which we know are currently non-functional, so a
+ /// request-retry-loop will fail as soon as the same *number* of clients since its starting
+ /// point were marked as faulty without retrying them all.
+ fn tried_all_since(&self, start: usize) -> bool {
+ self.tried_clients(start) >= self.clients.len()
+ }
+ /// We store the current index continuously without wrapping it modulo the client count (and
+ /// only do that when indexing the `clients` array), so that we can easily check if "all
+ /// currently running tasks taken together" have already tried all clients by comparing our
+ /// starting point to the current index.
+ fn tried_clients(&self, start: usize) -> usize {
+ self.current.wrapping_sub(start)
+ }
+impl MultiClient {
+ /// This is the client usage strategy.
+ ///
+ /// This is basically a "generator" for clients to try.
+ ///
+ /// We share the "state" with other tasks. When a client fails, it is "marked" as failed and
+ /// the state "rotates" through the clients.
+ /// We might be skipping clients if other tasks already tried "more" clients, but that's fine,
+ /// since there's no point in trying the same remote twice simultaneously if it is currently
+ /// offline...
+ fn try_clients(&self) -> impl Iterator<Item = Arc<Client>> + '_ {
+ let mut start_current = None;
+ let state = &self.state;
+ std::iter::from_fn(move || {
+ let mut state = state.lock().unwrap();
+ match start_current {
+ None => {
+ // first attempt, just use the current client and remember the starting index
+ let (client, index) = state.get();
+ start_current = Some((index, index));
+ Some(client)
+ }
+ Some((start, current)) => {
+ // If our last request failed, the retry-loop asks for another client, mark the
+ // one we just used as failed and check if all clients have gone through a
+ // retry loop...
+ state.failed(current);
+ if state.tried_all_since(start) {
+ // This iterator (and therefore this retry-loop) has tried all clients.
+ // Give up.
+ return None;
+ }
+ // finally just get the new current client and update `current` for the later
+ // call to `failed()`
+ let (client, current) = state.get();
+ start_current = Some((start, current));
+ Some(client)
+ }
+ }
+ })
+ .fuse()
+ }
+// doing this via a generic method is currently tedious as it requires an extra helper trait to
+// declare the flow of the lifetime in the `self.request` vs `self.streaming_request` function from
+// its input to its generic output future... and then you run into borrow-checker limitations...
+macro_rules! try_request {
+ ($self:expr, $method:expr, $path_and_query:expr, $params:expr, $how:ident) => {
+ let params = $params.map(serde_json::to_value);
+ Box::pin(async move {
+ let params = params
+ .transpose()
+ .map_err(|err| proxmox_client::Error::Anyhow(err.into()))?;
+ let mut last_err = None;
+ let mut timed_out = false;
+ // The iterator in use here will automatically mark a client as faulty if we move on to
+ // the `next()` one.
+ for client in $self.try_clients() {
+ if let Some(err) = last_err.take() {
+ log::error!("API client error, trying another remote - {err:?}");
+ }
+ if timed_out {
+ timed_out = false;
+ log::error!("API client timed out, trying another remote");
+ }
+ let request = client.$how($method.clone(), $path_and_query, params.as_ref());
+ match tokio::time::timeout($self.timeout, request).await {
+ Ok(Err(proxmox_client::Error::Client(err))) => {
+ last_err = Some(err);
+ }
+ Ok(result) => return result,
+ Err(_) => {
+ timed_out = true;
+ }
+ }
+ }
+ if let Some(err) = last_err {
+ log::error!("API client error (giving up) - {err:?}");
+ } else if timed_out {
+ log::error!("API client timed out, no remotes reachable, giving up");
+ }
+ Err(proxmox_client::Error::Other(
+ "failed to perform API request",
+ ))
+ })
+ };
+impl HttpApiClient for MultiClient {
+ type ResponseFuture<'a> =
+ Pin<Box<dyn Future<Output = Result<HttpApiResponse, proxmox_client::Error>> + Send + 'a>>;
+ type ResponseStreamFuture<'a> = Pin<
+ Box<
+ dyn Future<Output = Result<HttpApiResponseStream<Self::Body>, proxmox_client::Error>>
+ + Send
+ + 'a,
+ >,
+ >;
+ type Body = hyper::Body;
+ fn request<'a, T>(
+ &'a self,
+ method: Method,
+ path_and_query: &'a str,
+ params: Option<T>,
+ ) -> Self::ResponseFuture<'a>
+ where
+ T: Serialize + 'a,
+ {
+ try_request! { self, method, path_and_query, params, request }
+ }
+ fn streaming_request<'a, T>(
+ &'a self,
+ method: Method,
+ path_and_query: &'a str,
+ params: Option<T>,
+ ) -> Self::ResponseStreamFuture<'a>
+ where
+ T: Serialize + 'a,
+ {
+ try_request! { self, method, path_and_query, params, streaming_request }
+ }
More information about the pdm-devel
mailing list