Use Amazon Managed Workflows for Apache Airflow (MWAA) to design and run a serverless workflow that coordinates Amazon EMR (Elastic MapReduce) jobs. Amazon EMR is the industry-leading cloud big data platform for processing vast amounts of data using open-source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto.
This sample targets Apache Airflow 1.10.12 on Amazon MWAA.
Copy the file into your DAGs folder and ensure you have connectivity to Amazon EMR. When using an AWS IAM role to connect to Amazon EMR, whether Amazon MWAA’s execution role or a role assumed on Amazon EC2, you will need to attach the AmazonElasticMapReduceFullAccess policy to that role via the AWS IAM console. You will also need Amazon EMR configured in your AWS account.
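If you prefer to attach the policy programmatically rather than through the console, here is a minimal sketch using boto3; the role name my-mwaa-execution-role is a placeholder assumption, so substitute your environment’s actual execution role.
import boto3

iam = boto3.client('iam')

# Attach the AWS managed EMR policy to the role Amazon MWAA uses.
# 'my-mwaa-execution-role' is a placeholder; use your environment's execution role name.
iam.attach_role_policy(
    RoleName='my-mwaa-execution-role',
    PolicyArn='arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess',
)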
No additional Python packages are required for this sample (nothing needs to be added to requirements.txt). Note that the import paths below are for Apache Airflow 1.10.12; on Apache Airflow v2 the airflow.contrib modules were removed, and the operators and sensor come from the Amazon provider package (apache-airflow-providers-amazon) instead.
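With recent releases of the Amazon provider package (preinstalled on Amazon MWAA v2 environments), the equivalent imports look like the sketch below; exact module paths vary slightly between provider versions, so check the version installed in your environment.
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor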
All DAGs begin with the default import line:
from airflow import DAG
Then we will import the operators and sensors we will use in our workflow:
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
We’ll then import a couple of time utilities:
from airflow.utils.dates import days_ago
from datetime import timedelta
Now we’ll define some default arguments to use in our tasks:
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}
Next we’ll define what our EMR cluster should look like. This configuration uses the mykeypair EC2 key pair and the default EMR_DefaultRole and EMR_EC2_DefaultRole IAM roles created as part of your Amazon EMR setup:
JOB_FLOW_OVERRIDES = {
    'Name': 'my-demo-cluster',
    'ReleaseLabel': 'emr-5.30.1',
    'Applications': [
        {
            'Name': 'Spark'
        },
    ],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Slave nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': False,
        'TerminationProtected': False,
        'Ec2KeyName': 'mykeypair',
    },
    'VisibleToAllUsers': True,
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole'
}
We’ll also need to define the EMR job step we wish to run. In this case we’ll use Spark’s built-in SparkPi example, named calculate_pi here:
SPARK_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
        },
    }
]
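The step above runs the bundled SparkPi example, but an EMR step can submit any Spark job. As an illustration only, the sketch below submits a PySpark script from Amazon S3 with spark-submit via command-runner.jar; the bucket and script path are placeholders, not part of this sample.
# Hypothetical variant: run your own PySpark script stored in Amazon S3.
# Replace s3://my-bucket/scripts/my_job.py with a real script in your account.
CUSTOM_SPARK_STEPS = [
    {
        'Name': 'run_my_pyspark_job',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                's3://my-bucket/scripts/my_job.py',
            ],
        },
    }
]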
Next we’ll create our DAG object:
with DAG(
    dag_id='emr-job-dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(1),
    schedule_interval='@once',
    tags=['emr'],
) as dag:
Inside the DAG context we’ll define an EmrCreateJobFlowOperator that will create a new cluster:
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )
Then an EmrAddStepsOperator to add our calculate_pi step to that cluster:
    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )
And an EmrStepSensor that will wait for our step to complete:
    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )
Finally, we have to tell our DAG in what order to run our tasks:
    cluster_creator >> step_adder >> step_checker
See CONTRIBUTING for more information.
This library is licensed under the MIT-0 License. See the LICENSE file.