openzeppelin_monitor/services/blockchain/
pool.rsuse crate::{
models::{BlockChainType, Network},
services::blockchain::{
BlockChainClient, BlockFilterFactory, EVMTransportClient, EvmClient, EvmClientTrait,
StellarClient, StellarClientTrait, StellarTransportClient,
},
};
use anyhow::Context;
use async_trait::async_trait;
use futures::future::BoxFuture;
use std::{any::Any, collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
#[async_trait]
pub trait ClientPoolTrait: Send + Sync {
type EvmClient: EvmClientTrait + BlockChainClient + BlockFilterFactory<Self::EvmClient>;
type StellarClient: StellarClientTrait
+ BlockChainClient
+ BlockFilterFactory<Self::StellarClient>;
async fn get_evm_client(
&self,
network: &Network,
) -> Result<Arc<Self::EvmClient>, anyhow::Error>;
async fn get_stellar_client(
&self,
network: &Network,
) -> Result<Arc<Self::StellarClient>, anyhow::Error>;
}
pub struct ClientStorage<T> {
clients: Arc<RwLock<HashMap<String, Arc<T>>>>,
}
impl<T> ClientStorage<T> {
pub fn new() -> Self {
Self {
clients: Arc::new(RwLock::new(HashMap::new())),
}
}
}
pub struct ClientPool {
pub storages: HashMap<BlockChainType, Box<dyn Any + Send + Sync>>,
}
impl ClientPool {
pub fn new() -> Self {
let mut pool = Self {
storages: HashMap::new(),
};
pool.register_client_type::<EvmClient<EVMTransportClient>>(BlockChainType::EVM);
pool.register_client_type::<StellarClient<StellarTransportClient>>(BlockChainType::Stellar);
pool
}
fn register_client_type<T: 'static + Send + Sync>(&mut self, client_type: BlockChainType) {
self.storages
.insert(client_type, Box::new(ClientStorage::<T>::new()));
}
async fn get_or_create_client<T: BlockChainClient + 'static>(
&self,
client_type: BlockChainType,
network: &Network,
create_fn: impl Fn(&Network) -> BoxFuture<'static, Result<T, anyhow::Error>>,
) -> Result<Arc<T>, anyhow::Error> {
let storage = self
.storages
.get(&client_type)
.and_then(|s| s.downcast_ref::<ClientStorage<T>>())
.with_context(|| "Invalid client type")?;
if let Some(client) = storage.clients.read().await.get(&network.slug) {
return Ok(client.clone());
}
let mut clients = storage.clients.write().await;
let client = Arc::new(create_fn(network).await?);
clients.insert(network.slug.clone(), client.clone());
Ok(client)
}
pub async fn get_client_count<T: 'static>(&self, client_type: BlockChainType) -> usize {
match self
.storages
.get(&client_type)
.and_then(|s| s.downcast_ref::<ClientStorage<T>>())
{
Some(storage) => storage.clients.read().await.len(),
None => 0,
}
}
}
#[async_trait]
impl ClientPoolTrait for ClientPool {
type EvmClient = EvmClient<EVMTransportClient>;
type StellarClient = StellarClient<StellarTransportClient>;
async fn get_evm_client(
&self,
network: &Network,
) -> Result<Arc<Self::EvmClient>, anyhow::Error> {
self.get_or_create_client(BlockChainType::EVM, network, |n| {
let network = n.clone();
Box::pin(async move { Self::EvmClient::new(&network).await })
})
.await
.with_context(|| "Failed to get or create EVM client")
}
async fn get_stellar_client(
&self,
network: &Network,
) -> Result<Arc<Self::StellarClient>, anyhow::Error> {
self.get_or_create_client(BlockChainType::Stellar, network, |n| {
let network = n.clone();
Box::pin(async move { Self::StellarClient::new(&network).await })
})
.await
.with_context(|| "Failed to get or create Stellar client")
}
}
impl Default for ClientPool {
fn default() -> Self {
Self::new()
}
}