#![allow(clippy::result_large_err)]
use std::{collections::HashMap, marker::PhantomData, path::Path};
use async_trait::async_trait;
use crate::{
models::{ConfigLoader, Monitor, Network, ScriptLanguage, Trigger},
repositories::{
error::RepositoryError,
network::{NetworkRepository, NetworkRepositoryTrait, NetworkService},
trigger::{TriggerRepository, TriggerRepositoryTrait, TriggerService},
},
};
const LANGUAGE_EXTENSIONS: &[(&ScriptLanguage, &str)] = &[
(&ScriptLanguage::Python, "py"),
(&ScriptLanguage::JavaScript, "js"),
(&ScriptLanguage::Bash, "sh"),
];
#[derive(Clone)]
pub struct MonitorRepository<
N: NetworkRepositoryTrait + Send + 'static,
T: TriggerRepositoryTrait + Send + 'static,
> {
pub monitors: HashMap<String, Monitor>,
_network_repository: PhantomData<N>,
_trigger_repository: PhantomData<T>,
}
impl<
N: NetworkRepositoryTrait + Send + Sync + 'static,
T: TriggerRepositoryTrait + Send + Sync + 'static,
> MonitorRepository<N, T>
{
pub async fn new(
path: Option<&Path>,
network_service: Option<NetworkService<N>>,
trigger_service: Option<TriggerService<T>>,
) -> Result<Self, RepositoryError> {
let monitors = Self::load_all(path, network_service, trigger_service).await?;
Ok(MonitorRepository {
monitors,
_network_repository: PhantomData,
_trigger_repository: PhantomData,
})
}
pub fn new_with_monitors(monitors: HashMap<String, Monitor>) -> Self {
MonitorRepository {
monitors,
_network_repository: PhantomData,
_trigger_repository: PhantomData,
}
}
pub fn validate_monitor_references(
monitors: &HashMap<String, Monitor>,
triggers: &HashMap<String, Trigger>,
networks: &HashMap<String, Network>,
) -> Result<(), RepositoryError> {
let mut validation_errors = Vec::new();
let mut metadata = HashMap::new();
for (monitor_name, monitor) in monitors {
for trigger_id in &monitor.triggers {
if !triggers.contains_key(trigger_id) {
validation_errors.push(format!(
"Monitor '{}' references non-existent trigger '{}'",
monitor_name, trigger_id
));
metadata.insert(
format!("monitor_{}_invalid_trigger", monitor_name),
trigger_id.clone(),
);
}
}
for network_slug in &monitor.networks {
if !networks.contains_key(network_slug) {
validation_errors.push(format!(
"Monitor '{}' references non-existent network '{}'",
monitor_name, network_slug
));
metadata.insert(
format!("monitor_{}_invalid_network", monitor_name),
network_slug.clone(),
);
}
}
for condition in &monitor.trigger_conditions {
let script_path = Path::new(&condition.script_path);
if !script_path.exists() {
validation_errors.push(format!(
"Monitor '{}' has a custom filter script that does not exist: {}",
monitor_name, condition.script_path
));
}
let expected_extension = match LANGUAGE_EXTENSIONS
.iter()
.find(|(lang, _)| *lang == &condition.language)
.map(|(_, ext)| *ext)
{
Some(ext) => ext,
None => {
validation_errors.push(format!(
"Monitor '{}' uses unsupported script language {:?}",
monitor_name, condition.language
));
continue;
}
};
match script_path.extension().and_then(|ext| ext.to_str()) {
Some(ext) if ext == expected_extension => (), _ => validation_errors.push(format!(
"Monitor '{}' has a custom filter script with invalid extension - must be \
.{} for {:?} language: {}",
monitor_name, expected_extension, condition.language, condition.script_path
)),
}
if condition.timeout_ms == 0 {
validation_errors.push(format!(
"Monitor '{}' should have a custom filter timeout_ms greater than 0",
monitor_name
));
}
}
}
if !validation_errors.is_empty() {
return Err(RepositoryError::validation_error(
format!(
"Configuration validation failed:\n{}",
validation_errors.join("\n"),
),
None,
Some(metadata),
));
}
Ok(())
}
}
#[async_trait]
pub trait MonitorRepositoryTrait<
N: NetworkRepositoryTrait + Send + 'static,
T: TriggerRepositoryTrait + Send + 'static,
>: Clone + Send
{
async fn new(
path: Option<&Path>,
network_service: Option<NetworkService<N>>,
trigger_service: Option<TriggerService<T>>,
) -> Result<Self, RepositoryError>
where
Self: Sized;
async fn load_all(
path: Option<&Path>,
network_service: Option<NetworkService<N>>,
trigger_service: Option<TriggerService<T>>,
) -> Result<HashMap<String, Monitor>, RepositoryError>;
async fn load_from_path(
&self,
path: Option<&Path>,
network_service: Option<NetworkService<N>>,
trigger_service: Option<TriggerService<T>>,
) -> Result<Monitor, RepositoryError>;
fn get(&self, monitor_id: &str) -> Option<Monitor>;
fn get_all(&self) -> HashMap<String, Monitor>;
}
#[async_trait]
impl<
N: NetworkRepositoryTrait + Send + Sync + 'static,
T: TriggerRepositoryTrait + Send + Sync + 'static,
> MonitorRepositoryTrait<N, T> for MonitorRepository<N, T>
{
async fn new(
path: Option<&Path>,
network_service: Option<NetworkService<N>>,
trigger_service: Option<TriggerService<T>>,
) -> Result<Self, RepositoryError> {
MonitorRepository::new(path, network_service, trigger_service).await
}
async fn load_all(
path: Option<&Path>,
network_service: Option<NetworkService<N>>,
trigger_service: Option<TriggerService<T>>,
) -> Result<HashMap<String, Monitor>, RepositoryError> {
let monitors = Monitor::load_all(path).await.map_err(|e| {
RepositoryError::load_error(
"Failed to load monitors",
Some(Box::new(e)),
Some(HashMap::from([(
"path".to_string(),
path.map_or_else(|| "default".to_string(), |p| p.display().to_string()),
)])),
)
})?;
let networks = match network_service {
Some(service) => service.get_all(),
None => {
NetworkRepository::new(None)
.await
.map_err(|e| {
RepositoryError::load_error(
"Failed to load networks for monitor validation",
Some(Box::new(e)),
None,
)
})?
.networks
}
};
let triggers = match trigger_service {
Some(service) => service.get_all(),
None => {
TriggerRepository::new(None)
.await
.map_err(|e| {
RepositoryError::load_error(
"Failed to load triggers for monitor validation",
Some(Box::new(e)),
None,
)
})?
.triggers
}
};
Self::validate_monitor_references(&monitors, &triggers, &networks)?;
Ok(monitors)
}
async fn load_from_path(
&self,
path: Option<&Path>,
network_service: Option<NetworkService<N>>,
trigger_service: Option<TriggerService<T>>,
) -> Result<Monitor, RepositoryError> {
match path {
Some(path) => {
let monitor = Monitor::load_from_path(path).await.map_err(|e| {
RepositoryError::load_error(
"Failed to load monitors",
Some(Box::new(e)),
Some(HashMap::from([(
"path".to_string(),
path.display().to_string(),
)])),
)
})?;
let networks = match network_service {
Some(service) => service.get_all(),
None => NetworkRepository::new(None).await?.networks,
};
let triggers = match trigger_service {
Some(service) => service.get_all(),
None => TriggerRepository::new(None).await?.triggers,
};
let monitors = HashMap::from([(monitor.name.clone(), monitor)]);
Self::validate_monitor_references(&monitors, &triggers, &networks)?;
match monitors.values().next() {
Some(monitor) => Ok(monitor.clone()),
None => Err(RepositoryError::load_error("No monitors found", None, None)),
}
}
None => Err(RepositoryError::load_error(
"Failed to load monitors",
None,
None,
)),
}
}
fn get(&self, monitor_id: &str) -> Option<Monitor> {
self.monitors.get(monitor_id).cloned()
}
fn get_all(&self) -> HashMap<String, Monitor> {
self.monitors.clone()
}
}
#[derive(Clone)]
pub struct MonitorService<
M: MonitorRepositoryTrait<N, T> + Send,
N: NetworkRepositoryTrait + Send + Sync + 'static,
T: TriggerRepositoryTrait + Send + Sync + 'static,
> {
repository: M,
_network_repository: PhantomData<N>,
_trigger_repository: PhantomData<T>,
}
impl<
M: MonitorRepositoryTrait<N, T> + Send,
N: NetworkRepositoryTrait + Send + Sync + 'static,
T: TriggerRepositoryTrait + Send + Sync + 'static,
> MonitorService<M, N, T>
{
pub async fn new(
path: Option<&Path>,
network_service: Option<NetworkService<N>>,
trigger_service: Option<TriggerService<T>>,
) -> Result<MonitorService<M, N, T>, RepositoryError> {
let repository = M::new(path, network_service, trigger_service).await?;
Ok(MonitorService {
repository,
_network_repository: PhantomData,
_trigger_repository: PhantomData,
})
}
pub async fn new_with_path(
path: Option<&Path>,
) -> Result<MonitorService<M, N, T>, RepositoryError> {
let repository = M::new(path, None, None).await?;
Ok(MonitorService {
repository,
_network_repository: PhantomData,
_trigger_repository: PhantomData,
})
}
pub fn new_with_repository(repository: M) -> Result<Self, RepositoryError> {
Ok(MonitorService {
repository,
_network_repository: PhantomData,
_trigger_repository: PhantomData,
})
}
pub fn get(&self, monitor_id: &str) -> Option<Monitor> {
self.repository.get(monitor_id)
}
pub fn get_all(&self) -> HashMap<String, Monitor> {
self.repository.get_all()
}
pub async fn load_from_path(
&self,
path: Option<&Path>,
network_service: Option<NetworkService<N>>,
trigger_service: Option<TriggerService<T>>,
) -> Result<Monitor, RepositoryError> {
self.repository
.load_from_path(path, network_service, trigger_service)
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{models::ScriptLanguage, utils::tests::builders::evm::monitor::MonitorBuilder};
use std::fs;
use tempfile::TempDir;
#[test]
fn test_validate_custom_trigger_conditions() {
let temp_dir = TempDir::new().unwrap();
let script_path = temp_dir.path().join("test_script.py");
fs::write(&script_path, "print('test')").unwrap();
let mut monitors = HashMap::new();
let triggers = HashMap::new();
let networks = HashMap::new();
let monitor = MonitorBuilder::new()
.name("test_monitor")
.networks(vec![])
.trigger_condition(
script_path.to_str().unwrap(),
1000,
ScriptLanguage::Python,
None,
)
.build();
monitors.insert("test_monitor".to_string(), monitor);
let result =
MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
&monitors, &triggers, &networks,
);
assert!(result.is_ok());
let monitor_bad_path = MonitorBuilder::new()
.name("test_monitor_bad_path")
.trigger_condition("non_existent_script.py", 1000, ScriptLanguage::Python, None)
.build();
monitors.insert("test_monitor_bad_path".to_string(), monitor_bad_path);
let err =
MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
&monitors, &triggers, &networks,
)
.unwrap_err();
assert!(err.to_string().contains("does not exist"));
let wrong_ext_path = temp_dir.path().join("test_script.js");
fs::write(&wrong_ext_path, "print('test')").unwrap();
let monitor_wrong_ext = MonitorBuilder::new()
.name("test_monitor_wrong_ext")
.trigger_condition(
wrong_ext_path.to_str().unwrap(),
1000,
ScriptLanguage::Python,
None,
)
.build();
monitors.clear();
monitors.insert("test_monitor_wrong_ext".to_string(), monitor_wrong_ext);
let err =
MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
&monitors, &triggers, &networks,
)
.unwrap_err();
assert!(err.to_string().contains(
"Monitor 'test_monitor_wrong_ext' has a custom filter script with invalid extension - \
must be .py for Python language"
));
let monitor_zero_timeout = MonitorBuilder::new()
.name("test_monitor_zero_timeout")
.trigger_condition(
script_path.to_str().unwrap(),
0,
ScriptLanguage::Python,
None,
)
.build();
monitors.clear();
monitors.insert(
"test_monitor_zero_timeout".to_string(),
monitor_zero_timeout,
);
let err =
MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
&monitors, &triggers, &networks,
)
.unwrap_err();
assert!(err.to_string().contains("timeout_ms greater than 0"));
}
#[tokio::test]
async fn test_load_error_messages() {
let invalid_path = Path::new("/non/existent/path");
let result = MonitorRepository::<NetworkRepository, TriggerRepository>::load_all(
Some(invalid_path),
None,
None,
)
.await;
assert!(result.is_err());
let err = result.unwrap_err();
match err {
RepositoryError::LoadError(message) => {
assert!(message.to_string().contains("Failed to load monitors"));
}
_ => panic!("Expected RepositoryError::LoadError"),
}
}
#[test]
fn test_network_validation_error() {
let mut monitors = HashMap::new();
let monitor = MonitorBuilder::new()
.name("test_monitor")
.networks(vec!["non_existent_network".to_string()])
.build();
monitors.insert("test_monitor".to_string(), monitor);
let networks = HashMap::new();
let triggers = HashMap::new();
let result =
MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
&monitors, &triggers, &networks,
);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("references non-existent network"));
}
#[test]
fn test_trigger_validation_error() {
let mut monitors = HashMap::new();
let monitor = MonitorBuilder::new()
.name("test_monitor")
.triggers(vec!["non_existent_trigger".to_string()])
.build();
monitors.insert("test_monitor".to_string(), monitor);
let networks = HashMap::new();
let triggers = HashMap::new();
let result =
MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
&monitors, &triggers, &networks,
);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("references non-existent trigger"));
}
#[tokio::test]
async fn test_load_from_path_error_handling() {
let temp_dir = TempDir::new().unwrap();
let invalid_path = temp_dir.path().join("non_existent_monitor.json");
let repository =
MonitorRepository::<NetworkRepository, TriggerRepository>::new_with_monitors(
HashMap::new(),
);
let result = repository
.load_from_path(Some(&invalid_path), None, None)
.await;
assert!(result.is_err());
let err = result.unwrap_err();
match err {
RepositoryError::LoadError(message) => {
assert!(message.to_string().contains("Failed to load monitors"));
assert!(message
.to_string()
.contains(&invalid_path.display().to_string()));
}
_ => panic!("Expected RepositoryError::LoadError"),
}
}
}