[pdm-devel] [PATCH datacenter-manager 2/7] server: store pve MultiClient for re-use
Wolfgang Bumiller
w.bumiller at proxmox.com
Tue Feb 4 10:55:49 CET 2025
The traits to create the clients needs adapting to keep around clients
by remote names.
Signed-off-by: Wolfgang Bumiller <w.bumiller at proxmox.com>
---
server/src/api/pve/mod.rs | 20 +++---
server/src/connection.rs | 127 ++++++++++++++++++++++++++++++--------
2 files changed, 113 insertions(+), 34 deletions(-)
diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index 2cefbb4..7b81504 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -20,14 +20,16 @@ use pdm_api_types::{
PRIV_SYS_MODIFY,
};
-use pve_api_types::client::PveClient;
-use pve_api_types::{
- ClusterNodeStatus, ClusterResourceKind, ClusterResourceType, ListRealm, PveUpid,
-};
+use pve_api_types::ClusterNodeStatus;
+use pve_api_types::ListRealm;
+use pve_api_types::PveUpid;
+use pve_api_types::{ClusterResourceKind, ClusterResourceType};
use super::resources::{map_pve_lxc, map_pve_node, map_pve_qemu, map_pve_storage};
-use crate::{connection, task_cache};
+use crate::connection;
+use crate::connection::PveClient;
+use crate::task_cache;
mod lxc;
mod node;
@@ -91,18 +93,18 @@ pub(crate) fn get_remote<'a>(
Ok(remote)
}
-pub async fn connect_or_login(remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub async fn connect_or_login(remote: &Remote) -> Result<Arc<PveClient>, Error> {
connection::make_pve_client_and_login(remote).await
}
-pub fn connect(remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub fn connect(remote: &Remote) -> Result<Arc<PveClient>, Error> {
connection::make_pve_client(remote)
}
fn connect_to_remote(
config: &SectionConfigData<Remote>,
id: &str,
-) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+) -> Result<Arc<PveClient>, Error> {
connect(get_remote(config, id)?)
}
@@ -264,7 +266,7 @@ fn check_guest_permissions(
async fn find_node_for_vm(
node: Option<String>,
vmid: u32,
- pve: &(dyn PveClient + Send + Sync),
+ pve: &PveClient,
) -> Result<String, Error> {
// FIXME: The pve client should cache the resources
Ok(match node {
diff --git a/server/src/connection.rs b/server/src/connection.rs
index 767a2f9..bee4959 100644
--- a/server/src/connection.rs
+++ b/server/src/connection.rs
@@ -3,12 +3,15 @@
//! Make sure to call [`init`] to inject a concrete [`ClientFactory`]
//! instance before calling any of the provided functions.
+use std::collections::HashMap;
use std::future::Future;
-use std::pin::Pin;
+use std::pin::{pin, Pin};
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
-use std::sync::OnceLock;
-use std::time::Duration;
+use std::sync::Once;
+use std::sync::{LazyLock, OnceLock};
+use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Error};
use http::uri::Authority;
@@ -18,7 +21,7 @@ use serde::Serialize;
use proxmox_client::{Client, HttpApiClient, HttpApiResponse, HttpApiResponseStream, TlsOptions};
use pdm_api_types::remotes::{NodeUrl, Remote, RemoteType};
-use pve_api_types::client::{PveClient, PveClientImpl};
+use pve_api_types::client::PveClientImpl;
use crate::pbs_client::PbsClient;
@@ -183,7 +186,7 @@ async fn connect_or_login(
#[async_trait::async_trait]
pub trait ClientFactory {
/// Create a new API client for PVE remotes
- fn make_pve_client(&self, remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error>;
+ fn make_pve_client(&self, remote: &Remote) -> Result<Arc<PveClient>, Error>;
/// Create a new API client for PBS remotes
fn make_pbs_client(&self, remote: &Remote) -> Result<Box<PbsClient>, Error>;
@@ -193,7 +196,7 @@ pub trait ClientFactory {
&self,
remote: &Remote,
target_endpoint: Option<&str>,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error>;
+ ) -> Result<Arc<PveClient>, Error>;
/// Create a new API client for PVE remotes.
///
@@ -204,10 +207,7 @@ pub trait ClientFactory {
/// This is intended for API calls that accept a user in addition to tokens.
///
/// Note: currently does not support two factor authentication.
- async fn make_pve_client_and_login(
- &self,
- remote: &Remote,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error>;
+ async fn make_pve_client_and_login(&self, remote: &Remote) -> Result<Arc<PveClient>, Error>;
/// Create a new API client for PBS remotes.
///
@@ -224,11 +224,93 @@ pub trait ClientFactory {
/// Default production client factory
pub struct DefaultClientFactory;
+pub type PveClient = dyn pve_api_types::client::PveClient + Send + Sync;
+
+/// A cached client for a remote (to reuse connections and share info about connection issues in
+/// remotes with multiple nodes...).
+struct ClientEntry<T: ?Sized> {
+ last_used: SystemTime,
+ client: Arc<T>,
+ remote: Remote,
+}
+
+/// Contains the cached clients and handle to the future dealing with timing them out.
+#[derive(Default)]
+struct ConnectionCache {
+ pve_clients: StdMutex<HashMap<String, ClientEntry<PveClient>>>,
+}
+
+/// This cache is a singleton.
+static CONNECTION_CACHE: LazyLock<ConnectionCache> = LazyLock::new(Default::default);
+static CLEANUP_FUTURE_STARTED: Once = Once::new();
+
+impl ConnectionCache {
+ const CLEANUP_INTERVAL: Duration = Duration::from_secs(30);
+ const STALE_TIMEOUT: Duration = Duration::from_secs(30);
+
+ /// Access the cache
+ fn get() -> &'static Self {
+ let this = &CONNECTION_CACHE;
+ this.init();
+ this
+ }
+
+ /// If it hasn't already, spawn the cleanup future.
+ fn init(&self) {
+ CLEANUP_FUTURE_STARTED.call_once(|| {
+ tokio::spawn(async move {
+ let future = pin!(CONNECTION_CACHE.cleanup_future());
+ let abort_future = pin!(proxmox_daemon::shutdown_future());
+ futures::future::select(future, abort_future).await;
+ });
+ });
+ }
+
+ /// Run a cleanup operation every 30 seconds.
+ async fn cleanup_future(&self) {
+ loop {
+ tokio::time::sleep(Self::CLEANUP_INTERVAL).await;
+ self.cleanup_cycle();
+ }
+ }
+
+ /// Clean out cached clients older than 30 seconds.
+ fn cleanup_cycle(&self) {
+ let oldest_time = SystemTime::now() - Self::STALE_TIMEOUT;
+ self.pve_clients
+ .lock()
+ .unwrap()
+ .retain(|_remote_name, client| client.last_used >= oldest_time)
+ }
+
+ fn make_pve_client(&self, remote: &Remote) -> Result<Arc<PveClient>, anyhow::Error> {
+ let mut pve_clients = self.pve_clients.lock().unwrap();
+ if let Some(client) = pve_clients.get_mut(&remote.id) {
+ // Verify the remote is still the same:
+ if client.remote == *remote {
+ client.last_used = SystemTime::now();
+ return Ok(Arc::clone(&client.client));
+ }
+ }
+
+ let client: Arc<PveClient> =
+ Arc::new(PveClientImpl(crate::connection::multi_connect(remote)?));
+ pve_clients.insert(
+ remote.id.clone(),
+ ClientEntry {
+ last_used: SystemTime::now(),
+ client: Arc::clone(&client),
+ remote: remote.clone(),
+ },
+ );
+ Ok(client)
+ }
+}
+
#[async_trait::async_trait]
impl ClientFactory for DefaultClientFactory {
- fn make_pve_client(&self, remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
- let client = crate::connection::multi_connect(remote)?;
- Ok(Box::new(PveClientImpl(client)))
+ fn make_pve_client(&self, remote: &Remote) -> Result<Arc<PveClient>, Error> {
+ ConnectionCache::get().make_pve_client(remote)
}
fn make_pbs_client(&self, remote: &Remote) -> Result<Box<PbsClient>, Error> {
@@ -240,17 +322,14 @@ impl ClientFactory for DefaultClientFactory {
&self,
remote: &Remote,
target_endpoint: Option<&str>,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+ ) -> Result<Arc<PveClient>, Error> {
let client = crate::connection::connect(remote, target_endpoint)?;
- Ok(Box::new(PveClientImpl(client)))
+ Ok(Arc::new(PveClientImpl(client)))
}
- async fn make_pve_client_and_login(
- &self,
- remote: &Remote,
- ) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+ async fn make_pve_client_and_login(&self, remote: &Remote) -> Result<Arc<PveClient>, Error> {
let client = connect_or_login(remote, None).await?;
- Ok(Box::new(PveClientImpl(client)))
+ Ok(Arc::new(PveClientImpl(client)))
}
async fn make_pbs_client_and_login(&self, remote: &Remote) -> Result<Box<PbsClient>, Error> {
@@ -270,7 +349,7 @@ fn instance() -> &'static (dyn ClientFactory + Send + Sync) {
}
/// Create a new API client for PVE remotes
-pub fn make_pve_client(remote: &Remote) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub fn make_pve_client(remote: &Remote) -> Result<Arc<PveClient>, Error> {
instance().make_pve_client(remote)
}
@@ -278,7 +357,7 @@ pub fn make_pve_client(remote: &Remote) -> Result<Box<dyn PveClient + Send + Syn
pub fn make_pve_client_with_endpoint(
remote: &Remote,
target_endpoint: Option<&str>,
-) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+) -> Result<Arc<PveClient>, Error> {
instance().make_pve_client_with_endpoint(remote, target_endpoint)
}
@@ -296,9 +375,7 @@ pub fn make_pbs_client(remote: &Remote) -> Result<Box<PbsClient>, Error> {
/// This is intended for API calls that accept a user in addition to tokens.
///
/// Note: currently does not support two factor authentication.
-pub async fn make_pve_client_and_login(
- remote: &Remote,
-) -> Result<Box<dyn PveClient + Send + Sync>, Error> {
+pub async fn make_pve_client_and_login(remote: &Remote) -> Result<Arc<PveClient>, Error> {
instance().make_pve_client_and_login(remote).await
}
--
2.39.5
More information about the pdm-devel
mailing list