From c6a6bf69d6d55f0e12218f9c69794949094535bf Mon Sep 17 00:00:00 2001 From: britt <47340362+traintestbritt@users.noreply.github.com> Date: Wed, 23 Aug 2023 12:56:58 -0700 Subject: [PATCH 1/5] renamed loading scripts and added logic to writing script --- .../dags/geo_reference/building_footprints.py | 44 ------------------- .../geo_reference/load_building_footprints.py | 0 jobs/geo/building_footprints.py | 29 ------------ jobs/geo/load_building_footprints.py | 0 jobs/geo/write_building_footprints.py | 42 ++++++++++++++++++ 5 files changed, 42 insertions(+), 73 deletions(-) delete mode 100644 airflow/dags/geo_reference/building_footprints.py create mode 100644 airflow/dags/geo_reference/load_building_footprints.py delete mode 100644 jobs/geo/building_footprints.py create mode 100644 jobs/geo/load_building_footprints.py create mode 100644 jobs/geo/write_building_footprints.py diff --git a/airflow/dags/geo_reference/building_footprints.py b/airflow/dags/geo_reference/building_footprints.py deleted file mode 100644 index 29c196cb..00000000 --- a/airflow/dags/geo_reference/building_footprints.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Load building footprints to Snowflake.""" -from __future__ import annotations - -import os -from datetime import datetime - -from common.defaults import DEFAULT_ARGS - -from airflow.decorators import dag -from airflow.providers.amazon.aws.operators.batch import BatchOperator -from airflow.providers.amazon.aws.sensors.batch import BatchSensor - - -@dag( - description="Test DAG", - start_date=datetime(2023, 5, 23), - schedule_interval="@monthly", - default_args=DEFAULT_ARGS, - catchup=False, -) -def building_footprints_dag(): - """DAG for loading MS Building footprints dataset.""" - submit_batch_job = BatchOperator( - task_id="load_footprints", - job_name="california_building_footprints", - job_queue=os.environ["AIRFLOW__CUSTOM__DEFAULT_JOB_QUEUE"], - job_definition=os.environ["AIRFLOW__CUSTOM__DEFAULT_JOB_DEFINITION"], - overrides={ - "command": ["python", "-m", "jobs.geo.building_footprints"], - "resourceRequirements": [ - {"type": "VCPU", "value": "8"}, - {"type": "MEMORY", "value": "32768"}, - ], - }, - region_name="us-west-2", # TODO: can we make this unnecessary? - ) - _ = BatchSensor( - task_id="wait_for_batch_job", - job_id=submit_batch_job.output, - region_name="us-west-2", # TODO: can we make this unnecessary? - ) - - -run = building_footprints_dag() diff --git a/airflow/dags/geo_reference/load_building_footprints.py b/airflow/dags/geo_reference/load_building_footprints.py new file mode 100644 index 00000000..e69de29b diff --git a/jobs/geo/building_footprints.py b/jobs/geo/building_footprints.py deleted file mode 100644 index 264572c5..00000000 --- a/jobs/geo/building_footprints.py +++ /dev/null @@ -1,29 +0,0 @@ -from __future__ import annotations - -from jobs.utils.snowflake import gdf_to_snowflake, snowflake_connection_from_environment - - -def load_state_footprints(conn) -> None: - """Load Microsoft state building footprints dataset for California.""" - import geopandas - - print("Downloading data") - gdf = geopandas.read_file( - "https://usbuildingdata.blob.core.windows.net/usbuildings-v2/California.geojson.zip" - ) - - print("Writing data to snowflake") - gdf_to_snowflake( - gdf, - conn, - table_name="CALIFORNIA_BUILDING_FOOTPRINTS", - cluster=False, - ) - - -if __name__ == "__main__": - conn = snowflake_connection_from_environment( - schema="GEO_REFERENCE", - client_session_keep_alive=True, # This can be a slow job! Keep the session alive - ) - load_state_footprints(conn) diff --git a/jobs/geo/load_building_footprints.py b/jobs/geo/load_building_footprints.py new file mode 100644 index 00000000..e69de29b diff --git a/jobs/geo/write_building_footprints.py b/jobs/geo/write_building_footprints.py new file mode 100644 index 00000000..c3db528a --- /dev/null +++ b/jobs/geo/write_building_footprints.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from jobs.utils.snowflake import snowflake_connection_from_environment + + +def write_building_footprints(conn): + """Grab Microsoft state building footprint data for California from Snowflake and write to an S3 bucket.""" + import geopandas + import shapely + + sql_transform = """ + alter session set GEOGRAPHY_OUTPUT_FORMAT='WKB'; + """ + conn.cursor().execute(sql_transform) + + sql_table = """ + SELECT * + FROM ANALYTICS_DEV.ANALYTICS.GEO_REFERENCE__BUILDING_FOOTPRINTS_WITH_BLOCKS + """ + + df = conn.cursor().execute(sql_table).fetch_pandas_all() + gdf = geopandas.GeoDataFrame( + df.assign(geometry=df.geometry.apply(shapely.wkb.loads)) + ) + + # write parquet and shape files for every single county locally + + counties = gdf.county_fips.unique() + + for x in counties: + gdf[gdf.county_fips == x].to_parquet(f"test_{x}.parquet") + gdf[gdf.county_fips == x].to_file( + f"test_{x}.shp" + ) # this currently outputs .cpg, .dbf, .shx files in addition to .shp, not sure if that is intended + + +if __name__ == "__main__": + conn = snowflake_connection_from_environment( + schema="GEO_REFERENCE", + client_session_keep_alive=True, # This can be a slow job! Keep the session alive + ) + write_building_footprints(conn) From 0ae97176a798d8fcc4e50949f72c33de85f45a74 Mon Sep 17 00:00:00 2001 From: britt <47340362+traintestbritt@users.noreply.github.com> Date: Wed, 23 Aug 2023 13:02:21 -0700 Subject: [PATCH 2/5] readding code that was mysteriously deleted --- .../geo_reference/load_building_footprints.py | 44 +++++++++++++++++++ jobs/geo/load_building_footprints.py | 29 ++++++++++++ 2 files changed, 73 insertions(+) diff --git a/airflow/dags/geo_reference/load_building_footprints.py b/airflow/dags/geo_reference/load_building_footprints.py index e69de29b..29c196cb 100644 --- a/airflow/dags/geo_reference/load_building_footprints.py +++ b/airflow/dags/geo_reference/load_building_footprints.py @@ -0,0 +1,44 @@ +"""Load building footprints to Snowflake.""" +from __future__ import annotations + +import os +from datetime import datetime + +from common.defaults import DEFAULT_ARGS + +from airflow.decorators import dag +from airflow.providers.amazon.aws.operators.batch import BatchOperator +from airflow.providers.amazon.aws.sensors.batch import BatchSensor + + +@dag( + description="Test DAG", + start_date=datetime(2023, 5, 23), + schedule_interval="@monthly", + default_args=DEFAULT_ARGS, + catchup=False, +) +def building_footprints_dag(): + """DAG for loading MS Building footprints dataset.""" + submit_batch_job = BatchOperator( + task_id="load_footprints", + job_name="california_building_footprints", + job_queue=os.environ["AIRFLOW__CUSTOM__DEFAULT_JOB_QUEUE"], + job_definition=os.environ["AIRFLOW__CUSTOM__DEFAULT_JOB_DEFINITION"], + overrides={ + "command": ["python", "-m", "jobs.geo.building_footprints"], + "resourceRequirements": [ + {"type": "VCPU", "value": "8"}, + {"type": "MEMORY", "value": "32768"}, + ], + }, + region_name="us-west-2", # TODO: can we make this unnecessary? + ) + _ = BatchSensor( + task_id="wait_for_batch_job", + job_id=submit_batch_job.output, + region_name="us-west-2", # TODO: can we make this unnecessary? + ) + + +run = building_footprints_dag() diff --git a/jobs/geo/load_building_footprints.py b/jobs/geo/load_building_footprints.py index e69de29b..264572c5 100644 --- a/jobs/geo/load_building_footprints.py +++ b/jobs/geo/load_building_footprints.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from jobs.utils.snowflake import gdf_to_snowflake, snowflake_connection_from_environment + + +def load_state_footprints(conn) -> None: + """Load Microsoft state building footprints dataset for California.""" + import geopandas + + print("Downloading data") + gdf = geopandas.read_file( + "https://usbuildingdata.blob.core.windows.net/usbuildings-v2/California.geojson.zip" + ) + + print("Writing data to snowflake") + gdf_to_snowflake( + gdf, + conn, + table_name="CALIFORNIA_BUILDING_FOOTPRINTS", + cluster=False, + ) + + +if __name__ == "__main__": + conn = snowflake_connection_from_environment( + schema="GEO_REFERENCE", + client_session_keep_alive=True, # This can be a slow job! Keep the session alive + ) + load_state_footprints(conn) From b99aacc4b6c6da5d7333ec68abe9ec8e8fd32b9b Mon Sep 17 00:00:00 2001 From: britt <47340362+traintestbritt@users.noreply.github.com> Date: Thu, 24 Aug 2023 13:01:54 -0700 Subject: [PATCH 3/5] refactored script --- jobs/geo/write_building_footprints.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/jobs/geo/write_building_footprints.py b/jobs/geo/write_building_footprints.py index c3db528a..a1e60767 100644 --- a/jobs/geo/write_building_footprints.py +++ b/jobs/geo/write_building_footprints.py @@ -5,13 +5,16 @@ def write_building_footprints(conn): """Grab Microsoft state building footprint data for California from Snowflake and write to an S3 bucket.""" + import os + from zipfile import ZipFile + import geopandas import shapely - sql_transform = """ + sql_alter = """ alter session set GEOGRAPHY_OUTPUT_FORMAT='WKB'; """ - conn.cursor().execute(sql_transform) + conn.cursor().execute(sql_alter) sql_table = """ SELECT * @@ -28,10 +31,23 @@ def write_building_footprints(conn): counties = gdf.county_fips.unique() for x in counties: - gdf[gdf.county_fips == x].to_parquet(f"test_{x}.parquet") + gdf[gdf.county_fips == x].to_parquet(f"county_fips_{x}.parquet") gdf[gdf.county_fips == x].to_file( - f"test_{x}.shp" - ) # this currently outputs .cpg, .dbf, .shx files in addition to .shp, not sure if that is intended + f"county_fips_{x}.shp", driver="ESRI Shapefile" + ) + + with ZipFile(f"county_fips_{x}_shape_files.zip", "w") as zipped: + zipped.write(f"county_fips_{x}.cpg") + os.remove(f"county_fips_{x}.cpg") + + zipped.write(f"county_fips_{x}.dbf") + os.remove(f"county_fips_{x}.dbf") + + zipped.write(f"county_fips_{x}.shp") + os.remove(f"county_fips_{x}.shp") + + zipped.write(f"county_fips_{x}.shx") + os.remove(f"county_fips_{x}.shx") if __name__ == "__main__": From b482f8883c9fff2d90a7de628e54dd60f39ece3c Mon Sep 17 00:00:00 2001 From: britt <47340362+traintestbritt@users.noreply.github.com> Date: Thu, 31 Aug 2023 13:35:22 -0700 Subject: [PATCH 4/5] working script! --- jobs/geo/write_building_footprints.py | 63 ++++++++++++++------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/jobs/geo/write_building_footprints.py b/jobs/geo/write_building_footprints.py index a1e60767..f8a03b18 100644 --- a/jobs/geo/write_building_footprints.py +++ b/jobs/geo/write_building_footprints.py @@ -1,13 +1,12 @@ from __future__ import annotations +import s3fs + from jobs.utils.snowflake import snowflake_connection_from_environment def write_building_footprints(conn): """Grab Microsoft state building footprint data for California from Snowflake and write to an S3 bucket.""" - import os - from zipfile import ZipFile - import geopandas import shapely @@ -16,38 +15,42 @@ def write_building_footprints(conn): """ conn.cursor().execute(sql_alter) - sql_table = """ - SELECT * + counties = """ + SELECT DISTINCT "county_fips" FROM ANALYTICS_DEV.ANALYTICS.GEO_REFERENCE__BUILDING_FOOTPRINTS_WITH_BLOCKS """ - df = conn.cursor().execute(sql_table).fetch_pandas_all() - gdf = geopandas.GeoDataFrame( - df.assign(geometry=df.geometry.apply(shapely.wkb.loads)) - ) - - # write parquet and shape files for every single county locally - - counties = gdf.county_fips.unique() - - for x in counties: - gdf[gdf.county_fips == x].to_parquet(f"county_fips_{x}.parquet") - gdf[gdf.county_fips == x].to_file( - f"county_fips_{x}.shp", driver="ESRI Shapefile" + counties = conn.cursor().execute(counties).fetchall() + counties = [x[0] for x in counties if x[0] is not None] + + for county in counties: + sql_table = f""" + SELECT * + FROM ANALYTICS_DEV.ANALYTICS.GEO_REFERENCE__BUILDING_FOOTPRINTS_WITH_BLOCKS + WHERE "county_fips" = {county} + """ + df = conn.cursor().execute(sql_table).fetch_pandas_all() + gdf = geopandas.GeoDataFrame( + df.assign(geometry=df.geometry.apply(shapely.wkb.loads)) ) + gdf.to_parquet(f"footprints_with_blocks_for_county_fips_{county}.parquet") + gdf.to_file( + f"footprints_with_blocks_for_county_fips_{county}.shp.zip", + driver="ESRI Shapefile", + ) + print(f"loading footprints_with_blocks_for_county_fips_{county}") - with ZipFile(f"county_fips_{x}_shape_files.zip", "w") as zipped: - zipped.write(f"county_fips_{x}.cpg") - os.remove(f"county_fips_{x}.cpg") - - zipped.write(f"county_fips_{x}.dbf") - os.remove(f"county_fips_{x}.dbf") - - zipped.write(f"county_fips_{x}.shp") - os.remove(f"county_fips_{x}.shp") - - zipped.write(f"county_fips_{x}.shx") - os.remove(f"county_fips_{x}.shx") + s3 = s3fs.S3FileSystem(anon=False) + s3.put( + f"footprints_with_blocks_for_county_fips_{county}.parquet", + "dof-demographics-dev-us-west-2-public/", + recursive=True, + ) + s3.put( + f"footprints_with_blocks_for_county_fips_{county}.shp.zip", + "dof-demographics-dev-us-west-2-public/", + recursive=True, + ) if __name__ == "__main__": From cfb7a09013569911d2d6d2e3e4a1f1e321ec3ea9 Mon Sep 17 00:00:00 2001 From: britt <47340362+traintestbritt@users.noreply.github.com> Date: Fri, 1 Sep 2023 11:40:22 -0700 Subject: [PATCH 5/5] refactored writing script, may be final --- jobs/geo/write_building_footprints.py | 36 ++++++++++++++++----------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/jobs/geo/write_building_footprints.py b/jobs/geo/write_building_footprints.py index f8a03b18..f81da578 100644 --- a/jobs/geo/write_building_footprints.py +++ b/jobs/geo/write_building_footprints.py @@ -1,13 +1,14 @@ from __future__ import annotations -import s3fs - from jobs.utils.snowflake import snowflake_connection_from_environment def write_building_footprints(conn): - """Grab Microsoft state building footprint data for California from Snowflake and write to an S3 bucket.""" + """Grab Microsoft building footprints data enriched with Census TIGER Blocks data for California from Snowflake and write to an S3 bucket.""" + import os + import geopandas + import s3fs import shapely sql_alter = """ @@ -18,12 +19,13 @@ def write_building_footprints(conn): counties = """ SELECT DISTINCT "county_fips" FROM ANALYTICS_DEV.ANALYTICS.GEO_REFERENCE__BUILDING_FOOTPRINTS_WITH_BLOCKS + ORDER BY 1 ASC """ counties = conn.cursor().execute(counties).fetchall() counties = [x[0] for x in counties if x[0] is not None] - for county in counties: + for index, county in enumerate(counties): sql_table = f""" SELECT * FROM ANALYTICS_DEV.ANALYTICS.GEO_REFERENCE__BUILDING_FOOTPRINTS_WITH_BLOCKS @@ -33,25 +35,29 @@ def write_building_footprints(conn): gdf = geopandas.GeoDataFrame( df.assign(geometry=df.geometry.apply(shapely.wkb.loads)) ) - gdf.to_parquet(f"footprints_with_blocks_for_county_fips_{county}.parquet") - gdf.to_file( - f"footprints_with_blocks_for_county_fips_{county}.shp.zip", - driver="ESRI Shapefile", + + gdf = gdf[gdf.geometry.geom_type != "GeometryCollection"] + + file_prefix = f"footprints_with_blocks_for_county_fips_{county}" + gdf.to_parquet(f"{file_prefix}.parquet") + gdf.to_file(f"{file_prefix}.shp.zip") + + print( + f"Loading {file_prefix}. This is number {index+1} out of {len(counties)} counties." ) - print(f"loading footprints_with_blocks_for_county_fips_{county}") s3 = s3fs.S3FileSystem(anon=False) s3.put( - f"footprints_with_blocks_for_county_fips_{county}.parquet", - "dof-demographics-dev-us-west-2-public/", - recursive=True, + f"{file_prefix}.parquet", + "s3://dof-demographics-dev-us-west-2-public/parquet/", ) s3.put( - f"footprints_with_blocks_for_county_fips_{county}.shp.zip", - "dof-demographics-dev-us-west-2-public/", - recursive=True, + f"{file_prefix}.shp.zip", "s3://dof-demographics-dev-us-west-2-public/shp/" ) + os.remove(f"{file_prefix}.parquet") + os.remove(f"{file_prefix}.shp.zip") + if __name__ == "__main__": conn = snowflake_connection_from_environment(