// openzeppelin_monitor/services/trigger/service.rs
use std::{collections::HashMap, path::Path};
use anyhow::Context;
use async_trait::async_trait;
use crate::{
models::{Monitor, MonitorMatch, ScriptLanguage, TriggerTypeConfig},
repositories::{TriggerRepositoryTrait, TriggerService},
services::{notification::NotificationService, trigger::error::TriggerError},
};
/// Asynchronous contract for executing triggers and for preloading the
/// script sources that triggers and trigger conditions refer to.
#[async_trait]
pub trait TriggerExecutionServiceTrait {
/// Executes the triggers identified by `trigger_slugs`.
///
/// # Arguments
/// * `trigger_slugs` - Identifiers of the triggers to execute.
/// * `variables` - String key/value pairs made available to each trigger execution.
/// * `monitor_match` - The monitor match on whose behalf the triggers are executed.
/// * `trigger_scripts` - Preloaded scripts as `(language, content)` pairs keyed by
///   string; typically the map produced by [`Self::load_scripts`].
///
/// # Errors
/// Returns a [`TriggerError`] when execution fails.
async fn execute(
&self,
trigger_slugs: &[String],
variables: HashMap<String, String>,
monitor_match: &MonitorMatch,
trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
) -> Result<(), TriggerError>;
/// Loads the scripts needed by the given `monitors`.
///
/// # Returns
/// A map from a string key to the script's `(language, content)` pair.
///
/// # Errors
/// Returns a [`TriggerError`] when the scripts cannot be loaded.
async fn load_scripts(
&self,
monitors: &[Monitor],
) -> Result<HashMap<String, (ScriptLanguage, String)>, TriggerError>;
}
/// Service that executes triggers by combining a trigger lookup service with a
/// notification service.
///
/// `T` is the trigger repository implementation backing the lookup service.
pub struct TriggerExecutionService<T: TriggerRepositoryTrait> {
// Used to resolve trigger slugs into trigger configurations.
trigger_service: TriggerService<T>,
// Used to actually execute a resolved trigger.
notification_service: NotificationService,
}
impl<T: TriggerRepositoryTrait> TriggerExecutionService<T> {
/// Creates a new `TriggerExecutionService`.
///
/// # Arguments
/// * `trigger_service` - Service used to look up triggers by slug.
/// * `notification_service` - Service used to execute the looked-up triggers.
///
/// Both arguments are moved into the returned instance unchanged.
pub fn new(
trigger_service: TriggerService<T>,
notification_service: NotificationService,
) -> Self {
Self {
trigger_service,
notification_service,
}
}
}
#[async_trait]
impl<T: TriggerRepositoryTrait + Send + Sync> TriggerExecutionServiceTrait
    for TriggerExecutionService<T>
{
    /// Executes every trigger named in `trigger_slugs` and reports failures in aggregate.
    ///
    /// Each slug is resolved through the trigger service and handed to the
    /// notification service together with a fresh clone of `variables`. All
    /// per-trigger futures are driven together with `join_all`, so one failing
    /// trigger does not stop the others from being attempted.
    ///
    /// # Errors
    /// * A slug that cannot be resolved contributes a "not found" error.
    /// * A failing notification contributes an execution error carrying the
    ///   underlying error's message.
    ///
    /// If at least one trigger failed, a single execution error is returned whose
    /// message states the number of failures and whose source wraps the
    /// comma-separated individual messages.
    async fn execute(
        &self,
        trigger_slugs: &[String],
        variables: HashMap<String, String>,
        monitor_match: &MonitorMatch,
        trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
    ) -> Result<(), TriggerError> {
        use futures::future::join_all;

        // One lazy future per slug; nothing runs until `join_all` polls them.
        let pending = trigger_slugs.iter().map(|slug| async {
            let Some(trigger) = self.trigger_service.get(slug) else {
                return Err(TriggerError::not_found(slug.to_string(), None, None));
            };

            let outcome = self
                .notification_service
                .execute(&trigger, variables.clone(), monitor_match, trigger_scripts)
                .await;

            outcome.map_err(|err| {
                TriggerError::execution_error_without_log(err.to_string(), None, None)
            })
        });

        // Keep only the failures; successful executions carry no information we need.
        let failures: Vec<_> = join_all(pending)
            .await
            .into_iter()
            .filter_map(Result::err)
            .collect();

        if failures.is_empty() {
            return Ok(());
        }

        let summary = format!("Some trigger(s) failed ({} failure(s))", failures.len());
        let details = failures
            .iter()
            .map(|failure| failure.to_string())
            .collect::<Vec<_>>()
            .join(", ");
        // The joined text is deliberately rendered with the pretty-debug formatter,
        // exactly as before, so the emitted message stays unchanged.
        let cause = TriggerError::execution_error(format!("{:#?}", details), None, None);

        Err(TriggerError::execution_error(
            summary,
            Some(cause.into()),
            None,
        ))
    }

    /// Reads every script referenced by `monitors` into memory.
    ///
    /// Two sources are consulted for each monitor:
    /// 1. its trigger conditions, each of which names a script path and language;
    /// 2. its triggers, of which only those whose configuration is of the
    ///    `Script` kind reference a script file (all others are skipped).
    ///
    /// # Returns
    /// A map keyed by `"<monitor name>|<script path>"` whose values are the
    /// script's `(language, content)` pair.
    ///
    /// # Errors
    /// * A trigger condition script that cannot be read yields the read error
    ///   with added context, converted into a [`TriggerError`].
    /// * An unknown trigger slug or an unreadable trigger script yields a
    ///   configuration error.
    async fn load_scripts(
        &self,
        monitors: &[Monitor],
    ) -> Result<HashMap<String, (ScriptLanguage, String)>, TriggerError> {
        let mut scripts = HashMap::new();

        for monitor in monitors {
            let references_scripts =
                !monitor.trigger_conditions.is_empty() || !monitor.triggers.is_empty();
            if !references_scripts {
                continue;
            }

            // Scripts attached to the monitor's trigger conditions.
            for condition in &monitor.trigger_conditions {
                let content = tokio::fs::read_to_string(Path::new(&condition.script_path))
                    .await
                    .with_context(|| {
                        format!("Failed to read script file: {}", condition.script_path)
                    })?;
                let key = format!("{}|{}", monitor.name, condition.script_path);
                scripts.insert(key, (condition.language.clone(), content));
            }

            // Scripts attached to the monitor's script-type triggers.
            for trigger in &monitor.triggers {
                let Some(trigger_config) = self.trigger_service.get(trigger.as_str()) else {
                    return Err(TriggerError::configuration_error(
                        format!("Failed to get trigger: {}", trigger),
                        None,
                        None,
                    ));
                };

                if let TriggerTypeConfig::Script {
                    language,
                    script_path,
                    arguments: _,
                    timeout_ms: _,
                } = &trigger_config.config
                {
                    let path = Path::new(script_path);
                    let content = match tokio::fs::read_to_string(path).await {
                        Ok(text) => text,
                        Err(e) => {
                            return Err(TriggerError::configuration_error(
                                format!(
                                    "Failed to read script file {}: {}",
                                    path.display(),
                                    e
                                ),
                                None,
                                None,
                            ));
                        }
                    };
                    let key = format!("{}|{}", monitor.name, path.display());
                    scripts.insert(key, (language.clone(), content));
                }
            }
        }

        Ok(scripts)
    }
}