import logging
import os
import re
import tempfile

import pandas as pd
import pendulum
import requests
import xarray as xr
from airflow.decorators import task
from confluent_kafka import Consumer, KafkaException
from geoalchemy2 import WKTElement
from geoalchemy2.types import Geography
from shapely.geometry import Point
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError

from airflow import DAG

sasl_username = os.environ.get("KAFKA_DEFAULT_USERS")
sasl_password = os.environ.get("KAFKA_DEFAULT_PASSWORDS")

DATABASE_URL = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_CONN")
engine = create_engine(DATABASE_URL)

table_name = "wave_forecast"
topic = "gefs_wave_urls"

start_date = pendulum.datetime(2024, 1, 1)

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": start_date,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": pendulum.duration(minutes=5),
}


# Will need to revisit this in the future. This is very basic fault handling:
# URLs are processed one at a time, so if processing fails the offset is not
# committed and a retry will resume at the correct message.
@task
def consume_from_kafka(
    topic, engine, table_name, bs=1, sasl_username=sasl_username, sasl_password=sasl_password
):
    """
    Consume messages from a Kafka topic and load each referenced GRIB2 file into the database.

    Args:
        topic (str): The name of the Kafka topic to consume from.
        engine (sqlalchemy.engine.Engine): Engine used to write the decoded data.
        table_name (str): Destination table for the forecast data.
        bs (int, optional): The maximum number of messages to process in one run. Defaults to 1.

    Raises:
        KafkaException: If there is an error consuming from the Kafka topic.
    """
    conf = {
        "bootstrap.servers": "kafka:9092",
        "group.id": "airflow-consumers",
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest",  # consume from the start of the topic
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": sasl_username,
        "sasl.password": sasl_password,
    }

    c = Consumer(conf)

    c.subscribe([topic])
    try:
        pattern = re.compile(
            r"https://nomads\.ncep\.noaa\.gov/pub/data/nccf/com/gens/prod/gefs\.(\d{8})/00/"
        )

        processed_count = 0
        while True:
            msg = c.poll(9.0)
            if msg is None:
                logging.info(f"No more messages in topic {topic}")
                break
            if msg.error():
                logging.error(f"Error consuming from topic {topic}: {msg.error()}")
                raise KafkaException(msg.error())
            else:
                message = msg.value().decode("utf-8")
                match = pattern.match(message)
                date = match.group(1) if match else None
                current_date = pendulum.now().format("YYYYMMDD")
                if date == current_date and processed_count < bs:
                    logging.info(f"Beginning processing of {message}")
                    df = url_to_df(message)
                    df_to_db(df, engine, table_name)
                    # Commit the offset only after successful processing
                    c.commit()
                    logging.info(f"Consumed and processed message from {topic}")
                    processed_count += 1
                else:
                    c.commit()
                    logging.info(f"Skipping processing and updating offset for: {message}")

    finally:
        c.close()
        logging.info("Consumer closed")

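# For reference, the messages consumed above are expected to be full GRIB2 URLs
# beginning with the prefix the regex matches. The file name below is a hypothetical
# example, shown only to illustrate the shape of a message, not an actual product path:
# https://nomads.ncep.noaa.gov/pub/data/nccf/com/gens/prod/gefs.20240101/00/wave/gridded/gefs.wave.t00z.mean.global.0p25.f000.grib2
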

def url_to_df(url):
    """
    Fetches a GRIB2 file from the specified URL and returns it as a pandas
    DataFrame. Xarray is used as an intermediary to utilize decoding with `cfgrib`.
    Rows with no meteorological data are dropped to decrease extra load.

    Args:
        url (str): The URL to fetch the data from.

    Returns:
        pandas.DataFrame: The fetched data as a pandas DataFrame, with rows that
        are NaN across all wave and wind variables dropped and the index reset.
    """
    response = requests.get(url)
    if response.status_code == 200:
        with tempfile.NamedTemporaryFile() as tmp:
            tmp.write(response.content)
            tmp.flush()

            with xr.open_dataset(tmp.name, engine="cfgrib") as ds:
                data = ds.load()
            df = data.to_dataframe()
            df.reset_index(level=["latitude", "longitude"], inplace=True)
            df.drop(columns="surface", inplace=True)
            df.dropna(
                subset=[
                    "swh",
                    "perpw",
                    "dirpw",
                    "shww",
                    "mpww",
                    "wvdir",
                    "ws",
                    "wdir",
                    "swper",
                    "swell",
                ],
                how="all",
                inplace=True,
            )
            # Adjust longitude from the model's 0-360 range to the -180 to 180 domain
            df["longitude"] = (
                df["longitude"].apply(lambda x: x - 360 if x > 180 else x).round(2)
            )
            # Create a point geometry for PostGIS spatial indexing
            df["location"] = df.apply(
                lambda row: Point(row["longitude"], row["latitude"]), axis=1
            )
            # Tag each point with SRID 4326 (WGS 84) so PostGIS knows the coordinate reference system
            df["location"] = df["location"].apply(lambda loc: WKTElement(loc.wkt, srid=4326))
            # Convert the forecast step to an interval-friendly string, e.g. "3.0 hours"
            df["step"] = df["step"].dt.total_seconds() / 3600.0
            df["step"] = df["step"].astype(str) + " hours"
            return df

    else:
        logging.error(f"Failed to get data: {response.status_code}")


def df_to_db(df, engine, table_name):
    """
    Commit a DataFrame to the database.

    Args:
        df (pandas.DataFrame): The DataFrame to be committed.
        engine (sqlalchemy.engine.Engine): Engine connected to the target database.
        table_name (str): Name of the table to append the rows to.

    Raises:
        SQLAlchemyError: If an error occurs while committing the DataFrame
        to the database.
    """
    with engine.begin() as connection:
        try:
            df["entry_updated"] = pendulum.now("UTC")
            df.to_sql(
                table_name,
                con=connection,
                if_exists="append",
                index=False,
                dtype={"location": Geography(geometry_type="POINT", srid=4326)},
            )
            entry_id = df["valid_time"].unique()
            entry_id = pd.Timestamp(entry_id[0]).strftime("%Y-%m-%d %H:%M:%S")
            logging.info(f"Successfully wrote grib2 file for {entry_id}")
        except SQLAlchemyError as e:
            logging.error(f"An error occurred: {e}")
            # Re-raise so the consumer does not commit the Kafka offset for a failed write
            raise

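# For reference, the rows written above land in a table shaped roughly like the
# sketch below. This is an assumption for documentation only: `to_sql` creates the
# table on first append if it does not already exist, and the value columns depend
# on the variables decoded from the GRIB2 file.
#
#   CREATE TABLE wave_forecast (
#       latitude        DOUBLE PRECISION,
#       longitude       DOUBLE PRECISION,
#       location        geography(Point, 4326),  -- built from WKTElement with srid=4326
#       step            TEXT,                    -- e.g. "3.0 hours"
#       valid_time      TIMESTAMP,
#       swh             DOUBLE PRECISION,        -- plus the other wave/wind columns (perpw, dirpw, ...)
#       entry_updated   TIMESTAMPTZ
#   );
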

with DAG(
    "debug_gefs_wave_etl_from_kafka",
    default_args=default_args,
    description="Get GEFS grib2 urls from topic and batch process to postgis",
    schedule_interval=None,
    catchup=False,
) as dag:
    data = consume_from_kafka(
        topic=topic,
        engine=engine,
        table_name=table_name,
        bs=8,
        sasl_username=sasl_username,
        sasl_password=sasl_password,
    )

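# A minimal sketch of how a test URL could be published to the topic when debugging
# locally. It is not used by the DAG; the broker address and SASL settings are
# assumed to mirror the consumer configuration above.
def _publish_test_url(url: str) -> None:
    from confluent_kafka import Producer

    producer = Producer(
        {
            "bootstrap.servers": "kafka:9092",
            "security.protocol": "SASL_PLAINTEXT",
            "sasl.mechanisms": "PLAIN",
            "sasl.username": sasl_username,
            "sasl.password": sasl_password,
        }
    )
    # Deliver the URL as a UTF-8 encoded message and wait for the broker to confirm
    producer.produce(topic, value=url.encode("utf-8"))
    producer.flush()
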
if __name__ == "__main__":
    dag.test()