diff --git a/airflow/dags/geo_reference/load_building_footprints.py b/airflow/dags/geo_reference/load_building_footprints.py index 29c196cb..e5d9cc4c 100644 --- a/airflow/dags/geo_reference/load_building_footprints.py +++ b/airflow/dags/geo_reference/load_building_footprints.py @@ -26,7 +26,7 @@ def building_footprints_dag(): job_queue=os.environ["AIRFLOW__CUSTOM__DEFAULT_JOB_QUEUE"], job_definition=os.environ["AIRFLOW__CUSTOM__DEFAULT_JOB_DEFINITION"], overrides={ - "command": ["python", "-m", "jobs.geo.building_footprints"], + "command": ["python", "-m", "jobs.geo.load_building_footprints"], "resourceRequirements": [ {"type": "VCPU", "value": "8"}, {"type": "MEMORY", "value": "32768"}, diff --git a/jobs/geo/write_building_footprints.py b/jobs/geo/write_building_footprints.py index f81da578..f9b94e07 100644 --- a/jobs/geo/write_building_footprints.py +++ b/jobs/geo/write_building_footprints.py @@ -38,7 +38,7 @@ def write_building_footprints(conn): gdf = gdf[gdf.geometry.geom_type != "GeometryCollection"] - file_prefix = f"footprints_with_blocks_for_county_fips_{county}" + file_prefix = f"footprints_with_tiger_for_county_fips_{county}" gdf.to_parquet(f"{file_prefix}.parquet") gdf.to_file(f"{file_prefix}.shp.zip") diff --git a/transform/macros/_macros.yml b/transform/macros/_macros.yml new file mode 100644 index 00000000..8b4231aa --- /dev/null +++ b/transform/macros/_macros.yml @@ -0,0 +1,45 @@ +version: 2 + +macros: + - name: spatial_join_with_deduplication + description: | + Macro to perform a spatial join between two relations with deduplication of the + geometries in the left table. For all left geometries that satisfy the predicate + for more than one geometry in the right table, we compute their intersection and + then choose the right geometry with the greatest intersection. + arguments: + - name: left_model + type: string + description: The left model to join. Can be a relation or CTE. 
+ - name: right_model + type: string + description: The right model to join. Can be a relation or CTE. + - name: left_cols + type: list of strings + description: | + List of columns to keep from the left table + (excluding the geometry column, which is always retained) + - name: right_cols + type: list of strings + description: | + List of columns to keep from the right table + (excluding the geometry column, which is never retained). + Cannot share any names with left_cols + - name: left_geom + type: string + description: The name of the left geometry column, defaults to "geometry" + - name: right_geom + type: string + description: The name of the right geometry column, defaults to "geometry" + - name: op + description: | + The spatial predicate function to choose, + defaults to "st_intersects" + - name: kind + type: string + description: The kind of join, either "left" or "inner". Defaults to "left" + - name: prefix + type: string + description: | + An optional prefix to give to temporary CTEs to improve legibility and + avoid name collisions. diff --git a/transform/macros/map_class_fp.sql b/transform/macros/map_class_fp.sql index 7ab371dc..95690ab5 100644 --- a/transform/macros/map_class_fp.sql +++ b/transform/macros/map_class_fp.sql @@ -12,10 +12,10 @@ } -%} case - {% for k, v in class_fips_dict.items() %} + {% for k, v in class_fips_dict.items() -%} when "{{ class_fips }}" = '{{ k }}' then '{{ v }}' - {% endfor %} + {% endfor -%} end {%- endmacro %} diff --git a/transform/macros/spatial_join_with_deduplication.sql b/transform/macros/spatial_join_with_deduplication.sql new file mode 100644 index 00000000..80637b09 --- /dev/null +++ b/transform/macros/spatial_join_with_deduplication.sql @@ -0,0 +1,63 @@ +{# Macro to perform a spatial join between two relations with deduplication of the + geometries in the left table. 
For all left geometries that satisfy the predicate for + more than one geometry in the right table, we compute their intersection and then + choose the right geometry with the greatest intersection. +#} + +{% macro spatial_join_with_deduplication(left_model, right_model, left_cols, right_cols, left_geom="geometry", right_geom="geometry", op="st_intersects", kind="left", prefix="") %} + +with {{ prefix }}_left_model_with_id as ( + select + /* Generate a temporary ID for each row of the left model. We need this to group/partition + by unique left geometries further down. We could use a UUID, but integers are cheaper to + generate and compare. NOTE(review): seq4() wraps around at 4 bytes; confirm row counts fit. */ + *, seq4() as _tmp_sjoin_id + from {{ left_model }} +), + +{{ prefix }}_joined as ( + select + {% for lcol in left_cols -%} + {{ prefix }}_left_model_with_id.{{ lcol }}, + {% endfor -%} + {% for rcol in right_cols -%} + {{ right_model }}.{{ rcol }}, + {% endfor -%} + {{ prefix }}_left_model_with_id.{{ left_geom }}, + /* We don't actually need the intersection for every geometry, only for the + ones that intersect more than one. However, in order to establish which + ones intersect more than one, we need a windowed COUNT partitioned by + _tmp_sjoin_id. This is an expensive operation, as it likely triggers a shuffle + (even though it should already be sorted by _tmp_sjoin_id). In testing we've found + that it's cheaper to just do the intersection for all the geometries. */ + st_area( + st_intersection({{ prefix }}_left_model_with_id.{{ left_geom }}, {{ right_model }}.{{ right_geom }}) + ) as _tmp_sjoin_intersection, + {{ prefix }}_left_model_with_id._tmp_sjoin_id + from {{ prefix }}_left_model_with_id + {{ kind }} join {{ right_model }} + on {{ op }}({{ prefix }}_left_model_with_id.{{ left_geom }}, {{ right_model }}.{{ right_geom }}) +), + +{{ prefix }}_deduplicated as ( + select + -- Snowflake doesn't support geometries in max_by. It should, but it doesn't. 
+ -- Fortunately, we know that the geometries are identical when partitioned + -- by _tmp_sjoin_id, so we can just choose any_value. + any_value({{ left_geom }}) as {{ left_geom }}, + {% for lcol in left_cols -%} + -- max_by returns null if every ordering value in a group is null. So if we have a left + -- join, we need to guard against nulls with a coalesce to return the single value + max_by({{ lcol }}, coalesce(_tmp_sjoin_intersection, 1.0)) as {{ lcol }}, + {% endfor -%} + {% for rcol in right_cols -%} + -- max_by returns null if every ordering value in a group is null. So if we have a left + -- join, we need to guard against nulls with a coalesce to return the single value + max_by({{ rcol }}, coalesce(_tmp_sjoin_intersection, 1.0)) as {{ rcol }}{{ "," if not loop.last }} + {% endfor -%} + from {{ prefix }}_joined + group by _tmp_sjoin_id +) + +select * from {{ prefix }}_deduplicated +{%- endmacro -%} diff --git a/transform/models/marts/geo_reference/_geo_reference__models.yml b/transform/models/marts/geo_reference/_geo_reference__models.yml index e3e33a5c..bfe72efd 100644 --- a/transform/models/marts/geo_reference/_geo_reference__models.yml +++ b/transform/models/marts/geo_reference/_geo_reference__models.yml @@ -14,10 +14,10 @@ sources: - name: places models: - - name: geo_reference__building_footprints_with_blocks + - name: geo_reference__building_footprints_with_tiger description: | - This data table is a join of the TIGER shapefile, Blocks, - with the Microsoft Building Footprints data for the state of CA. + This data table is a join of the TIGER data for blocks, tracts, counties, and + places with the Microsoft Building Footprints data for the state of CA. 
columns: - name: release description: The version of the data @@ -29,39 +29,19 @@ models: description: 2020 Census tract code - name: block description: 2020 Census tabulation block number - - name: geoid + - name: block_geoid description: > Census block identifier; a concatenation of 2020 Census state FIPS code, 2020 Census county FIPS code, 2020 Census tract code, and 2020 Census block number - - name: name - description: > - 2020 Census tabulation block name; a concatenation of 'Block' - and the tabulation block number - - name: geometry - description: The spatial component of geographic features - tests: - - dbt_utils.equal_rowcount: - compare_model: source('building_footprints', 'california_building_footprints') - - name: geo_reference__building_footprints_with_places - description: | - This data table is a join of the TIGER shapefile - , Places, with the Microsoft Building Footprints - data for the state of CA. - columns: - - name: release - description: The version of the data - - name: capture_dates_range - description: > - Each building footprint has a capture date tag associated from 2019-2020 - - name: place_fp + - name: place_fips description: Current place FIPS code - name: place_ns description: Current place GNIS code - - name: geoid + - name: place_geoid description: > Place identifier; a concatenation of the current state FIPS code and place FIPS code - - name: name + - name: place_name description: > Current name and the translated legal/statistical area description for place @@ -70,7 +50,4 @@ models: - name: class_fips description: Current FIPS class definition - name: geometry - description: The spatial component of geographic features - tests: - - dbt_utils.equal_rowcount: - compare_model: source('building_footprints', 'california_building_footprints') + description: The footprint geometry diff --git a/transform/models/marts/geo_reference/geo_reference__building_footprints_with_blocks.sql 
b/transform/models/marts/geo_reference/geo_reference__building_footprints_with_blocks.sql deleted file mode 100644 index 999b556f..00000000 --- a/transform/models/marts/geo_reference/geo_reference__building_footprints_with_blocks.sql +++ /dev/null @@ -1,59 +0,0 @@ -with footprints as ( - select - "release", - "capture_dates_range", - "geometry", - /* Generate a temporary ID for footprints. We will need this to group/partition - by unique footprints further down. We could use a UUID, but integers are - cheaper to generate and compare. */ - seq4() as _tmp_id - from {{ source('building_footprints', 'california_building_footprints') }} -), - -blocks_source as ( - select * from {{ source('tiger_2022', 'blocks') }} -), - -blocks as ( - select - "COUNTYFP20" as "county_fips", - "TRACTCE20" as "tract", - "BLOCKCE20" as "block", - "GEOID20" as "geoid", - "NAME20" as "name", - "geometry" - from blocks_source -), - -footprints_and_blocks_joined as ( - select - footprints.*, - blocks.* exclude "geometry", - /* We don't actually need the intersection for every footprint, only for the - ones that intersect more than one block. However, in order to establish which - ones intersect more than one block, we need a windowed COUNT partitioned by - _tmp_id. This is an expensive operation, as it likely triggers a shuffle - (even though it should already be sorted by _tmp_id). In testing we've found - that it's cheaper to just do the intersection for all the footprints. */ - st_area(st_intersection(footprints."geometry", blocks."geometry")) - as _tmp_intersection - from footprints - left join blocks on st_intersects(footprints."geometry", blocks."geometry") -), - -footprints_and_blocks_joined_dedupe as ( - select - -- Snowflake doesn't support geometries in max_by. It should, but it doesn't. - -- Fortunately, we know that the geometries are identical when partitioned - -- by _tmp_id, so we can just choose any_value. 
- any_value("geometry") as "geometry", - max_by("county_fips", _tmp_intersection) as "county_fips", - max_by("tract", _tmp_intersection) as "tract", - max_by("block", _tmp_intersection) as "block", - max_by("geoid", _tmp_intersection) as "geoid", - max_by("name", _tmp_intersection) as "name" - from footprints_and_blocks_joined - group by _tmp_id -) - -select * from footprints_and_blocks_joined_dedupe diff --git a/transform/models/marts/geo_reference/geo_reference__building_footprints_with_places.sql b/transform/models/marts/geo_reference/geo_reference__building_footprints_with_places.sql deleted file mode 100644 index 87ea1b58..00000000 --- a/transform/models/marts/geo_reference/geo_reference__building_footprints_with_places.sql +++ /dev/null @@ -1,61 +0,0 @@ -with footprints as ( - select - "release", - "capture_dates_range", - "geometry", - /* Generate a temporary ID for footprints. We will need this to group/partition - by unique footprints further down. We could use a UUID, but integers are - cheaper to generate and compare. */ - seq4() as _tmp_id - from {{ source('building_footprints', 'california_building_footprints') }} -), - -places_source as ( - select * from {{ source('tiger_2022', 'places') }} -), - -places as ( - select - "PLACEFP" as "place_fp", - "PLACENS" as "place_ns", - "GEOID" as "geoid", - "NAME" as "name", - "CLASSFP" as "class_fips_code", - {{ map_class_fips("CLASSFP") }} as "class_fips", - "geometry" - from places_source -), - -footprints_and_places_joined as ( - select - footprints.*, - places.* exclude "geometry", - /* We don't actually need the intersection for every footprint, only for the - ones that intersect more than one place. However, in order to establish which - ones intersect more than one place, we need a windowed COUNT partitioned by - _tmp_id. This is an expensive operation, as it likely triggers a shuffle - (even though it should already be sorted by _tmp_id). 
In testing we've found - that it's cheaper to just do the intersection for all the footprints. */ - st_area(st_intersection(footprints."geometry", places."geometry")) - as _tmp_intersection - from footprints - left join places on st_intersects(footprints."geometry", places."geometry") -), - -footprints_and_places_joined_dedupe as ( - select - -- Snowflake doesn't support geometries in max_by. It should, but it doesn't. - -- Fortunately, we know that the geometries are identical when partitioned - -- by _tmp_id, so we can just choose any_value. - any_value("geometry") as "geometry", - max_by("place_fp", _tmp_intersection) as "place_fp", - max_by("place_ns", _tmp_intersection) as "place_ns", - max_by("name", _tmp_intersection) as "name", - max_by("geoid", _tmp_intersection) as "geoid", - max_by("class_fips_code", _tmp_intersection) as "class_fips_code", - max_by("class_fips", _tmp_intersection) as "class_fips" - from footprints_and_places_joined - group by _tmp_id -) - -select * from footprints_and_places_joined_dedupe diff --git a/transform/models/marts/geo_reference/geo_reference__building_footprints_with_tiger.sql b/transform/models/marts/geo_reference/geo_reference__building_footprints_with_tiger.sql new file mode 100644 index 00000000..85414881 --- /dev/null +++ b/transform/models/marts/geo_reference/geo_reference__building_footprints_with_tiger.sql @@ -0,0 +1,66 @@ +with footprints as ( + select + "release", + "capture_dates_range", + "geometry" + from {{ source('building_footprints', 'california_building_footprints') }} +), + +blocks_source as ( + select * + from {{ source('tiger_2022', 'blocks') }} +), + +places_source as ( + select * from {{ source('tiger_2022', 'places') }} +), + +blocks as ( + select + "COUNTYFP20" as "county_fips", + "TRACTCE20" as "tract", + "BLOCKCE20" as "block", + "GEOID20" as "block_geoid", + "geometry" + from blocks_source +), + +places as ( + select + "PLACEFP" as "place_fips", + "PLACENS" as "place_ns", + "GEOID" as "place_geoid", 
+ "NAME" as "place_name", + "CLASSFP" as "class_fips_code", + {{ map_class_fips("CLASSFP") }} as "class_fips", + "geometry" + from places_source +), + +footprints_with_blocks as ( /* inner join: footprints that intersect no block are dropped */ + {{ spatial_join_with_deduplication( + "footprints", + "blocks", + ['"release"', '"capture_dates_range"'], + ['"county_fips"', '"tract"', '"block"', '"block_geoid"'], + left_geom='"geometry"', + right_geom='"geometry"', + kind="inner", + prefix="b", + ) }} +), + +footprints_with_blocks_and_places as ( /* left join: footprints outside every place are kept with null place columns */ + {{ spatial_join_with_deduplication( + "footprints_with_blocks", + "places", + ['"release"', '"capture_dates_range"', '"county_fips"', '"tract"', '"block"', '"block_geoid"'], + ['"place_fips"', '"place_ns"', '"place_geoid"', '"place_name"', '"class_fips_code"', '"class_fips"'], + left_geom='"geometry"', + right_geom='"geometry"', + kind="left", + prefix="p", + ) }} +) + +select * from footprints_with_blocks_and_places