airflow: operator and dag/tasks to sync NTD data via DOT API and XLSX #3415
Conversation
Force-pushed from fa56a2d to 6ba2ef8
```python
self.data = response
self.logger.info(
    f"Downloaded {self.product} data for {self.year} with {len(self.data)} rows!"
)
```
I don't know anything offhand about the failure modes of the NTD API, but it might be useful to check response status and log non-200 responses or something.
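A minimal sketch of that check, assuming a `requests`-style response object and the operator's existing `self.logger` (names here are illustrative, not this PR's actual code):

```python
# Hypothetical sketch: log non-200 responses before using the payload.
# Assumes a requests-style `response` and the operator's `self.logger`.
if response.status_code != 200:
    self.logger.error(
        f"NTD API returned {response.status_code} for {self.product} ({self.year})"
    )
```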
@charlie-costanzo I took a first pass through this and left just a couple comments. I'd be down to set up a time to talk through it more.
Force-pushed from 212fb10 to aa421c2
Force-pushed from aa421c2 to b04ece4
It's probably okay, but I'm not entirely sure I understand why we are building operators and Airflow DAGs for some of these data entities, such as `2022_reporting/2022_capital_expenses_by_mode.yml` - is there an expectation that the data will change? Shouldn't it just be a one-off data pull? Edit: Actually, I think they keep updating these 2022 datasets for some reason.
```yaml
operator: operators.NtdDataProductXLSXOperator

product: 'raw_monthly_ridership_no_adjustments_or_estimates'
xlsx_file_url: 'https://www.transit.dot.gov/sites/fta.dot.gov/files/2024-09/S%26S%20Time%20Series-May%202024-Major%20Only_240903.xlsx'
```
I believe this file changes monthly. Are we planning on accounting for this and creating the needed automation for monthly downloads, or is that something for a future effort?
Hey Evan! Yep – we saw that all of these datasets are being updated regularly, even annual reports for previous years, so automation is built into this work. I just pushed changes with a placeholder value for the schedule: the first day of the month, every month. We can update that from here, but I figured that was a reasonable starting point. Let me know if you have any other thoughts.
Hey @evansiroky – this PR is ready for merge, but my last remaining question relates to the scheduling of the DAG tasks in this PR. Based on the frequency at which we see many of the endpoints updating (~monthly), I configured the DAG tasks to run once a month (first day of the month, every month, 3am PT), but I'm open to suggestions here.
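For reference, a hypothetical sketch of that schedule; the actual DAG definitions in this PR may differ:

```python
# Hypothetical sketch of the monthly schedule described above; the real
# DAG config in this PR may differ. With a timezone-aware start_date,
# Airflow interprets the cron expression in that timezone.
import pendulum
from airflow import DAG

with DAG(
    dag_id="sync_ntd_data_xlsx",
    schedule_interval="0 3 1 * *",  # 3am PT on the 1st of every month
    start_date=pendulum.datetime(2024, 1, 1, tz="America/Los_Angeles"),
    catchup=False,
) as dag:
    ...
```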
Force-pushed from 815a5b5 to a66f904
```python
else:
    logging.info(
        f"Downloaded {self.product} data for {self.year} with {len(response)} rows!"
    )
```
suggestion (non-blocking): We should probably raise an exception when there is a non-200 response from the API (e.g., a 404 response will still return some JSON, but not in the structure that's valid for the table).
This can be done in a follow-on issue.
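A minimal sketch of that guard, assuming a `requests`-style response (names are illustrative, not this PR's code):

```python
# Hypothetical sketch of the suggested guard: fail fast on a non-200
# response (e.g., a 404 error body) instead of passing its JSON
# downstream, where it wouldn't match the structure valid for the table.
from airflow.exceptions import AirflowException

if response.status_code != 200:
    raise AirflowException(
        f"NTD API request failed with status {response.status_code}"
    )
```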
Captured in this ticket:
I'd say just make a follow-on issue related to the suggestion above, and then LGTM.
Force-pushed from a66f904 to 90ba382
Description
This PR introduces new NTD data sources from the federal Department of Transportation, available through their data API as well as via XLSX file downloads.
Two Airflow operators were necessary for this work because, although a large number of NTD datasets are now available from the NTD API, some important datasets are still available only in XLSX format (monthly ridership, certain annual reports).
To accomplish this, two new Airflow operators (`scrape_ntd_api.py` and `scrape_ntd_xlsx.py`), two associated DAGs (`sync_ntd_data_api` and `sync_ntd_data_xlsx`), and a selection of NTD table endpoints as DAG tasks were created. Both operators utilize the `PartitionedGCSArtifact` class pattern used elsewhere in the pipeline (a rough sketch of the idea appears below).

NTD data sources scraped and stored in this PR include:
We discovered that these tables are retroactively updated at a regular cadence, including annual reports for previous years, so a schedule has been configured to download from these endpoints on the first day of the month, every month.
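As background on the artifact pattern mentioned above, here is a rough, hypothetical sketch of the idea; the class, attribute, and method names below are illustrative assumptions, not the actual `PartitionedGCSArtifact` interface:

```python
# Rough, hypothetical illustration of a partitioned-GCS-artifact pattern;
# names are assumptions for illustration, not the actual
# PartitionedGCSArtifact API.
from dataclasses import dataclass

@dataclass
class ExamplePartitionedArtifact:
    bucket: str  # destination GCS bucket
    table: str   # logical table / product name
    dt: str      # partition value, e.g. the execution date

    @property
    def path(self) -> str:
        # Deterministic hive-style partitioned path for the saved content
        return f"{self.bucket}/{self.table}/dt={self.dt}/data.jsonl"
```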
Resolves #3402, part of Epic #3401
Type of change
How has this been tested?
Successful local Airflow runs, publishing to GCS buckets
Post-merge follow-ups