@@ -8,6 +8,7 @@
 import requests
 import xarray as xr
 from airflow.decorators import task
+from airflow.exceptions import AirflowException
 from airflow.operators.empty import EmptyOperator
 from airflow.sensors.external_task import ExternalTaskSensor
 from confluent_kafka import Consumer, KafkaException
@@ -37,8 +38,8 @@
 
     "email_on_failure": False,
     "email_on_retry": False,
-    "retries": 1,
-    "retry_delay": pendulum.duration(minutes=5),
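+    # check_for_messages fails when the topic is empty and leans on these retries to poll again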
+ "retries" : 5 ,
42
+ "retry_delay" : pendulum .duration (minutes = 1 ),
42
43
}
43
44
44
45
@@ -192,7 +193,8 @@ def df_to_db(df, engine, table_name):
             dtype={"location": Geography(geometry_type="POINT", srid=4326)},
         )
         entry_id = df["valid_time"].unique()
-        entry_id = entry_id[0].strftime("%Y-%m-%d %H:%M:%S")
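+        # .unique() yields numpy datetime64 values, which have no strftime; convert via pandas first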
+        entry_id_datetime = pd.to_datetime(entry_id[0])
+        entry_id_str = entry_id_datetime.strftime("%Y-%m-%d %H:%M:%S")
-        print(f"Successfully wrote grib2 file for {entry_id}")
+        print(f"Successfully wrote grib2 file for {entry_id_str}")
     except SQLAlchemyError as e:
         print(f"An error occurred: {e}")
@@ -207,30 +209,65 @@ def df_to_db(df, engine, table_name):
     is_paused_upon_creation=False,
 ) as dag:
 
-    # ExternalTaskSensor to wait for gefs_wave_urls_to_kafka DAG to complete
-    wait_for_gefs_wave_urls_to_kafka = ExternalTaskSensor(
-        task_id="wait_for_gefs_wave_urls_to_kafka",
-        external_dag_id="gefs_wave_urls_to_kafka",
-        external_task_id=None,  # `None` will wait for the entire task to complete
-        timeout=7200,  # Timeout before failing task
-        poke_interval=60,  # Seconds to wait between checks
-        mode="reschedule",  # Reschedule for long waits to free up worker slots
-    )
-
-    @task
-    def consume_and_process_from_kafka():
-        consume_from_kafka(
-            topic=topic,
-            engine=engine,
-            table_name=table_name,
-            bs=8,
-            sasl_username=sasl_username,
-            sasl_password=sasl_password,
-        )
+    def taskflow():
+        conf = {
+            "bootstrap.servers": "kafka:9092",
+            "group.id": "airflow-consumers",
+            "enable.auto.commit": False,
+            "auto.offset.reset": "earliest",  # consume from the start of topic
+            "security.protocol": "SASL_PLAINTEXT",
+            "sasl.mechanisms": "PLAIN",
+            "sasl.username": sasl_username,
+            "sasl.password": sasl_password,
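+            # 900000 ms = 15 min: maximum gap between polls before the broker evicts this consumer from the group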
+ "max.poll.interval.ms" : 900000 ,
223
+ }
+
+        # # ExternalTaskSensor to wait for gefs_wave_urls_to_kafka DAG to complete
+        # wait_for_gefs_wave_urls_to_kafka = ExternalTaskSensor(
+        #     task_id="wait_for_gefs_wave_urls_to_kafka",
+        #     external_dag_id="gefs_wave_urls_to_kafka",
+        #     external_task_id=None,  # `None` will wait for the entire task to complete
+        #     timeout=7200,  # Timeout before failing task
+        #     poke_interval=60,  # Seconds to wait between checks
+        #     mode="reschedule",  # Reschedule for long waits to free up worker slots
+        # )
+
+        @task
+        def check_for_messages():
+            c = Consumer(conf)
+            c.subscribe([topic])
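+            # NB: this logs the full consumer config, including sasl.password; consider redacting it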
+            logging.info(f"{conf}")
+            # Poll for messages
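+            # poll() blocks for up to 30 s; a returned message can also be an error event (check msg.error())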
+            msg = c.poll(30.0)
+            c.close()
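+            # enable.auto.commit is False and nothing commits here, so the probed
+            # message is redelivered to the consumer task below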
+
+            if msg is None:
+                logging.info("No new messages found. Task will be retried.")
+                # raise AirflowException (not AirflowFailException, which skips
+                # retries) so the task is actually retried as the log promises
+                raise AirflowException(
+                    "No new messages found. Task will be explicitly failed to trigger retry."
+                )
+            else:
+                logging.info("New messages found. Proceeding to consume and process.")
+                return True
+
+        @task
+        def consume_and_process_from_kafka():
+            consume_from_kafka(
+                topic=topic,
+                engine=engine,
+                table_name=table_name,
+                bs=8,
+                sasl_username=sasl_username,
+                sasl_password=sasl_password,
+            )
+
+        check_result = check_for_messages()
+        consume_task = consume_and_process_from_kafka()
+        check_result >> consume_task
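+        # `>>` on TaskFlow call results orders the underlying tasks:
+        # consumption starts only after the check succeeds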
 
-    data = consume_and_process_from_kafka()
+    # wait_for_gefs_wave_urls_to_kafka >> data
 
-    wait_for_gefs_wave_urls_to_kafka >> data
+    taskflow()  # taskflow() returns None; rebinding `dag` here would break dag.test() below
 
 if __name__ == "__main__":
     dag.test()