Skip to content

Commit

Permalink
update: aws_event_bridge rework
Browse files Browse the repository at this point in the history
  • Loading branch information
heemankv committed Dec 19, 2024
1 parent 6e6b991 commit 3d57f39
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 36 deletions.
9 changes: 6 additions & 3 deletions crates/orchestrator/src/cli/cron/event_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ use clap::Args;

/// CLI arguments for the aws event bridge.
#[derive(Debug, Clone, Args)]
#[group()]
pub struct AWSEventBridgeCliArgs {
/// Use the AWS Event Bridge client
/// Use the AWS Event Bridge Rule client
#[arg(long)]
pub aws_event_bridge: bool,
pub aws_event_bridge_rule: bool,

/// Use the AWS Event Bridge Schedule client
#[arg(long)]
pub aws_event_bridge_schedule: bool,

/// The name of the queue for the event bridge
#[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TARGET_QUEUE_NAME", long, default_value = Some("madara_orchestrator_worker_trigger_queue"), help = "The name of the SNS queue to send messages to from the event bridge.")]
Expand Down
20 changes: 16 additions & 4 deletions crates/orchestrator/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl RunCmd {
),
group(
ArgGroup::new("cron")
.args(&["aws_event_bridge"])
.args(&["aws_event_bridge_rule", "aws_event_bridge_schedule"])
.required(true)
.multiple(false)
.requires("provider")
Expand Down Expand Up @@ -336,7 +336,7 @@ pub mod validate_params {
use crate::alerts::aws_sns::AWSSNSValidatedArgs;
use crate::cli::prover_layout::ProverLayoutCliArgs;
use crate::config::ServiceParams;
use crate::cron::event_bridge::AWSEventBridgeValidatedArgs;
use crate::cron::event_bridge::{AWSEventBridgeValidatedArgs, EventBridgeType};
use crate::data_storage::aws_s3::AWSS3ValidatedArgs;
use crate::database::mongodb::MongoDBValidatedArgs;
use crate::queue::sqs::AWSSQSValidatedArgs;
Expand Down Expand Up @@ -403,8 +403,19 @@ pub mod validate_params {
aws_event_bridge_args: &AWSEventBridgeCliArgs,
aws_config_args: &AWSConfigCliArgs,
) -> Result<CronValidatedArgs, String> {
if aws_event_bridge_args.aws_event_bridge && aws_config_args.aws {
if (aws_event_bridge_args.aws_event_bridge_rule || aws_event_bridge_args.aws_event_bridge_schedule)
&& aws_config_args.aws
{
let cron_type = if aws_event_bridge_args.aws_event_bridge_rule {
EventBridgeType::Rule
} else if aws_event_bridge_args.aws_event_bridge_schedule {
EventBridgeType::Schedule
} else {
panic!("No Cron service selected!")
};

Ok(CronValidatedArgs::AWSEventBridge(AWSEventBridgeValidatedArgs {
cron_type,
target_queue_name: aws_event_bridge_args
.target_queue_name
.clone()
Expand Down Expand Up @@ -841,7 +852,8 @@ pub mod validate_params {
#[case(false)]
fn test_validate_cron_params(#[case] is_aws: bool) {
let aws_event_bridge_args: AWSEventBridgeCliArgs = AWSEventBridgeCliArgs {
aws_event_bridge: is_aws,
aws_event_bridge_rule: is_aws,
aws_event_bridge_schedule: !is_aws,
target_queue_name: Some(String::from("test")),
cron_time: Some(String::from("12")),
trigger_rule_name: Some(String::from("test")),
Expand Down
105 changes: 77 additions & 28 deletions crates/orchestrator/src/cron/event_bridge.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::time::Duration;

use async_std::task::sleep;
use async_trait::async_trait;
use aws_config::SdkConfig;
use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target as EventBridgeTarget};
use aws_sdk_scheduler::types::{FlexibleTimeWindow, FlexibleTimeWindowMode, Target};
use aws_sdk_scheduler::Client as SchedulerClient;
use aws_sdk_sqs::types::QueueAttributeName;
use aws_sdk_sqs::Client as SqsClient;
use color_eyre::eyre::Ok;
Expand All @@ -13,8 +12,21 @@ use super::{get_worker_trigger_message, TriggerArns};
use crate::cron::Cron;
use crate::queue::job_queue::WorkerTriggerType;

#[derive(Clone, Debug)]
pub enum EventBridgeType {
Rule,
Schedule,
}

#[derive(Clone, Debug)]
enum EventBridgeClient {
Rule(aws_sdk_eventbridge::Client),
Schedule(aws_sdk_scheduler::Client),
}

#[derive(Clone, Debug)]
pub struct AWSEventBridgeValidatedArgs {
pub cron_type: EventBridgeType,
pub target_queue_name: String,
pub cron_time: Duration,
pub trigger_rule_name: String,
Expand All @@ -26,7 +38,7 @@ pub struct AWSEventBridge {
target_queue_name: String,
cron_time: Duration,
trigger_rule_name: String,
client: SchedulerClient,
client: EventBridgeClient,
queue_client: SqsClient,
iam_client: aws_sdk_iam::Client,
trigger_role_name: String,
Expand All @@ -35,11 +47,16 @@ pub struct AWSEventBridge {

impl AWSEventBridge {
pub fn new_with_args(params: &AWSEventBridgeValidatedArgs, aws_config: &SdkConfig) -> Self {
let client = match params.cron_type {
EventBridgeType::Rule => EventBridgeClient::Rule(aws_sdk_eventbridge::Client::new(aws_config)),
EventBridgeType::Schedule => EventBridgeClient::Schedule(aws_sdk_scheduler::Client::new(aws_config)),
};

Self {
target_queue_name: params.target_queue_name.clone(),
cron_time: params.cron_time,
trigger_rule_name: params.trigger_rule_name.clone(),
client: aws_sdk_scheduler::Client::new(aws_config),
client,
queue_client: aws_sdk_sqs::Client::new(aws_config),
iam_client: aws_sdk_iam::Client::new(aws_config),
trigger_role_name: params.trigger_role_name.clone(),
Expand All @@ -66,6 +83,7 @@ impl Cron for AWSEventBridge {

// Create IAM role for EventBridge
let role_name = format!("{}-{}", self.trigger_role_name, uuid::Uuid::new_v4());
// TODO: might need to change this accordingly to support rule, skipping for now
let assume_role_policy = r#"{
"Version": "2012-10-17",
"Statement": [{
Expand Down Expand Up @@ -113,7 +131,7 @@ impl Cron for AWSEventBridge {
// Attach the policy to the role
self.iam_client.attach_role_policy().role_name(&role_name).policy_arn(&policy_arn).send().await?;

sleep(Duration::from_secs(60)).await;
// sleep(Duration::from_secs(60)).await;

Ok(TriggerArns { queue_arn: queue_arn.to_string(), role_arn: role_arn.to_string() })
}
Expand All @@ -123,30 +141,61 @@ impl Cron for AWSEventBridge {
trigger_type: &WorkerTriggerType,
trigger_arns: &TriggerArns,
) -> color_eyre::Result<()> {
let trigger_name = format!("{}-{}", self.trigger_rule_name, trigger_type);

// Set flexible time window (you can adjust this as needed)
let flexible_time_window = FlexibleTimeWindow::builder().mode(FlexibleTimeWindowMode::Off).build()?;

let message = get_worker_trigger_message(trigger_type.clone())?;

// Create target for SQS queue
let target = Target::builder()
.arn(trigger_arns.queue_arn.clone())
.role_arn(trigger_arns.role_arn.clone())
.input(message)
.build()?;

// Create the schedule
self.client
.create_schedule()
.name(trigger_name)
.schedule_expression_timezone("UTC")
.flexible_time_window(flexible_time_window)
.schedule_expression(duration_to_rate_string(self.cron_time))
.target(target)
.send()
.await?;
let trigger_name = format!("{}-{}", self.trigger_rule_name, trigger_type);
println!("trigger_nametrigger_nametrigger_name {}", trigger_name);

match self.client.clone() {
EventBridgeClient::Rule(client) => {
let input_transformer =
InputTransformer::builder().input_paths_map("time", "$.time").input_template(message).build()?;

client
.put_rule()
.name(trigger_name.clone())
.schedule_expression("rate(1 minute)")
.state(RuleState::Enabled)
.send()
.await?;

client
.put_targets()
.rule(trigger_name.clone())
.targets(
EventBridgeTarget::builder()
.id(uuid::Uuid::new_v4().to_string())
.arn(trigger_arns.queue_arn.clone())
.input_transformer(input_transformer.clone())
.build()?,
)
.send()
.await?;
}
EventBridgeClient::Schedule(client) => {
// Set flexible time window (you can adjust this as needed)
let flexible_time_window = FlexibleTimeWindow::builder().mode(FlexibleTimeWindowMode::Off).build()?;

let message = get_worker_trigger_message(trigger_type.clone())?;

// Create target for SQS queue
let target = Target::builder()
.arn(trigger_arns.queue_arn.clone())
.role_arn(trigger_arns.role_arn.clone())
.input(message)
.build()?;

// Create the schedule
client
.create_schedule()
.name(trigger_name)
.schedule_expression_timezone("UTC")
.flexible_time_window(flexible_time_window)
.schedule_expression(duration_to_rate_string(self.cron_time))
.target(target)
.send()
.await?;
}
};

Ok(())
}
Expand Down

0 comments on commit 3d57f39

Please sign in to comment.