[pdm-devel] [PATCH datacenter-manager 2/7] server: store pve MultiClient for re-use

Wolfgang Bumiller w.bumiller at proxmox.com
Tue Feb 4 10:55:49 CET 2025


The traits used to create the clients need adapting so that clients can
be kept around by remote name.

Signed-off-by: Wolfgang Bumiller <w.bumiller at proxmox.com>
---
 server/src/api/pve/mod.rs |  20 +++---
 server/src/connection.rs  | 127 ++++++++++++++++++++++++++++++--------
 2 files changed, 113 insertions(+), 34 deletions(-)

diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index 2cefbb4..7b81504 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -20,14 +20,16 @@ use pdm_api_types::{
     PRIV_SYS_MODIFY,
 };
 
-use pve_api_types::client::PveClient;
-use pve_api_types::{
-    ClusterNodeStatus, ClusterResourceKind, ClusterResourceType, ListRealm, PveUpid,
-};
+use pve_api_types::ClusterNodeStatus;
+use pve_api_types::ListRealm;
+use pve_api_types::PveUpid;
+use pve_api_types::{ClusterResourceKind, ClusterResourceType};
 
 use super::resources::{map_pve_lxc, map_pve_node, map_pve_qemu, map_pve_storage};
 
-use crate::{connection, task_cache};
+use crate::connection;
+use crate::connection::PveClient;
+use crate::task_cache;
 
 mod lxc;
 mod node;
@@ -91,18 +93,18 @@ pub(crate) fn get_remote<'a>(
     Ok(remote)
 }
 
-pub async fn connect_or_login(remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub async fn connect_or_login(remote: &Remote) -> Result<Arc<PveClient>, Error> {
     connection::make_pve_client_and_login(remote).await
 }
 
-pub fn connect(remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub fn connect(remote: &Remote) -> Result<Arc<PveClient>, Error> {
     connection::make_pve_client(remote)
 }
 
 fn connect_to_remote(
     config: &SectionConfigData<Remote>,
     id: &str,
-) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+) -> Result<Arc<PveClient>, Error> {
     connect(get_remote(config, id)?)
 }
 
@@ -264,7 +266,7 @@ fn check_guest_permissions(
 async fn find_node_for_vm(
     node: Option<String>,
     vmid: u32,
-    pve: &(dyn PveClient + Send + Sync),
+    pve: &PveClient,
 ) -> Result<String, Error> {
     // FIXME: The pve client should cache the resources
     Ok(match node {
diff --git a/server/src/connection.rs b/server/src/connection.rs
index 767a2f9..bee4959 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -3,12 +3,15 @@
 //! Make sure to call [`init`] to inject a concrete [`ClientFactory`]
 //! instance before calling any of the provided functions.
 
+use std::collections::HashMap;
 use std::future::Future;
-use std::pin::Pin;
+use std::pin::{pin, Pin};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::sync::Mutex as StdMutex;
-use std::sync::OnceLock;
-use std::time::Duration;
+use std::sync::Once;
+use std::sync::{LazyLock, OnceLock};
+use std::time::{Duration, SystemTime};
 
 use anyhow::{bail, format_err, Error};
 use http::uri::Authority;
@@ -18,7 +21,7 @@ use serde::Serialize;
 use proxmox_client::{Client, HttpApiClient, HttpApiResponse, HttpApiResponseStream, TlsOptions};
 
 use pdm_api_types::remotes::{NodeUrl, Remote, RemoteType};
-use pve_api_types::client::{PveClient, PveClientImpl};
+use pve_api_types::client::PveClientImpl;
 
 use crate::pbs_client::PbsClient;
 
@@ -183,7 +186,7 @@ async fn connect_or_login(
 #[async_trait::async_trait]
 pub trait ClientFactory {
     /// Create a new API client for PVE remotes
-    fn make_pve_client(&self, remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error>;
+    fn make_pve_client(&self, remote: &Remote) -> Result<Arc<PveClient>, Error>;
 
     /// Create a new API client for PBS remotes
     fn make_pbs_client(&self, remote: &Remote) -> Result<Box<PbsClient>, Error>;
@@ -193,7 +196,7 @@ pub trait ClientFactory {
         &self,
         remote: &Remote,
         target_endpoint: Option<&str>,
-    ) -> Result<Box<dyn PveClient + Send + Sync>, Error>;
+    ) -> Result<Arc<PveClient>, Error>;
 
     /// Create a new API client for PVE remotes.
     ///
@@ -204,10 +207,7 @@ pub trait ClientFactory {
     /// This is intended for API calls that accept a user in addition to tokens.
     ///
     /// Note: currently does not support two factor authentication.
-    async fn make_pve_client_and_login(
-        &self,
-        remote: &Remote,
-    ) -> Result<Box<dyn PveClient + Send + Sync>, Error>;
+    async fn make_pve_client_and_login(&self, remote: &Remote) -> Result<Arc<PveClient>, Error>;
 
     /// Create a new API client for PBS remotes.
     ///
@@ -224,11 +224,93 @@ pub trait ClientFactory {
 /// Default production client factory
 pub struct DefaultClientFactory;
 
+pub type PveClient = dyn pve_api_types::client::PveClient + Send + Sync;
+
+/// A cached client for a remote (to reuse connections and share info about connection issues in
+/// remotes with multiple nodes...).
+struct ClientEntry<T: ?Sized> {
+    last_used: SystemTime,
+    client: Arc<T>,
+    remote: Remote,
+}
+
+/// Contains the cached clients; the future that times them out is spawned on first access.
+#[derive(Default)]
+struct ConnectionCache {
+    pve_clients: StdMutex<HashMap<String, ClientEntry<PveClient>>>,
+}
+
+/// This cache is a singleton.
+static CONNECTION_CACHE: LazyLock<ConnectionCache> = LazyLock::new(Default::default);
+static CLEANUP_FUTURE_STARTED: Once = Once::new();
+
+impl ConnectionCache {
+    const CLEANUP_INTERVAL: Duration = Duration::from_secs(30);
+    const STALE_TIMEOUT: Duration = Duration::from_secs(30);
+
+    /// Access the cache
+    fn get() -> &'static Self {
+        let this = &CONNECTION_CACHE;
+        this.init();
+        this
+    }
+
+    /// If it hasn't already, spawn the cleanup future.
+    fn init(&self) {
+        CLEANUP_FUTURE_STARTED.call_once(|| {
+            tokio::spawn(async move {
+                let future = pin!(CONNECTION_CACHE.cleanup_future());
+                let abort_future = pin!(proxmox_daemon::shutdown_future());
+                futures::future::select(future, abort_future).await;
+            });
+        });
+    }
+
+    /// Run a cleanup operation every 30 seconds.
+    async fn cleanup_future(&self) {
+        loop {
+            tokio::time::sleep(Self::CLEANUP_INTERVAL).await;
+            self.cleanup_cycle();
+        }
+    }
+
+    /// Clean out cached clients older than 30 seconds.
+    fn cleanup_cycle(&self) {
+        let oldest_time = SystemTime::now() - Self::STALE_TIMEOUT;
+        self.pve_clients
+            .lock()
+            .unwrap()
+            .retain(|_remote_name, client| client.last_used >= oldest_time)
+    }
+
+    fn make_pve_client(&self, remote: &Remote) -> Result<Arc<PveClient>, anyhow::Error> {
+        let mut pve_clients = self.pve_clients.lock().unwrap();
+        if let Some(client) = pve_clients.get_mut(&remote.id) {
+            // Verify the remote is still the same:
+            if client.remote == *remote {
+                client.last_used = SystemTime::now();
+                return Ok(Arc::clone(&client.client));
+            }
+        }
+
+        let client: Arc<PveClient> =
+            Arc::new(PveClientImpl(crate::connection::multi_connect(remote)?));
+        pve_clients.insert(
+            remote.id.clone(),
+            ClientEntry {
+                last_used: SystemTime::now(),
+                client: Arc::clone(&client),
+                remote: remote.clone(),
+            },
+        );
+        Ok(client)
+    }
+}
+
 #[async_trait::async_trait]
 impl ClientFactory for DefaultClientFactory {
-    fn make_pve_client(&self, remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
-        let client = crate::connection::multi_connect(remote)?;
-        Ok(Box::new(PveClientImpl(client)))
+    fn make_pve_client(&self, remote: &Remote) -> Result<Arc<PveClient>, Error> {
+        ConnectionCache::get().make_pve_client(remote)
     }
 
     fn make_pbs_client(&self, remote: &Remote) -> Result<Box<PbsClient>, Error> {
@@ -240,17 +322,14 @@ impl ClientFactory for DefaultClientFactory {
         &self,
         remote: &Remote,
         target_endpoint: Option<&str>,
-    ) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+    ) -> Result<Arc<PveClient>, Error> {
         let client = crate::connection::connect(remote, target_endpoint)?;
-        Ok(Box::new(PveClientImpl(client)))
+        Ok(Arc::new(PveClientImpl(client)))
     }
 
-    async fn make_pve_client_and_login(
-        &self,
-        remote: &Remote,
-    ) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+    async fn make_pve_client_and_login(&self, remote: &Remote) -> Result<Arc<PveClient>, Error> {
         let client = connect_or_login(remote, None).await?;
-        Ok(Box::new(PveClientImpl(client)))
+        Ok(Arc::new(PveClientImpl(client)))
     }
 
     async fn make_pbs_client_and_login(&self, remote: &Remote) -> Result<Box<PbsClient>, Error> {
@@ -270,7 +349,7 @@ fn instance() -> &'static (dyn ClientFactory + Send + Sync) {
 }
 
 /// Create a new API client for PVE remotes
-pub fn make_pve_client(remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub fn make_pve_client(remote: &Remote) -> Result<Arc<PveClient>, Error> {
     instance().make_pve_client(remote)
 }
 
@@ -278,7 +357,7 @@ pub fn make_pve_client(remote: &Remote) -> Result<Box<dyn PveClient + Send + Syn
 pub fn make_pve_client_with_endpoint(
     remote: &Remote,
     target_endpoint: Option<&str>,
-) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+) -> Result<Arc<PveClient>, Error> {
     instance().make_pve_client_with_endpoint(remote, target_endpoint)
 }
 
@@ -296,9 +375,7 @@ pub fn make_pbs_client(remote: &Remote) -> Result<Box<PbsClient>, Error> {
 /// This is intended for API calls that accept a user in addition to tokens.
 ///
 /// Note: currently does not support two factor authentication.
-pub async fn make_pve_client_and_login(
-    remote: &Remote,
-) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub async fn make_pve_client_and_login(remote: &Remote) -> Result<Arc<PveClient>, Error> {
     instance().make_pve_client_and_login(remote).await
 }
 
-- 
2.39.5





More information about the pdm-devel mailing list