// openzeppelin_monitor/services/blockchain/clients/evm/client.rs
use std::marker::PhantomData;
use anyhow::Context;
use async_trait::async_trait;
use futures;
use serde_json::json;
use tracing::instrument;
use crate::{
models::{BlockType, EVMBlock, EVMReceiptLog, EVMTransactionReceipt, Network},
services::{
blockchain::{
client::BlockChainClient,
transports::{BlockchainTransport, EVMTransportClient},
BlockFilterFactory,
},
filter::{evm_helpers::string_to_h256, EVMBlockFilter},
},
};
/// Client for an EVM network, generic over the transport `T` that carries its requests.
///
/// Cloning the client clones the wrapped transport.
#[derive(Clone)]
pub struct EvmClient<T>
where
    T: Send + Sync + Clone,
{
    /// Transport through which every request of this client is sent.
    http_client: T,
}
impl<T: Send + Sync + Clone> EvmClient<T> {
pub fn new_with_transport(http_client: T) -> Self {
Self { http_client }
}
}
impl EvmClient<EVMTransportClient> {
    /// Builds a client backed by an [`EVMTransportClient`] created for `network`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `EVMTransportClient::new`.
    pub async fn new(network: &Network) -> Result<Self, anyhow::Error> {
        let transport = EVMTransportClient::new(network).await?;
        Ok(Self {
            http_client: transport,
        })
    }
}
impl<T> BlockFilterFactory<EvmClient<T>> for EvmClient<T>
where
    T: Send + Sync + Clone + BlockchainTransport,
{
    /// Block filter type paired with this client.
    type Filter = EVMBlockFilter<EvmClient<T>>;

    /// Creates a fresh filter value.
    ///
    /// The filter carries only a `PhantomData` marker, so no client instance is needed.
    fn filter() -> Self::Filter {
        EVMBlockFilter {
            _client: PhantomData,
        }
    }
}
/// Receipt and log queries that an EVM client exposes.
///
/// `EvmClient<T>` in this file implements the trait on top of its transport.
#[async_trait]
pub trait EvmClientTrait {
/// Looks up the receipt of the transaction identified by `transaction_hash`.
///
/// Returns the receipt on success, or an `anyhow::Error` describing the failure.
async fn get_transaction_receipt(
&self,
transaction_hash: String,
) -> Result<EVMTransactionReceipt, anyhow::Error>;
/// Retrieves the logs for the block range `from_block` to `to_block`.
///
/// `addresses` optionally restricts the query; `None` means no address list is supplied.
/// NOTE(review): whether the range bounds are inclusive is decided by the implementation —
/// the `EvmClient` implementation forwards them as `fromBlock`/`toBlock` to `eth_getLogs`.
async fn get_logs_for_blocks(
&self,
from_block: u64,
to_block: u64,
addresses: Option<Vec<String>>,
) -> Result<Vec<EVMReceiptLog>, anyhow::Error>;
}
#[async_trait]
impl<T: Send + Sync + Clone + BlockchainTransport> EvmClientTrait for EvmClient<T> {
    /// Fetches a transaction receipt with the `eth_getTransactionReceipt` JSON-RPC method.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `transaction_hash` is rejected by `string_to_h256`,
    /// - the transport request fails,
    /// - the response has no `result` field,
    /// - `result` is `null` (reported as "Transaction receipt not found"),
    /// - `result` cannot be deserialized into an [`EVMTransactionReceipt`].
    #[instrument(skip(self), fields(transaction_hash))]
    async fn get_transaction_receipt(
        &self,
        transaction_hash: String,
    ) -> Result<EVMTransactionReceipt, anyhow::Error> {
        let hash = string_to_h256(&transaction_hash)
            .map_err(|e| anyhow::anyhow!("Invalid transaction hash: {}", e))?;

        // The parsed hash is re-formatted with `{:x}` and a `0x` prefix rather than
        // forwarding the caller's string verbatim. `json!([..])` is already a JSON array,
        // so it is handed to the transport directly.
        let params = json!([format!("0x{:x}", hash)]);

        let mut response = self
            .http_client
            .send_raw_request("eth_getTransactionReceipt", Some(params))
            .await
            .with_context(|| format!("Failed to get transaction receipt: {}", transaction_hash))?;

        // Move the payload out of the response (leaving `null` behind) instead of cloning it.
        let receipt_data = response
            .get_mut("result")
            .map(serde_json::Value::take)
            .with_context(|| "Missing 'result' field")?;

        if receipt_data.is_null() {
            return Err(anyhow::anyhow!("Transaction receipt not found"));
        }

        serde_json::from_value(receipt_data).with_context(|| "Failed to parse transaction receipt")
    }

    /// Fetches logs with the `eth_getLogs` JSON-RPC method.
    ///
    /// `from_block` and `to_block` are sent as `0x`-prefixed hex in the filter's
    /// `fromBlock` / `toBlock` fields. `addresses` is sent as the filter's `address`
    /// field; `None` serializes as JSON `null`.
    ///
    /// # Errors
    ///
    /// Returns an error when the transport request fails, when the response has no
    /// `result` field, or when `result` cannot be deserialized into `Vec<EVMReceiptLog>`.
    #[instrument(skip(self), fields(from_block, to_block))]
    async fn get_logs_for_blocks(
        &self,
        from_block: u64,
        to_block: u64,
        addresses: Option<Vec<String>>,
    ) -> Result<Vec<EVMReceiptLog>, anyhow::Error> {
        // Single-element params list holding the filter object.
        let params = vec![json!({
            "fromBlock": format!("0x{:x}", from_block),
            "toBlock": format!("0x{:x}", to_block),
            "address": addresses
        })];

        let mut response = self
            .http_client
            .send_raw_request("eth_getLogs", Some(params))
            .await
            .with_context(|| {
                format!(
                    "Failed to get logs for blocks: {} - {}",
                    from_block, to_block
                )
            })?;

        // The log list can be large; take ownership of it rather than deep-cloning.
        let logs_data = response
            .get_mut("result")
            .map(serde_json::Value::take)
            .with_context(|| "Missing 'result' field")?;

        serde_json::from_value(logs_data).with_context(|| "Failed to parse logs")
    }
}
#[async_trait]
impl<T: Send + Sync + Clone + BlockchainTransport> BlockChainClient for EvmClient<T> {
    /// Returns the latest block number reported by `eth_blockNumber`.
    ///
    /// # Errors
    ///
    /// Returns an error when the transport request fails, when `result` is missing or
    /// is not a string, or when the string is not valid hexadecimal for a `u64`.
    #[instrument(skip(self))]
    async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
        let response = self
            .http_client
            .send_raw_request::<serde_json::Value>("eth_blockNumber", None)
            .await
            .with_context(|| "Failed to get latest block number")?;

        let hex_str = response
            .get("result")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow::anyhow!("Missing 'result' field"))?;

        // Strip at most one `0x` prefix: `trim_start_matches` would also accept malformed
        // values such as `0x0x10`. A value without a prefix is still parsed as hex.
        let digits = hex_str.strip_prefix("0x").unwrap_or(hex_str);
        u64::from_str_radix(digits, 16)
            .map_err(|e| anyhow::anyhow!("Failed to parse block number: {}", e))
    }

    /// Fetches the blocks `start_block..=end_block` with `eth_getBlockByNumber`.
    ///
    /// When `end_block` is `None` only `start_block` is fetched. When `end_block` is
    /// smaller than `start_block` the range is empty and an empty vector is returned.
    /// One request per block is created and all of them are awaited together with
    /// `join_all`, so results are in ascending block-number order.
    ///
    /// # Errors
    ///
    /// Every request runs to completion; afterwards the error of the lowest-numbered
    /// failing block is returned. A block fails when its request fails, when `result`
    /// is missing or `null` ("Block not found"), or when it cannot be deserialized
    /// into an [`EVMBlock`].
    #[instrument(skip(self), fields(start_block, end_block))]
    async fn get_blocks(
        &self,
        start_block: u64,
        end_block: Option<u64>,
    ) -> Result<Vec<BlockType>, anyhow::Error> {
        let last_block = end_block.unwrap_or(start_block);

        let block_futures: Vec<_> = (start_block..=last_block)
            .map(|block_number| {
                // Second parameter `true` asks for full transaction objects
                // (per the Ethereum JSON-RPC specification).
                let params = json!([format!("0x{:x}", block_number), true]);
                // Each future owns its own transport handle.
                let client = self.http_client.clone();
                async move {
                    let mut response = client
                        .send_raw_request("eth_getBlockByNumber", Some(params))
                        .await
                        .with_context(|| format!("Failed to get block: {}", block_number))?;

                    // Move the block payload out of the response instead of deep-cloning it.
                    let block_data = response
                        .get_mut("result")
                        .map(serde_json::Value::take)
                        .ok_or_else(|| anyhow::anyhow!("Missing 'result' field"))?;

                    if block_data.is_null() {
                        return Err(anyhow::anyhow!("Block not found"));
                    }

                    let block: EVMBlock = serde_json::from_value(block_data)
                        .map_err(|e| anyhow::anyhow!("Failed to parse block: {}", e))?;

                    Ok(BlockType::EVM(Box::new(block)))
                }
            })
            .collect();

        futures::future::join_all(block_futures)
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
    }
}