diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 3d23682769..101685cec6 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -716,8 +716,8 @@ def _get_entity_df_event_timestamp_range( MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """ /* - Compute a deterministic hash for the `left_table_query_string` that will be used throughout - all the logic as the field to GROUP BY the data + 0. Compute a deterministic hash for the `left_table_query_string` that will be used throughout + all the logic as the field to GROUP BY the data. */ WITH "entity_dataframe" AS ( SELECT *, @@ -739,6 +739,10 @@ def _get_entity_df_event_timestamp_range( {% for featureview in featureviews %} +/* + 1. Only select the required columns with entities of the featureview. +*/ + "{{ featureview.name }}__entity_dataframe" AS ( SELECT {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} @@ -752,20 +756,7 @@ def _get_entity_df_event_timestamp_range( ), /* - This query template performs the point-in-time correctness join for a single feature set table - to the provided entity table. - - 1. We first join the current feature_view to the entity dataframe that has been passed. - This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `timestamp_field` - is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` - is higher the the one provided minus the TTL - - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been - computed previously - - The output of this CTE will contain all the necessary information and already filtered out most - of the data that is not relevant. +2. 
Use subquery to prepare event_timestamp, created_timestamp, entity columns and feature columns. */ "{{ featureview.name }}__subquery" AS ( @@ -777,94 +768,61 @@ def _get_entity_df_event_timestamp_range( "{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} - WHERE "{{ featureview.timestamp_field }}" <= '{{ featureview.max_event_timestamp }}' - {% if featureview.ttl == 0 %}{% else %} - AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}' - {% endif %} -), - -"{{ featureview.name }}__base" AS ( - SELECT - "subquery".*, - "entity_dataframe"."entity_timestamp", - "entity_dataframe"."{{featureview.name}}__entity_row_unique_id" - FROM "{{ featureview.name }}__subquery" AS "subquery" - INNER JOIN "{{ featureview.name }}__entity_dataframe" AS "entity_dataframe" - ON TRUE - AND "subquery"."event_timestamp" <= "entity_dataframe"."entity_timestamp" - - {% if featureview.ttl == 0 %}{% else %} - AND "subquery"."event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_dataframe"."entity_timestamp") - {% endif %} - - {% for entity in featureview.entities %} - AND "subquery"."{{ entity }}" = "entity_dataframe"."{{ entity }}" - {% endfor %} ), /* - 2. If the `created_timestamp_column` has been set, we need to - deduplicate the data first. This is done by calculating the - `MAX(created_at_timestamp)` for each event_timestamp. - We then join the data on the next CTE +3. If the `created_timestamp_column` has been set, we need to +deduplicate the data first. This is done by calculating the +`MAX(created_at_timestamp)` for each event_timestamp and joining back on the subquery. 
+Otherwise, the ASOF JOIN may non-deterministically return any of the rows that tie on "event_timestamp", see +https://docs.snowflake.com/en/sql-reference/constructs/asof-join#expected-behavior-when-ties-exist-in-the-right-table */ + {% if featureview.created_timestamp_column %} "{{ featureview.name }}__dedup" AS ( - SELECT - "{{featureview.name}}__entity_row_unique_id", - "event_timestamp", - MAX("created_timestamp") AS "created_timestamp" - FROM "{{ featureview.name }}__base" - GROUP BY "{{featureview.name}}__entity_row_unique_id", "event_timestamp" + SELECT * + FROM "{{ featureview.name }}__subquery" + INNER JOIN ( + SELECT + {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} + "event_timestamp", + MAX("created_timestamp") AS "created_timestamp" + FROM "{{ featureview.name }}__subquery" + GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp" + ) + USING({{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp", "created_timestamp") ), {% endif %} /* - 3. The data has been filtered during the first CTE "*__base" - Thus we only need to compute the latest timestamp of each feature. +4. Make ASOF JOIN of deduplicated feature CTE on reduced entity dataframe. 
*/ -"{{ featureview.name }}__latest" AS ( + +"{{ featureview.name }}__asof_join" AS ( SELECT - "event_timestamp", - {% if featureview.created_timestamp_column %}"created_timestamp",{% endif %} - "{{featureview.name}}__entity_row_unique_id" - FROM - ( - SELECT *, - ROW_NUMBER() OVER( - PARTITION BY "{{featureview.name}}__entity_row_unique_id" - ORDER BY "event_timestamp" DESC{% if featureview.created_timestamp_column %},"created_timestamp" DESC{% endif %} - ) AS "row_number" - FROM "{{ featureview.name }}__base" - {% if featureview.created_timestamp_column %} - INNER JOIN "{{ featureview.name }}__dedup" - USING ("{{featureview.name}}__entity_row_unique_id", "event_timestamp", "created_timestamp") - {% endif %} - ) - WHERE "row_number" = 1 + e.*, + v.* + FROM "{{ featureview.name }}__entity_dataframe" e + ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}"{{ featureview.name }}__subquery"{% endif %} v + MATCH_CONDITION (e."entity_timestamp" >= v."event_timestamp") + {% if featureview.entities %} USING({{ featureview.entities | map('tojson') | join(', ')}}) {% endif %} ), /* - 4. Once we know the latest value of each feature for a given timestamp, - we can join again the data back to the original "base" dataset +5. If a TTL is configured, filter the CTE to remove rows whose feature values are older than the configured TTL. 
*/ -"{{ featureview.name }}__cleaned" AS ( - SELECT "base".* - FROM "{{ featureview.name }}__base" AS "base" - INNER JOIN "{{ featureview.name }}__latest" - USING( - "{{featureview.name}}__entity_row_unique_id", - "event_timestamp" - {% if featureview.created_timestamp_column %} - ,"created_timestamp" - {% endif %} - ) -){% if loop.last %}{% else %}, {% endif %} +"{{ featureview.name }}__ttl" AS ( + SELECT * + FROM "{{ featureview.name }}__asof_join" + {% if featureview.ttl == 0 %}{% else %} + WHERE "event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_timestamp") + {% endif %} +){% if loop.last %}{% else %}, {% endif %} {% endfor %} /* - Joins the outputs of multiple time travel joins to a single table. + Join the outputs of multiple time travel joins to a single table. The entity_dataframe dataset being our source of truth here. */ @@ -877,7 +835,7 @@ def _get_entity_df_event_timestamp_range( {% for feature in featureview.features %} ,{% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %} {% endfor %} - FROM "{{ featureview.name }}__cleaned" -) "{{ featureview.name }}__cleaned" USING ("{{featureview.name}}__entity_row_unique_id") + FROM "{{ featureview.name }}__ttl" +) "{{ featureview.name }}__ttl" USING ("{{featureview.name}}__entity_row_unique_id") {% endfor %} """