// openzeppelin_monitor/services/blockwatcher/storage.rs
use async_trait::async_trait;
use glob::glob;
use std::path::PathBuf;
use crate::models::BlockType;
/// Persistence interface for per-network block-processing state.
///
/// Every method is keyed by a network identifier string. Implementations must be
/// `Clone + Send + Sync`, as required by the supertrait bounds.
#[async_trait]
pub trait BlockStorage: Clone + Send + Sync {
/// Returns the last processed block number recorded for `network_id`.
///
/// `Ok(None)` means no value is available; `FileBlockStorage` returns it when the
/// state file for the network does not exist.
async fn get_last_processed_block(
&self,
network_id: &str,
) -> Result<Option<u64>, anyhow::Error>;
/// Records `block` as the last processed block number for `network_id`.
async fn save_last_processed_block(
&self,
network_id: &str,
block: u64,
) -> Result<(), anyhow::Error>;
/// Stores the given `blocks` for `network_id`.
async fn save_blocks(
&self,
network_id: &str,
blocks: &[BlockType],
) -> Result<(), anyhow::Error>;
/// Deletes the blocks stored for `network_id`.
async fn delete_blocks(&self, network_id: &str) -> Result<(), anyhow::Error>;
/// Records `block` as a missed block for `network_id`.
async fn save_missed_block(&self, network_id: &str, block: u64) -> Result<(), anyhow::Error>;
}
/// File-system backed block storage.
///
/// All state is kept as plain files located directly inside `storage_path`.
/// Cloning is cheap: a clone only copies the path and refers to the same
/// directory on disk.
#[derive(Clone, Debug)]
pub struct FileBlockStorage {
    /// Directory that holds every file written by this storage.
    storage_path: PathBuf,
}
impl FileBlockStorage {
pub fn new(storage_path: PathBuf) -> Self {
FileBlockStorage { storage_path }
}
}
impl Default for FileBlockStorage {
fn default() -> Self {
FileBlockStorage::new(PathBuf::from("data"))
}
}
#[async_trait]
impl BlockStorage for FileBlockStorage {
async fn get_last_processed_block(
&self,
network_id: &str,
) -> Result<Option<u64>, anyhow::Error> {
let file_path = self
.storage_path
.join(format!("{}_last_block.txt", network_id));
if !file_path.exists() {
return Ok(None);
}
let content = tokio::fs::read_to_string(file_path)
.await
.map_err(|e| anyhow::anyhow!("Failed to read last processed block: {}", e))?;
let block_number = content
.trim()
.parse::<u64>()
.map_err(|e| anyhow::anyhow!("Failed to parse last processed block: {}", e))?;
Ok(Some(block_number))
}
async fn save_last_processed_block(
&self,
network_id: &str,
block: u64,
) -> Result<(), anyhow::Error> {
let file_path = self
.storage_path
.join(format!("{}_last_block.txt", network_id));
tokio::fs::write(file_path, block.to_string())
.await
.map_err(|e| anyhow::anyhow!("Failed to save last processed block: {}", e))?;
Ok(())
}
async fn save_blocks(
&self,
network_slug: &str,
blocks: &[BlockType],
) -> Result<(), anyhow::Error> {
let file_path = self.storage_path.join(format!(
"{}_blocks_{}.json",
network_slug,
chrono::Utc::now().timestamp()
));
let json = serde_json::to_string(blocks)
.map_err(|e| anyhow::anyhow!("Failed to serialize blocks: {}", e))?;
tokio::fs::write(file_path, json)
.await
.map_err(|e| anyhow::anyhow!("Failed to save blocks: {}", e))?;
Ok(())
}
async fn delete_blocks(&self, network_slug: &str) -> Result<(), anyhow::Error> {
let pattern = self
.storage_path
.join(format!("{}_blocks_*.json", network_slug))
.to_string_lossy()
.to_string();
for entry in glob(&pattern)
.map_err(|e| anyhow::anyhow!("Failed to parse blocks: {}", e))?
.flatten()
{
tokio::fs::remove_file(entry)
.await
.map_err(|e| anyhow::anyhow!("Failed to delete blocks: {}", e))?;
}
Ok(())
}
async fn save_missed_block(&self, network_id: &str, block: u64) -> Result<(), anyhow::Error> {
let file_path = self
.storage_path
.join(format!("{}_missed_blocks.txt", network_id));
let mut file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(file_path)
.await
.map_err(|e| anyhow::anyhow!("Failed to create missed block file: {}", e))?;
tokio::io::AsyncWriteExt::write_all(&mut file, format!("{}\n", block).as_bytes())
.await
.map_err(|e| anyhow::anyhow!("Failed to save missed block: {}", e))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A `readonly` sub-directory whose permission bits were tightened for a test.
    ///
    /// Dropping the guard restores permissive bits so the enclosing temporary
    /// directory can be cleaned up afterwards.
    #[cfg(unix)]
    struct RestrictedDir {
        path: std::path::PathBuf,
    }

    #[cfg(unix)]
    impl RestrictedDir {
        /// Creates `<parent>/readonly`, writes `seed_files` (each containing
        /// `[]`) into it and then applies `mode` to the directory.
        fn create(parent: &std::path::Path, mode: u32, seed_files: &[&str]) -> Self {
            use std::os::unix::fs::PermissionsExt;
            let path = parent.join("readonly");
            std::fs::create_dir(&path).unwrap();
            for name in seed_files {
                std::fs::write(path.join(name), "[]").unwrap();
            }
            std::fs::set_permissions(&path, std::fs::Permissions::from_mode(mode)).unwrap();
            RestrictedDir { path }
        }

        /// Returns `false` when the process can still create files in the
        /// directory (for example when running as root), in which case the
        /// permission-denied assertions cannot hold.
        fn is_enforced(&self) -> bool {
            let probe = self.path.join(".write_probe");
            match std::fs::File::create(&probe) {
                Ok(_) => {
                    let _ = std::fs::remove_file(&probe);
                    false
                }
                Err(_) => true,
            }
        }
    }

    #[cfg(unix)]
    impl Drop for RestrictedDir {
        fn drop(&mut self) {
            use std::os::unix::fs::PermissionsExt;
            // Best effort: without write access the temp dir cleanup would fail.
            let _ = std::fs::set_permissions(&self.path, std::fs::Permissions::from_mode(0o755));
        }
    }

    #[tokio::test]
    async fn test_get_last_processed_block() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());

        // Existing file with a valid number.
        tokio::fs::write(temp_dir.path().join("existing_last_block.txt"), "100")
            .await
            .unwrap();
        let result = storage.get_last_processed_block("existing").await;
        assert_eq!(result.unwrap(), Some(100));

        // Missing file means nothing has been recorded yet.
        let result = storage.get_last_processed_block("non_existent").await;
        assert_eq!(result.unwrap(), None);

        // Unparsable content is reported as an error.
        tokio::fs::write(
            temp_dir.path().join("invalid_last_block.txt"),
            "not a number",
        )
        .await
        .unwrap();
        let err = storage
            .get_last_processed_block("invalid")
            .await
            .unwrap_err();
        assert!(err
            .to_string()
            .contains("Failed to parse last processed block"));
        assert!(err.to_string().contains("invalid"));

        // Another valid file.
        tokio::fs::write(temp_dir.path().join("valid_last_block.txt"), "123")
            .await
            .unwrap();
        let result = storage.get_last_processed_block("valid").await;
        assert_eq!(result.unwrap(), Some(123));
    }

    #[tokio::test]
    async fn test_save_last_processed_block() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
        let state_file = temp_dir.path().join("test_last_block.txt");

        storage.save_last_processed_block("test", 100).await.unwrap();
        assert_eq!(tokio::fs::read_to_string(&state_file).await.unwrap(), "100");

        // A later save replaces the previous value.
        storage.save_last_processed_block("test", 200).await.unwrap();
        assert_eq!(tokio::fs::read_to_string(&state_file).await.unwrap(), "200");

        #[cfg(unix)]
        {
            let restricted = RestrictedDir::create(temp_dir.path(), 0o444, &[]);
            if !restricted.is_enforced() {
                eprintln!("skipping permission check: directory mode is not enforced");
                return;
            }
            let readonly_storage = FileBlockStorage::new(restricted.path.clone());
            let err = readonly_storage
                .save_last_processed_block("test", 100)
                .await
                .unwrap_err();
            assert!(err
                .to_string()
                .contains("Failed to save last processed block"));
            assert!(err.to_string().contains("Permission denied"));
        }
    }

    #[tokio::test]
    async fn test_save_blocks() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());

        storage.save_blocks("test", &[]).await.unwrap();

        // Exactly one block file must have been written, holding an empty JSON array.
        let mut saved = Vec::new();
        for entry in std::fs::read_dir(temp_dir.path()).unwrap() {
            let path = entry.unwrap().path();
            let name = path.file_name().unwrap().to_string_lossy().to_string();
            if name.starts_with("test_blocks_") && name.ends_with(".json") {
                saved.push(path);
            }
        }
        assert_eq!(saved.len(), 1);
        assert_eq!(std::fs::read_to_string(&saved[0]).unwrap(), "[]");

        #[cfg(unix)]
        {
            let restricted = RestrictedDir::create(temp_dir.path(), 0o444, &[]);
            if !restricted.is_enforced() {
                eprintln!("skipping permission check: directory mode is not enforced");
                return;
            }
            let readonly_storage = FileBlockStorage::new(restricted.path.clone());
            let err = readonly_storage
                .save_blocks("test", &[])
                .await
                .unwrap_err();
            assert!(err.to_string().contains("Failed to save blocks"));
            assert!(err.to_string().contains("Permission denied"));
        }
    }

    #[tokio::test]
    async fn test_delete_blocks() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());

        let first = temp_dir.path().join("test_blocks_1.json");
        let second = temp_dir.path().join("test_blocks_2.json");
        let other_network = temp_dir.path().join("other_blocks_1.json");
        let state_file = temp_dir.path().join("test_last_block.txt");
        for path in [&first, &second, &other_network] {
            tokio::fs::write(path, "[]").await.unwrap();
        }
        tokio::fs::write(&state_file, "1").await.unwrap();

        storage.delete_blocks("test").await.unwrap();

        // Only the block files of the requested network are removed.
        assert!(!first.exists());
        assert!(!second.exists());
        assert!(other_network.exists());
        assert!(state_file.exists());

        #[cfg(unix)]
        {
            // Readable and searchable but not writable: the file is found but cannot be removed.
            let restricted =
                RestrictedDir::create(temp_dir.path(), 0o555, &["test_blocks_1.json"]);
            if !restricted.is_enforced() {
                eprintln!("skipping permission check: directory mode is not enforced");
                return;
            }
            let readonly_storage = FileBlockStorage::new(restricted.path.clone());
            let err = readonly_storage.delete_blocks("test").await.unwrap_err();
            assert!(err.to_string().contains("Failed to delete blocks"));
            assert!(err.to_string().contains("Permission denied"));
        }
    }

    #[tokio::test]
    async fn test_save_missed_block() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
        let missed_file = temp_dir.path().join("test_missed_blocks.txt");

        storage.save_missed_block("test", 100).await.unwrap();
        assert_eq!(
            tokio::fs::read_to_string(&missed_file).await.unwrap(),
            "100\n"
        );

        // Further blocks are appended, one per line.
        storage.save_missed_block("test", 101).await.unwrap();
        assert_eq!(
            tokio::fs::read_to_string(&missed_file).await.unwrap(),
            "100\n101\n"
        );

        #[cfg(unix)]
        {
            let restricted = RestrictedDir::create(temp_dir.path(), 0o444, &[]);
            if !restricted.is_enforced() {
                eprintln!("skipping permission check: directory mode is not enforced");
                return;
            }
            let readonly_storage = FileBlockStorage::new(restricted.path.clone());
            let err = readonly_storage
                .save_missed_block("test", 100)
                .await
                .unwrap_err();
            assert!(err
                .to_string()
                .contains("Failed to create missed block file"));
            assert!(err.to_string().contains("Permission denied"));
        }
    }
}