"""
###
Sample DAG, which declares MapR Spark, Spark SQL and Hive tasks.
"""
import json
import os
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.hooks.hive_hooks import HiveCliHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import BranchPythonOperator

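# External prerequisites referenced by this DAG:
#   - environment variables MAPR_DAG_SPARK_JOB_PATH and MAPR_DAG_DRILL_SCRIPT_PATH
#     must be set where the tasks run;
#   - an Airflow HTTP connection named 'http_github' pointing at the GitHub API;
#   - a Hive CLI connection for HiveCliHook and HiveOperator.
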
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1, hour=1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'provide_context': True
}

dag = DAG(
    'mapr_tasks_dag',
    default_args=default_args,
    description='Sample DAG with MapR Spark, Spark SQL and Hive tasks',
    schedule_interval=timedelta(days=1))

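# Fetch the latest commit on the master branch of mapr-demos/mapr-music from the
# GitHub API; xcom_push=True stores the response body in XCom for the branch
# callable below.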
get_last_commit_task = SimpleHttpOperator(
    task_id='get_last_commit_task',
    http_conn_id='http_github',
    endpoint='/repos/mapr-demos/mapr-music/commits/master',
    method='GET',
    xcom_push=True,
    dag=dag)

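# Branch callable: pull the GitHub response from XCom, extract the commit sha and
# look it up in the 'mapr_music_updates' Hive table to decide whether the dataset
# has to be reimported. The returned task_id selects the branch to follow.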
def query_hive(**kwargs):
    ti = kwargs['ti']
    # get sha of the latest commit from the GitHub API response
    v1 = ti.xcom_pull(key=None, task_ids='get_last_commit_task')
    json_value = json.loads(v1)
    sha = json_value['sha']
    # look the sha up in the Hive bookkeeping table
    hive_cli = HiveCliHook()
    hql = "select * from mapr_music_updates where commit_sha = '" + sha + "';"
    latest_commit = hive_cli.run_cli(hql)
    # if the sha is not present, the dataset has changed since the last import
    changed = latest_commit.find(sha) == -1
    ti.xcom_push(key='sha', value=sha)
    ti.xcom_push(key='is_changed', value=changed)
    return 'reimport_dataset_task' if changed else 'skip_reimport_dataset_task'

check_last_commit_task = BranchPythonOperator(
    task_id='check_last_commit_task',
    python_callable=query_hive,
    dag=dag)

reimport_dataset_task = BashOperator(
    task_id='reimport_dataset_task',
    bash_command="""rm -rf ~/mapr-music;
    ( cd ~ ; git clone https://github.com/mapr-demos/mapr-music );
    ~/mapr-music/bin/import-dataset.sh --path ~/mapr-music/dataset/ --recreate""",
    dag=dag)

spark_compute_statistics_task = SparkSubmitOperator(
    task_id='spark_compute_statistics_task',
    application=os.environ['MAPR_DAG_SPARK_JOB_PATH'],
    java_class='com.mapr.example.StatisticsJob',
    application_args=["/apps/mapr-airflow",
                      "{{ task_instance.xcom_pull(key='sha', task_ids='check_last_commit_task') }}"],
    dag=dag)

insert_reimport_record = HiveOperator(
    task_id='insert_reimport_record',
    hql="insert into table mapr_music_updates values "
        "('{{ task_instance.xcom_pull(key='sha', task_ids='check_last_commit_task') }}', "
        "'/apps/mapr-airflow/{{ task_instance.xcom_pull(key='sha', task_ids='check_last_commit_task') }}');",
    dag=dag)

skip_reimport_dataset_task = DummyOperator(task_id='skip_reimport_dataset_task', dag=dag)
finish_task = DummyOperator(task_id='finish_task', dag=dag)

drill_artist_albums_task = BashOperator(
    task_id='drill_artist_albums_task',
    bash_command=os.environ['MAPR_DAG_DRILL_SCRIPT_PATH'] +
                 " table{{ task_instance.xcom_pull(key='sha', task_ids='check_last_commit_task') }}",
    dag=dag)

spark_top_artists_task = SparkSubmitOperator(
    task_id='spark_top_artists_task',
    application=os.environ['MAPR_DAG_SPARK_JOB_PATH'],
    java_class='com.mapr.example.TopArtistsJob',
    application_args=[
        "/tmp/table{{ task_instance.xcom_pull(key='sha', task_ids='check_last_commit_task') }}",
        "/apps/mapr-airflow",
        "{{ task_instance.xcom_pull(key='sha', task_ids='check_last_commit_task') }}"],
    dag=dag)

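# Pipeline wiring: when the commit is new, reimport the dataset, run the Drill
# script and the Spark jobs, and record the import in Hive; otherwise take the
# 'skip_reimport_dataset_task' branch.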
get_last_commit_task >> check_last_commit_task >> reimport_dataset_task >> \
    drill_artist_albums_task >> spark_top_artists_task >> \
    spark_compute_statistics_task >> insert_reimport_record
check_last_commit_task >> skip_reimport_dataset_task

dag.doc_md = __doc__