Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor geospatial dedupe #190

Merged
merged 11 commits into from
Sep 11, 2023
Merged
2 changes: 1 addition & 1 deletion airflow/dags/geo_reference/load_building_footprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def building_footprints_dag():
job_queue=os.environ["AIRFLOW__CUSTOM__DEFAULT_JOB_QUEUE"],
job_definition=os.environ["AIRFLOW__CUSTOM__DEFAULT_JOB_DEFINITION"],
overrides={
"command": ["python", "-m", "jobs.geo.building_footprints"],
"command": ["python", "-m", "jobs.geo.load_building_footprints"],
"resourceRequirements": [
{"type": "VCPU", "value": "8"},
{"type": "MEMORY", "value": "32768"},
Expand Down
2 changes: 1 addition & 1 deletion jobs/geo/write_building_footprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def write_building_footprints(conn):

gdf = gdf[gdf.geometry.geom_type != "GeometryCollection"]

file_prefix = f"footprints_with_blocks_for_county_fips_{county}"
file_prefix = f"footprints_with_tiger_for_county_fips_{county}"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about this name, since the files now include county, tract, block, and place data?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense to me as a name change!

gdf.to_parquet(f"{file_prefix}.parquet")
gdf.to_file(f"{file_prefix}.shp.zip")

Expand Down
45 changes: 45 additions & 0 deletions transform/macros/_macros.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
version: 2

macros:
- name: spatial_join_with_deduplication
description: |
Macro to perform a spatial join between two relations with deduplication of the
geometries in the left table. For all left geometries that satisfy the predicate
for more than one geometry in the right table, we compute their intersection and
then choose the left geometry with the greatest intersection.
arguments:
- name: left_model
type: string
description: The left model to join. Can be a relation or CTE.
- name: right_model
type: string
description: The right model to join. Can be a relation or CTE.
- name: left_cols
type: list of strings
description: |
List columns to keep from the left table
(excluding the geometry column, which is always retained)
- name: right_cols
type: list of strings
description: |
List of columns to keep from the right table
(excluding the geometry column, which is never retained).
Cannot share any names with left_cols
- name: left_geom
type: string
description: The name of the left geometry column, defaults to "geometry"
- name: right_geom
type: string
description: The name of the right geometry column, defaults to "geometry"
- name: op
description: |
The spatial predicate function to choose,
defaults to "st_intersects"
- name: kind
type: string
description: The kind of join, either "left" or "inner". Defaults to "left"
- name: prefix
type: string
description: |
An optional prefix to give to temporary CTEs to improve legibility and
avoid name collisions.
4 changes: 2 additions & 2 deletions transform/macros/map_class_fp.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
} -%}

case
{% for k, v in class_fips_dict.items() %}
{% for k, v in class_fips_dict.items() -%}
ian-r-rose marked this conversation as resolved.
Show resolved Hide resolved
when "{{ class_fips }}" = '{{ k }}'
then '{{ v }}'
{% endfor %}
{% endfor -%}
end

{%- endmacro %}
63 changes: 63 additions & 0 deletions transform/macros/spatial_join_with_deduplication.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{# Macro to perform a spatial join between two relations with deduplication of the
geometries in the left table. For all left geometries that satisfy the predicate for
more than one geometry in the right table, we compute their intersection and then
choose the left geometry with the greatest intersection.
#}

{% macro spatial_join_with_deduplication(left_model, right_model, left_cols, right_cols, left_geom="geometry", right_geom="geometry", op="st_intersects", kind="left", prefix="") %}

with {{ prefix }}_left_model_with_id as (
select
/* Generate a temporary ID for footprints. We will need this to group/partition
by unique footprints further down. We could use a UUID, but integers are
cheaper to generate and compare. */
*, seq4() as _tmp_sjoin_id
from {{ left_model }}
),

{{ prefix }}_joined as (
select
{% for lcol in left_cols -%}
{{ prefix }}_left_model_with_id.{{ lcol }},
{% endfor -%}
{% for rcol in right_cols -%}
{{ right_model }}.{{ rcol }},
{% endfor -%}
{{ prefix }}_left_model_with_id.{{ left_geom }},
/* We don't actually need the intersection for every geometry, only for the
ones that intersect more than one. However, in order to establish which
ones intersect more than one, we need a windowed COUNT partitioned by
_tmp_sjoin_id. This is an expensive operation, as it likely triggers a shuffle
(even though it should already be sorted by _tmp_id). In testing we've found
that it's cheaper to just do the intersection for all the geometries. */
st_area(
st_intersection({{ prefix }}_left_model_with_id.{{ left_geom }}, {{ right_model }}.{{ right_geom }})
) as _tmp_sjoin_intersection,
{{ prefix }}_left_model_with_id._tmp_sjoin_id
from {{ prefix }}_left_model_with_id
{{ kind }} join {{ right_model }}
on {{ op }}({{ prefix }}_left_model_with_id.{{ left_geom }}, {{ right_model }}.{{ right_geom }})
),

{{ prefix }}_deduplicated as (
select
-- Snowflake doesn't support geometries in max_by. It should, but it doesn't.
-- Fortunately, we know that the geometries are identical when partitioned
-- by _tmp_sjoin_id, so we can just choose any_value.
any_value({{ left_geom }}) as {{ left_geom }},
{% for lcol in left_cols -%}
-- max_by returns null if all the values in a group are null. So if we have a left
-- join, we need to guard against nulls with a coalesce to return the single value
max_by({{ lcol }}, coalesce(_tmp_sjoin_intersection, 1.0)) as {{ lcol }},
{% endfor -%}
{% for rcol in right_cols -%}
-- max_by returns null if all the values in a group are null. So if we have a left
-- join, we need to guard against nulls with a coalesce to return the single value
max_by({{ rcol }}, coalesce(_tmp_sjoin_intersection, 1.0)) as {{ rcol }}{{ "," if not loop.last }}
{% endfor -%}
from {{ prefix }}_joined
group by _tmp_sjoin_id
)

select * from {{ prefix }}_deduplicated
{%- endmacro -%}
39 changes: 8 additions & 31 deletions transform/models/marts/geo_reference/_geo_reference__models.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ sources:
- name: places

models:
- name: geo_reference__building_footprints_with_blocks
- name: geo_reference__building_footprints_with_tiger
description: |
This data table is a join of the TIGER shapefile, Blocks,
with the Microsoft Building Footprints data for the state of CA.
This data table is a join of the TIGER data for blocks, tracts, counties, and
places with the Microsoft Building Footprints data for the state of CA.
columns:
- name: release
description: The version of the data
Expand All @@ -29,39 +29,19 @@ models:
description: 2020 Census tract code
- name: block
description: 2020 Census tabulation block number
- name: geoid
- name: block_geoid
description: >
Census block identifier; a concatenation of 2020 Census state FIPS code, 2020
Census county FIPS code, 2020 Census tract code, and 2020 Census block number
- name: name
description: >
2020 Census tabulation block name; a concatenation of 'Block'
and the tabulation block number
- name: geometry
description: The spatial component of geographic features
tests:
- dbt_utils.equal_rowcount:
compare_model: source('building_footprints', 'california_building_footprints')
Comment on lines -42 to -44
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A shame to lose this test, but with the inner join, the number of footprints is no longer conserved! One approach to restore it would be to do the "in-California" filtering in an intermediate model, then compare the row-count with that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can make that a to do!

- name: geo_reference__building_footprints_with_places
description: |
This data table is a join of the TIGER shapefile
, Places, with the Microsoft Building Footprints
data for the state of CA.
columns:
- name: release
description: The version of the data
- name: capture_dates_range
description: >
Each building footprint has a capture date tag associated from 2019-2020
- name: place_fp
- name: place_fips
description: Current place FIPS code
- name: place_ns
description: Current place GNIS code
- name: geoid
- name: place_geoid
description: >
Place identifier; a concatenation of the current state
FIPS code and place FIPS code
- name: name
- name: place_name
description: >
Current name and the translated legal/statistical
area description for place
Expand All @@ -70,7 +50,4 @@ models:
- name: class_fips
description: Current FIPS class definition
- name: geometry
description: The spatial component of geographic features
tests:
- dbt_utils.equal_rowcount:
compare_model: source('building_footprints', 'california_building_footprints')
description: The footprint geometry

This file was deleted.

This file was deleted.

ian-r-rose marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
with footprints as (
select
"release",
"capture_dates_range",
"geometry"
from {{ source('building_footprints', 'california_building_footprints') }}
),

blocks_source as (
select *
from {{ source('tiger_2022', 'blocks') }}
),

places_source as (
select * from {{ source('tiger_2022', 'places') }}
),

blocks as (
select
"COUNTYFP20" as "county_fips",
"TRACTCE20" as "tract",
"BLOCKCE20" as "block",
"GEOID20" as "block_geoid",
"geometry"
from blocks_source
),

places as (
select
"PLACEFP" as "place_fips",
"PLACENS" as "place_ns",
"GEOID" as "place_geoid",
"NAME" as "place_name",
"CLASSFP" as "class_fips_code",
{{ map_class_fips("CLASSFP") }} as "class_fips",
"geometry"
from places_source
),

footprints_with_blocks as (
{{ spatial_join_with_deduplication(
"footprints",
"blocks",
['"release"', '"capture_dates_range"'],
['"county_fips"', '"tract"', '"block"', '"block_geoid"'],
left_geom='"geometry"',
right_geom='"geometry"',
kind="inner",
ian-r-rose marked this conversation as resolved.
Show resolved Hide resolved
prefix="b",
) }}
),

footprints_with_blocks_and_places as (
{{ spatial_join_with_deduplication(
"footprints_with_blocks",
"places",
['"release"', '"capture_dates_range"', '"county_fips"', '"tract"', '"block"', '"block_geoid"'],
['"place_fips"', '"place_ns"', '"place_geoid"', '"place_name"', '"class_fips_code"', '"class_fips"'],
left_geom='"geometry"',
right_geom='"geometry"',
kind="left",
prefix="p",
) }}
)

select * from footprints_with_blocks_and_places