// openzeppelin_monitor/services/blockchain/transports/mod.rs
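//! Network transport implementations for blockchain clients.
//!
//! Provides concrete implementations for different blockchain network protocols:
//!
//! - Generic HTTP transport for all chains ([`HttpTransportClient`])
//! - EVM HTTP transport ([`EVMTransportClient`])
//! - Stellar HTTP transport ([`StellarTransportClient`])
//!
//! It also re-exports [`EndpointManager`] and defines the shared transport traits
//! ([`BlockchainTransport`], [`RotatingTransport`]) and the default retry strategy
//! ([`TransientErrorRetryStrategy`]).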

// Private namespace for the `evm` transport. Its `http` submodule lives in a
// separate file; the client type it defines is re-exported from this module as
// `EVMTransportClient`.
mod evm {
	pub mod http;
}
// Private namespace for the `stellar` transport. Its `http` submodule lives in a
// separate file; the client type it defines is re-exported from this module as
// `StellarTransportClient`.
mod stellar {
	pub mod http;
}

mod endpoint_manager;
mod http;

pub use endpoint_manager::EndpointManager;
pub use evm::http::EVMTransportClient;
pub use http::HttpTransportClient;
pub use stellar::http::StellarTransportClient;

use reqwest_middleware::ClientWithMiddleware;
use reqwest_retry::{
	default_on_request_failure, default_on_request_success, policies::ExponentialBackoff,
	Retryable, RetryableStrategy,
};
use serde::Serialize;
use serde_json::{json, Value};

/// HTTP status codes that trigger RPC endpoint rotation.
///
/// Currently a single entry:
///
/// - `429` (Too Many Requests): indicates rate limiting from the current endpoint
pub const ROTATE_ON_ERROR_CODES: [u16; 1] = [429];

/// Common interface shared by every blockchain transport client.
///
/// Implementors must be `Send + Sync`. Only [`customize_request`](Self::customize_request)
/// ships with a default body; every other method has to be provided by the implementor.
#[async_trait::async_trait]
pub trait BlockchainTransport: Send + Sync {
	/// Returns the URL the transport is currently using.
	async fn get_current_url(&self) -> String;

	/// Sends a raw request to the blockchain.
	///
	/// # Arguments
	/// * `method` - Name of the method to invoke
	/// * `params` - Optional parameters, convertible into a JSON [`Value`]
	///
	/// # Returns
	/// The response as a JSON [`Value`], or an [`anyhow::Error`] on failure. The exact
	/// failure conditions are defined by each implementation.
	async fn send_raw_request<P>(
		&self,
		method: &str,
		params: Option<P>,
	) -> Result<Value, anyhow::Error>
	where
		P: Into<Value> + Send + Clone + Serialize;

	/// Builds the request payload, allowing chains to adapt it to their own requirements.
	///
	/// The default implementation produces a JSON-RPC 2.0 envelope with a fixed `id` of `1`,
	/// the given `method`, and `params` converted into a JSON [`Value`].
	async fn customize_request<P>(&self, method: &str, params: Option<P>) -> Value
	where
		P: Into<Value> + Send + Clone + Serialize,
	{
		// Convert up front so the envelope below only deals with plain JSON values;
		// absent params end up as JSON `null`.
		let params: Option<Value> = params.map(Into::into);

		json!({
			"jsonrpc": "2.0",
			"id": 1,
			"method": method,
			"params": params
		})
	}

	/// Sets the retry policy for the transport.
	///
	/// # Arguments
	/// * `retry_policy` - Exponential backoff policy to apply
	/// * `retry_strategy` - Optional strategy deciding which outcomes are retried; how `None`
	///   is treated is up to the implementation
	fn set_retry_policy(
		&mut self,
		retry_policy: ExponentialBackoff,
		retry_strategy: Option<TransientErrorRetryStrategy>,
	) -> Result<(), anyhow::Error>;

	/// Updates the endpoint manager with a new middleware-enabled HTTP client.
	fn update_endpoint_manager_client(
		&mut self,
		client: ClientWithMiddleware,
	) -> Result<(), anyhow::Error>;
}

/// Extension trait for transports that support URL rotation.
///
/// Builds on [`BlockchainTransport`]. Both methods take `&self`, so implementations
/// presumably rely on interior mutability to swap the active URL (confirm against the
/// concrete transports).
#[async_trait::async_trait]
pub trait RotatingTransport: BlockchainTransport {
	/// Attempts to establish a connection with a new URL.
	///
	/// Returns `Ok(())` on success and an [`anyhow::Error`] otherwise.
	async fn try_connect(&self, url: &str) -> Result<(), anyhow::Error>;

	/// Updates the client with a new URL.
	///
	/// Returns `Ok(())` on success and an [`anyhow::Error`] otherwise.
	async fn update_client(&self, url: &str) -> Result<(), anyhow::Error>;
}

/// Default retry strategy that defers to the stock `reqwest_retry` classifiers.
///
/// Completed responses are classified by [`default_on_request_success`] and failed
/// requests by [`default_on_request_failure`]. It is the strategy type accepted by
/// [`BlockchainTransport::set_retry_policy`].
pub struct TransientErrorRetryStrategy;

impl RetryableStrategy for TransientErrorRetryStrategy {
	/// Decides whether the outcome of a request should be retried.
	///
	/// Returns whatever the matching `reqwest_retry` default classifier returns for `res`.
	fn handle(
		&self,
		res: &Result<reqwest::Response, reqwest_middleware::Error>,
	) -> Option<Retryable> {
		// Deal with the failure case first so the success path reads straight through.
		let response = match res {
			Err(failure) => return default_on_request_failure(failure),
			Ok(response) => response,
		};

		default_on_request_success(response)
	}
}