Skip to content

Commit

Permalink
Feat : setup scripts (madara-alliance#181)
Browse files Browse the repository at this point in the history
* feat : added basic scripts for setup and functions

* changelog

* chore : refactor

* chore : refactor implementation

* chore : refactored code according to comments

* feat : refactor

* feat : refactor cron provider
ocdbytes authored Nov 12, 2024
1 parent c66c0f3 commit a9d0e30
Showing 17 changed files with 436 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- setup functions added for cloud and db
- panic handling in process job
- upgrade ETH L1 bridge for withdrawals to work
- added makefile and submodules
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ aws-config = { workspace = true, features = ["behavior-version-latest"] }
aws-credential-types = { version = "1.2.1", features = [
"hardcoded-credentials",
] }
aws-sdk-eventbridge.workspace = true
aws-sdk-s3 = { workspace = true, features = ["behavior-version-latest"] }
aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] }
aws-sdk-sqs = { workspace = true }
7 changes: 7 additions & 0 deletions crates/orchestrator/src/alerts/aws_sns/mod.rs
Original file line number Diff line number Diff line change
@@ -32,4 +32,11 @@ impl Alerts for AWSSNS {
self.client.publish().topic_arn(self.topic_arn.clone()).message(message_body).send().await?;
Ok(())
}

async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()> {
let response = self.client.create_topic().name(topic_name).send().await?;
let topic_arn = response.topic_arn().expect("Topic Not found");
log::info!("SNS topic created. Topic ARN: {}", topic_arn);
Ok(())
}
}
7 changes: 7 additions & 0 deletions crates/orchestrator/src/alerts/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use mockall::automock;
use utils::settings::Settings;

pub mod aws_sns;

@@ -8,4 +9,10 @@ pub mod aws_sns;
pub trait Alerts: Send + Sync {
/// To send an alert message to our alert service
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>;
async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()>;
async fn setup(&self, settings_provider: Box<dyn Settings>) -> color_eyre::Result<()> {
let sns_topic_name = settings_provider.get_settings_or_panic("ALERT_TOPIC_NAME");
self.create_alert(&sns_topic_name).await?;
Ok(())
}
}
111 changes: 111 additions & 0 deletions crates/orchestrator/src/cron/event_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use std::time::Duration;

use async_trait::async_trait;
use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target};
use aws_sdk_sqs::types::QueueAttributeName;

use crate::cron::Cron;
use crate::setup::SetupConfig;

pub struct AWSEventBridge {}

#[async_trait]
#[allow(unreachable_patterns)]
impl Cron for AWSEventBridge {
async fn create_cron(
&self,
config: &SetupConfig,
cron_time: Duration,
trigger_rule_name: String,
) -> color_eyre::Result<()> {
let config = match config {
SetupConfig::AWS(config) => config,
_ => panic!("Unsupported Event Bridge configuration"),
};
let event_bridge_client = aws_sdk_eventbridge::Client::new(config);
event_bridge_client
.put_rule()
.name(&trigger_rule_name)
.schedule_expression(duration_to_rate_string(cron_time))
.state(RuleState::Enabled)
.send()
.await?;

Ok(())
}
async fn add_cron_target_queue(
&self,
config: &SetupConfig,
target_queue_name: String,
message: String,
trigger_rule_name: String,
) -> color_eyre::Result<()> {
let config = match config {
SetupConfig::AWS(config) => config,
_ => panic!("Unsupported Event Bridge configuration"),
};
let event_bridge_client = aws_sdk_eventbridge::Client::new(config);
let sqs_client = aws_sdk_sqs::Client::new(config);
let queue_url = sqs_client.get_queue_url().queue_name(target_queue_name).send().await?;

let queue_attributes = sqs_client
.get_queue_attributes()
.queue_url(queue_url.queue_url.unwrap())
.attribute_names(QueueAttributeName::QueueArn)
.send()
.await?;
let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap();

// Create the EventBridge target with the input transformer
let input_transformer =
InputTransformer::builder().input_paths_map("$.time", "time").input_template(message).build()?;

event_bridge_client
.put_targets()
.rule(trigger_rule_name)
.targets(
Target::builder()
.id(uuid::Uuid::new_v4().to_string())
.arn(queue_arn)
.input_transformer(input_transformer)
.build()?,
)
.send()
.await?;

Ok(())
}
}

fn duration_to_rate_string(duration: Duration) -> String {
let total_secs = duration.as_secs();
let total_mins = duration.as_secs() / 60;
let total_hours = duration.as_secs() / 3600;
let total_days = duration.as_secs() / 86400;

if total_days > 0 {
format!("rate({} day{})", total_days, if total_days == 1 { "" } else { "s" })
} else if total_hours > 0 {
format!("rate({} hour{})", total_hours, if total_hours == 1 { "" } else { "s" })
} else if total_mins > 0 {
format!("rate({} minute{})", total_mins, if total_mins == 1 { "" } else { "s" })
} else {
format!("rate({} second{})", total_secs, if total_secs == 1 { "" } else { "s" })
}
}

#[cfg(test)]
mod event_bridge_utils_test {
use rstest::rstest;

use super::*;

#[rstest]
fn test_duration_to_rate_string() {
assert_eq!(duration_to_rate_string(Duration::from_secs(60)), "rate(1 minute)");
assert_eq!(duration_to_rate_string(Duration::from_secs(120)), "rate(2 minutes)");
assert_eq!(duration_to_rate_string(Duration::from_secs(30)), "rate(30 seconds)");
assert_eq!(duration_to_rate_string(Duration::from_secs(3600)), "rate(1 hour)");
assert_eq!(duration_to_rate_string(Duration::from_secs(86400)), "rate(1 day)");
}
}
57 changes: 57 additions & 0 deletions crates/orchestrator/src/cron/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::time::Duration;

use async_trait::async_trait;
use lazy_static::lazy_static;

use crate::queue::job_queue::{WorkerTriggerMessage, WorkerTriggerType};
use crate::setup::SetupConfig;

pub mod event_bridge;

lazy_static! {
pub static ref CRON_DURATION: Duration = Duration::from_mins(1);
// TODO : we can take this from clap.
pub static ref TARGET_QUEUE_NAME: String = String::from("madara_orchestrator_worker_trigger_queue");
pub static ref WORKER_TRIGGERS: Vec<WorkerTriggerType> = vec![
WorkerTriggerType::Snos,
WorkerTriggerType::Proving,
WorkerTriggerType::DataSubmission,
WorkerTriggerType::UpdateState
];
pub static ref WORKER_TRIGGER_RULE_NAME: String = String::from("worker_trigger_scheduled");
}

#[async_trait]
pub trait Cron {
async fn create_cron(
&self,
config: &SetupConfig,
cron_time: Duration,
trigger_rule_name: String,
) -> color_eyre::Result<()>;
async fn add_cron_target_queue(
&self,
config: &SetupConfig,
target_queue_name: String,
message: String,
trigger_rule_name: String,
) -> color_eyre::Result<()>;
async fn setup(&self, config: SetupConfig) -> color_eyre::Result<()> {
self.create_cron(&config, *CRON_DURATION, WORKER_TRIGGER_RULE_NAME.clone()).await?;
for triggers in WORKER_TRIGGERS.iter() {
self.add_cron_target_queue(
&config,
TARGET_QUEUE_NAME.clone(),
get_worker_trigger_message(triggers.clone())?,
WORKER_TRIGGER_RULE_NAME.clone(),
)
.await?;
}
Ok(())
}
}

fn get_worker_trigger_message(worker_trigger_type: WorkerTriggerType) -> color_eyre::Result<String> {
let message = WorkerTriggerMessage { worker: worker_trigger_type };
Ok(serde_json::to_string(&message)?)
}
2 changes: 1 addition & 1 deletion crates/orchestrator/src/data_storage/aws_s3/mod.rs
Original file line number Diff line number Diff line change
@@ -82,7 +82,7 @@ impl DataStorage for AWSS3 {
Ok(())
}

async fn build_test_bucket(&self, bucket_name: &str) -> Result<()> {
async fn create_bucket(&self, bucket_name: &str) -> Result<()> {
self.client.create_bucket().bucket(bucket_name).send().await?;
Ok(())
}
6 changes: 5 additions & 1 deletion crates/orchestrator/src/data_storage/mod.rs
Original file line number Diff line number Diff line change
@@ -21,7 +21,11 @@ use utils::settings::Settings;
pub trait DataStorage: Send + Sync {
async fn get_data(&self, key: &str) -> Result<Bytes>;
async fn put_data(&self, data: Bytes, key: &str) -> Result<()>;
async fn build_test_bucket(&self, bucket_name: &str) -> Result<()>;
async fn create_bucket(&self, bucket_name: &str) -> Result<()>;
async fn setup(&self, settings_provider: Box<dyn Settings>) -> Result<()> {
let bucket_name = settings_provider.get_settings_or_panic("STORAGE_BUCKET_NAME");
self.create_bucket(&bucket_name).await
}
}

/// **DataStorageConfig** : Trait method to represent the config struct needed for
5 changes: 5 additions & 0 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#![feature(duration_constructors)]

/// Contains the trait implementations for alerts
pub mod alerts;
/// Config of the service. Contains configurations for DB, Queues and other services.
pub mod config;
pub mod constants;
/// Controllers for the routes
pub mod controllers;
pub mod cron;
/// Contains the trait that implements the fetching functions
/// for blob and SNOS data from cloud for a particular block.
pub mod data_storage;
@@ -20,6 +23,8 @@ pub mod metrics;
pub mod queue;
/// Contains the routes for the service
pub mod routes;
/// Contains setup functions to set up db and cloud.
pub mod setup;
/// Contains telemetry collection services. (Metrics/Logs/Traces)
pub mod telemetry;
#[cfg(test)]
81 changes: 80 additions & 1 deletion crates/orchestrator/src/queue/mod.rs
Original file line number Diff line number Diff line change
@@ -6,11 +6,82 @@ use std::time::Duration;

use async_trait::async_trait;
use color_eyre::Result as EyreResult;
use lazy_static::lazy_static;
use mockall::automock;
use omniqueue::{Delivery, QueueError};

use crate::config::Config;
use crate::jobs::JobError;
use crate::setup::SetupConfig;

#[derive(Clone)]
pub struct DlqConfig<'a> {
pub max_receive_count: i32,
pub dlq_name: &'a str,
}

#[derive(Clone)]
pub struct QueueConfig<'a> {
pub name: String,
pub visibility_timeout: i32,
pub dlq_config: Option<DlqConfig<'a>>,
}

lazy_static! {
pub static ref JOB_HANDLE_FAILURE_QUEUE: String = String::from("madara_orchestrator_job_handle_failure_queue");
pub static ref QUEUES: Vec<QueueConfig<'static>> = vec![
QueueConfig {
name: String::from("madara_orchestrator_snos_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_snos_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_proving_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_proving_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_data_submission_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_data_submission_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_update_state_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_update_state_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_job_handle_failure_queue"),
visibility_timeout: 300,
dlq_config: None
},
QueueConfig {
name: String::from("madara_orchestrator_worker_trigger_queue"),
visibility_timeout: 300,
dlq_config: None
},
];
}

/// Queue Provider Trait
///
@@ -21,7 +92,15 @@ use crate::jobs::JobError;
#[async_trait]
pub trait QueueProvider: Send + Sync {
async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option<Duration>) -> EyreResult<()>;
async fn consume_message_from_queue(&self, queue: String) -> std::result::Result<Delivery, QueueError>;
async fn consume_message_from_queue(&self, queue: String) -> Result<Delivery, QueueError>;
async fn create_queue<'a>(&self, queue_config: &QueueConfig<'a>, config: &SetupConfig) -> EyreResult<()>;
async fn setup(&self, config: SetupConfig) -> EyreResult<()> {
// Creating the queues :
for queue in QUEUES.iter() {
self.create_queue(queue, &config).await?;
}
Ok(())
}
}

pub async fn init_consumers(config: Arc<Config>) -> Result<(), JobError> {
60 changes: 59 additions & 1 deletion crates/orchestrator/src/queue/sqs/mod.rs
Original file line number Diff line number Diff line change
@@ -2,6 +2,9 @@ use std::collections::HashMap;
use std::time::Duration;

use async_trait::async_trait;
use aws_sdk_sqs::types::QueueAttributeName;
use aws_sdk_sqs::Client;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use lazy_static::lazy_static;
use omniqueue::backends::{SqsBackend, SqsConfig, SqsConsumer, SqsProducer};
@@ -14,7 +17,9 @@ use crate::queue::job_queue::{
PROVING_JOB_VERIFICATION_QUEUE, SNOS_JOB_PROCESSING_QUEUE, SNOS_JOB_VERIFICATION_QUEUE,
UPDATE_STATE_JOB_PROCESSING_QUEUE, UPDATE_STATE_JOB_VERIFICATION_QUEUE, WORKER_TRIGGER_QUEUE,
};
use crate::queue::QueueProvider;
use crate::queue::{QueueConfig, QueueProvider};
use crate::setup::SetupConfig;

pub struct SqsQueue;

lazy_static! {
@@ -35,6 +40,7 @@ lazy_static! {
]);
}

#[allow(unreachable_patterns)]
#[async_trait]
impl QueueProvider for SqsQueue {
async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option<Duration>) -> Result<()> {
@@ -54,6 +60,58 @@ impl QueueProvider for SqsQueue {
let mut consumer = get_consumer(queue_url).await?;
consumer.receive().await
}

async fn create_queue<'a>(&self, queue_config: &QueueConfig<'a>, config: &SetupConfig) -> Result<()> {
let config = match config {
SetupConfig::AWS(config) => config,
_ => panic!("Unsupported SQS configuration"),
};
let sqs_client = Client::new(config);
let res = sqs_client.create_queue().queue_name(&queue_config.name).send().await?;
let queue_url = res.queue_url().ok_or_else(|| eyre!("Not able to get queue url from result"))?;

let mut attributes = HashMap::new();
attributes.insert(QueueAttributeName::VisibilityTimeout, queue_config.visibility_timeout.to_string());

if let Some(dlq_config) = &queue_config.dlq_config {
let dlq_url = Self::get_queue_url_from_client(dlq_config.dlq_name, &sqs_client).await?;
let dlq_arn = Self::get_queue_arn(&sqs_client, &dlq_url).await?;
let policy = format!(
r#"{{"deadLetterTargetArn":"{}","maxReceiveCount":"{}"}}"#,
dlq_arn, &dlq_config.max_receive_count
);
attributes.insert(QueueAttributeName::RedrivePolicy, policy);
}

sqs_client.set_queue_attributes().queue_url(queue_url).set_attributes(Some(attributes)).send().await?;

Ok(())
}
}

impl SqsQueue {
/// To get the queue url from the given queue name
async fn get_queue_url_from_client(queue_name: &str, sqs_client: &Client) -> Result<String> {
Ok(sqs_client
.get_queue_url()
.queue_name(queue_name)
.send()
.await?
.queue_url()
.ok_or_else(|| eyre!("Unable to get queue url from the given queue_name."))?
.to_string())
}

async fn get_queue_arn(client: &Client, queue_url: &str) -> Result<String> {
let attributes = client
.get_queue_attributes()
.queue_url(queue_url)
.attribute_names(QueueAttributeName::QueueArn)
.send()
.await?;

Ok(attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap().to_string())
}
}

/// To fetch the queue URL from the environment variables
97 changes: 97 additions & 0 deletions crates/orchestrator/src/setup/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use std::process::Command;
use std::sync::Arc;

use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::{from_env, Region, SdkConfig};
use aws_credential_types::provider::ProvideCredentials;
use utils::env_utils::get_env_var_or_panic;
use utils::settings::env::EnvSettingsProvider;

use crate::alerts::aws_sns::AWSSNS;
use crate::alerts::Alerts;
use crate::config::{get_aws_config, ProviderConfig};
use crate::cron::Cron;
use crate::data_storage::aws_s3::AWSS3;
use crate::data_storage::DataStorage;
use crate::queue::QueueProvider;

#[derive(Clone)]
pub enum SetupConfig {
AWS(SdkConfig),
}

pub enum ConfigType {
AWS,
}

async fn setup_config(client_type: ConfigType) -> SetupConfig {
match client_type {
ConfigType::AWS => {
let region_provider = Region::new(get_env_var_or_panic("AWS_REGION"));
let creds = EnvironmentVariableCredentialsProvider::new().provide_credentials().await.unwrap();
SetupConfig::AWS(from_env().region(region_provider).credentials_provider(creds).load().await)
}
}
}

// TODO : move this to main.rs after moving to clap.
pub async fn setup_cloud() -> color_eyre::Result<()> {
log::info!("Setting up cloud.");
let settings_provider = EnvSettingsProvider {};
let provider_config = Arc::new(ProviderConfig::AWS(Box::new(get_aws_config(&settings_provider).await)));

log::info!("Setting up data storage.");
match get_env_var_or_panic("DATA_STORAGE").as_str() {
"s3" => {
let s3 = Box::new(AWSS3::new_with_settings(&settings_provider, provider_config.clone()).await);
s3.setup(Box::new(settings_provider.clone())).await?
}
_ => panic!("Unsupported Storage Client"),
}
log::info!("Data storage setup completed ✅");

log::info!("Setting up queues");
match get_env_var_or_panic("QUEUE_PROVIDER").as_str() {
"sqs" => {
let config = setup_config(ConfigType::AWS).await;
let sqs = Box::new(crate::queue::sqs::SqsQueue {});
sqs.setup(config).await?
}
_ => panic!("Unsupported Queue Client"),
}
log::info!("Queues setup completed ✅");

log::info!("Setting up cron");
match get_env_var_or_panic("CRON_PROVIDER").as_str() {
"event_bridge" => {
let config = setup_config(ConfigType::AWS).await;
let event_bridge = Box::new(crate::cron::event_bridge::AWSEventBridge {});
event_bridge.setup(config).await?
}
_ => panic!("Unsupported Event Bridge Client"),
}
log::info!("Cron setup completed ✅");

log::info!("Setting up alerts.");
match get_env_var_or_panic("ALERTS").as_str() {
"sns" => {
let sns = Box::new(AWSSNS::new_with_settings(&settings_provider, provider_config).await);
sns.setup(Box::new(settings_provider)).await?
}
_ => panic!("Unsupported Alert Client"),
}
log::info!("Alerts setup completed ✅");

Ok(())
}

pub async fn setup_db() -> color_eyre::Result<()> {
// We run the js script in the folder root:
log::info!("Setting up database.");

Command::new("node").arg("migrate-mongo-config.js").output()?;

log::info!("Database setup completed ✅");

Ok(())
}
4 changes: 1 addition & 3 deletions crates/orchestrator/src/tests/config.rs
Original file line number Diff line number Diff line change
@@ -367,9 +367,7 @@ pub mod implement_client {
ConfigType::Actual => {
let storage = get_storage_client(provider_config).await;
match get_env_var_or_panic("DATA_STORAGE").as_str() {
"s3" => {
storage.as_ref().build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap()
}
"s3" => storage.as_ref().create_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap(),
_ => panic!("Unsupported Storage Client"),
}
storage
2 changes: 1 addition & 1 deletion crates/utils/src/settings/mod.rs
Original file line number Diff line number Diff line change
@@ -6,6 +6,6 @@ pub enum SettingsProviderError {
Internal(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
}

pub trait Settings {
pub trait Settings: Send {
fn get_settings_or_panic(&self, name: &'static str) -> String;
}
2 changes: 1 addition & 1 deletion e2e-tests/tests.rs
Original file line number Diff line number Diff line change
@@ -437,6 +437,6 @@ pub async fn put_job_data_in_db_proving(mongo_db: &MongoDbServer, l2_block_numbe
/// To set up s3 files needed for e2e test (test_orchestrator_workflow)
#[allow(clippy::borrowed_box)]
pub async fn setup_s3(s3_client: &Box<dyn DataStorage + Send + Sync>) -> color_eyre::Result<()> {
s3_client.build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap();
s3_client.create_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap();
Ok(())
}

0 comments on commit a9d0e30

Please sign in to comment.