From da7b74a209328914111961caf8f90efcb2862e31 Mon Sep 17 00:00:00 2001 From: hkuepers Date: Fri, 13 Dec 2024 12:38:23 +0100 Subject: [PATCH 1/6] Use ASOF JOIN in Snowflake offline store query Signed-off-by: hkuepers --- .../feast/infra/offline_stores/snowflake.py | 129 +++++------------- 1 file changed, 35 insertions(+), 94 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 3d23682769..62300b42c8 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -716,8 +716,8 @@ def _get_entity_df_event_timestamp_range( MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """ /* - Compute a deterministic hash for the `left_table_query_string` that will be used throughout - all the logic as the field to GROUP BY the data + 0. Compute a deterministic hash for the `left_table_query_string` that will be used throughout + all the logic as the field to GROUP BY the data. */ WITH "entity_dataframe" AS ( SELECT *, @@ -739,6 +739,10 @@ def _get_entity_df_event_timestamp_range( {% for featureview in featureviews %} +/* + 1. Only select the required columns with entities of the featureview. +*/ + "{{ featureview.name }}__entity_dataframe" AS ( SELECT {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} @@ -751,120 +755,57 @@ def _get_entity_df_event_timestamp_range( "{{featureview.name}}__entity_row_unique_id" ), -/* - This query template performs the point-in-time correctness join for a single feature set table - to the provided entity table. - - 1. We first join the current feature_view to the entity dataframe that has been passed. 
- This JOIN has the following logic: - - For each row of the entity dataframe, only keep the rows where the `timestamp_field` - is less than the one provided in the entity dataframe - - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field` - is higher the the one provided minus the TTL - - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been - computed previously - - The output of this CTE will contain all the necessary information and already filtered out most - of the data that is not relevant. -*/ - -"{{ featureview.name }}__subquery" AS ( - SELECT - "{{ featureview.timestamp_field }}" as "event_timestamp", - {{'"' ~ featureview.created_timestamp_column ~ '" as "created_timestamp",' if featureview.created_timestamp_column else '' }} - {{featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} - {% for feature in featureview.features %} - "{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %} - {% endfor %} - FROM {{ featureview.table_subquery }} - WHERE "{{ featureview.timestamp_field }}" <= '{{ featureview.max_event_timestamp }}' - {% if featureview.ttl == 0 %}{% else %} - AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}' - {% endif %} -), - -"{{ featureview.name }}__base" AS ( - SELECT - "subquery".*, - "entity_dataframe"."entity_timestamp", - "entity_dataframe"."{{featureview.name}}__entity_row_unique_id" - FROM "{{ featureview.name }}__subquery" AS "subquery" - INNER JOIN "{{ featureview.name }}__entity_dataframe" AS "entity_dataframe" - ON TRUE - AND "subquery"."event_timestamp" <= "entity_dataframe"."entity_timestamp" - - {% if featureview.ttl == 0 %}{% else %} - AND "subquery"."event_timestamp" >= 
TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_dataframe"."entity_timestamp") - {% endif %} - - {% for entity in featureview.entities %} - AND "subquery"."{{ entity }}" = "entity_dataframe"."{{ entity }}" - {% endfor %} -), - /* 2. If the `created_timestamp_column` has been set, we need to deduplicate the data first. This is done by calculating the `MAX(created_at_timestamp)` for each event_timestamp. - We then join the data on the next CTE + Otherwise, the ASOF JOIN can have unstable side effects + https://docs.snowflake.com/en/sql-reference/constructs/asof-join#expected-behavior-when-ties-exist-in-the-right-table */ + {% if featureview.created_timestamp_column %} "{{ featureview.name }}__dedup" AS ( SELECT - "{{featureview.name}}__entity_row_unique_id", + {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}, "event_timestamp", MAX("created_timestamp") AS "created_timestamp" - FROM "{{ featureview.name }}__base" - GROUP BY "{{featureview.name}}__entity_row_unique_id", "event_timestamp" + FROM "{{ featureview.table_subquery }}" + GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}, "event_timestamp" ), {% endif %} /* - 3. The data has been filtered during the first CTE "*__base" - Thus we only need to compute the latest timestamp of each feature. +3. Make ASOF JOIN of deduplicated feature CTE on reduced entity dataframe. 
*/ -"{{ featureview.name }}__latest" AS ( + +"{{ featureview.name }}__asof_join" AS ( SELECT - "event_timestamp", - {% if featureview.created_timestamp_column %}"created_timestamp",{% endif %} - "{{featureview.name}}__entity_row_unique_id" - FROM - ( - SELECT *, - ROW_NUMBER() OVER( - PARTITION BY "{{featureview.name}}__entity_row_unique_id" - ORDER BY "event_timestamp" DESC{% if featureview.created_timestamp_column %},"created_timestamp" DESC{% endif %} - ) AS "row_number" - FROM "{{ featureview.name }}__base" - {% if featureview.created_timestamp_column %} - INNER JOIN "{{ featureview.name }}__dedup" - USING ("{{featureview.name}}__entity_row_unique_id", "event_timestamp", "created_timestamp") - {% endif %} - ) - WHERE "row_number" = 1 + e.*, + {% for feature in featureview.features %} + v."{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %} + {% endfor %}, + v."{{ featureview.timestamp_field }}" + FROM "{{ featureview.name }}__entity_dataframe" e + ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}{{ featureview.table_subquery }}{% endif %} v + MATCH_CONDITION (e."entity_timestamp" >= v."{{ featureview.timestamp_field }}") + USING({% for entity in featureview.entities %}{% if not loop.first %},{% endif %}"{{ entity }}"{% endfor %}) ), /* - 4. Once we know the latest value of each feature for a given timestamp, - we can join again the data back to the original "base" dataset +4. If TTL is configured filter the CTE to remove rows where the feature values are older than the configured ttl. 
*/ -"{{ featureview.name }}__cleaned" AS ( - SELECT "base".* - FROM "{{ featureview.name }}__base" AS "base" - INNER JOIN "{{ featureview.name }}__latest" - USING( - "{{featureview.name}}__entity_row_unique_id", - "event_timestamp" - {% if featureview.created_timestamp_column %} - ,"created_timestamp" - {% endif %} - ) -){% if loop.last %}{% else %}, {% endif %} +"{{ featureview.name }}__ttl" AS ( + SELECT * + FROM "{{ featureview.name }}__asof_join" + {% if featureview.ttl == 0 %}{% else %} + WHERE "{{ featureview.timestamp_field }}" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_timestamp") + {% endif %} +){% if loop.last %}{% else %}, {% endif %} {% endfor %} /* - Joins the outputs of multiple time travel joins to a single table. + Join the outputs of multiple time travel joins to a single table. The entity_dataframe dataset being our source of truth here. */ @@ -877,7 +818,7 @@ def _get_entity_df_event_timestamp_range( {% for feature in featureview.features %} ,{% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %} {% endfor %} - FROM "{{ featureview.name }}__cleaned" -) "{{ featureview.name }}__cleaned" USING ("{{featureview.name}}__entity_row_unique_id") + FROM "{{ featureview.name }}__ttl" +) "{{ featureview.name }}__ttl" USING ("{{featureview.name}}__entity_row_unique_id") {% endfor %} """ From 4b48655c1162f3fb20d7899a328c01039f737311 Mon Sep 17 00:00:00 2001 From: hkuepers Date: Thu, 19 Dec 2024 10:09:57 +0100 Subject: [PATCH 2/6] Fix Snowflake query template for entityless feature views Signed-off-by: hkuepers --- sdk/python/feast/infra/offline_stores/snowflake.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 62300b42c8..4687523b09 100644 --- 
a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -766,11 +766,11 @@ def _get_entity_df_event_timestamp_range( {% if featureview.created_timestamp_column %} "{{ featureview.name }}__dedup" AS ( SELECT - {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}, + {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp", MAX("created_timestamp") AS "created_timestamp" FROM "{{ featureview.table_subquery }}" - GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}, "event_timestamp" + GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp" ), {% endif %} @@ -788,7 +788,9 @@ def _get_entity_df_event_timestamp_range( FROM "{{ featureview.name }}__entity_dataframe" e ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}{{ featureview.table_subquery }}{% endif %} v MATCH_CONDITION (e."entity_timestamp" >= v."{{ featureview.timestamp_field }}") + {% if featureview.entities %} USING({% for entity in featureview.entities %}{% if not loop.first %},{% endif %}"{{ entity }}"{% endfor %}) + {% endif %} ), /* From 3c0ca9bf7fc19d5e30115f436ab3cc8fa0b65486 Mon Sep 17 00:00:00 2001 From: hkuepers Date: Thu, 19 Dec 2024 12:51:41 +0100 Subject: [PATCH 3/6] Remove quotes on subquery in snowflake template Signed-off-by: hkuepers --- sdk/python/feast/infra/offline_stores/snowflake.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 4687523b09..26d15c47ff 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -769,7 +769,7 @@ def 
_get_entity_df_event_timestamp_range( {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp", MAX("created_timestamp") AS "created_timestamp" - FROM "{{ featureview.table_subquery }}" + FROM {{ featureview.table_subquery }} GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp" ), {% endif %} @@ -788,9 +788,7 @@ def _get_entity_df_event_timestamp_range( FROM "{{ featureview.name }}__entity_dataframe" e ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}{{ featureview.table_subquery }}{% endif %} v MATCH_CONDITION (e."entity_timestamp" >= v."{{ featureview.timestamp_field }}") - {% if featureview.entities %} - USING({% for entity in featureview.entities %}{% if not loop.first %},{% endif %}"{{ entity }}"{% endfor %}) - {% endif %} + {% if featureview.entities %} USING({{ featureview.entities | map('tojson') | join(', ')}}) {% endif %} ), /* From f37b52a6df64f7964ec0c8d533a4e0210906b66f Mon Sep 17 00:00:00 2001 From: hkuepers Date: Thu, 19 Dec 2024 13:56:15 +0100 Subject: [PATCH 4/6] Use __subquery in Snowflake template for preparation Signed-off-by: hkuepers --- .../feast/infra/offline_stores/snowflake.py | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 26d15c47ff..a6527856d7 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -756,7 +756,22 @@ def _get_entity_df_event_timestamp_range( ), /* - 2. If the `created_timestamp_column` has been set, we need to +2. Use subquery to prepare event_timestamp, created_timestamp, entity columns and feature columns. 
+*/ + +"{{ featureview.name }}__subquery" AS ( + SELECT + "{{ featureview.timestamp_field }}" as "event_timestamp", + {{'"' ~ featureview.created_timestamp_column ~ '" as "created_timestamp",' if featureview.created_timestamp_column else '' }} + {{featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} + {% for feature in featureview.features %} + "{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %} + {% endfor %} + FROM {{ featureview.table_subquery }} +), + +/* + 3. If the `created_timestamp_column` has been set, we need to deduplicate the data first. This is done by calculating the `MAX(created_at_timestamp)` for each event_timestamp. Otherwise, the ASOF JOIN can have unstable side effects @@ -766,33 +781,29 @@ def _get_entity_df_event_timestamp_range( {% if featureview.created_timestamp_column %} "{{ featureview.name }}__dedup" AS ( SELECT - {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} - "event_timestamp", + *, MAX("created_timestamp") AS "created_timestamp" - FROM {{ featureview.table_subquery }} + FROM "{{ featureview.name }}__subquery" GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp" ), {% endif %} /* -3. Make ASOF JOIN of deduplicated feature CTE on reduced entity dataframe. +4. Make ASOF JOIN of deduplicated feature CTE on reduced entity dataframe. 
*/ "{{ featureview.name }}__asof_join" AS ( SELECT e.*, - {% for feature in featureview.features %} - v."{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %} - {% endfor %}, - v."{{ featureview.timestamp_field }}" + v.* FROM "{{ featureview.name }}__entity_dataframe" e - ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}{{ featureview.table_subquery }}{% endif %} v - MATCH_CONDITION (e."entity_timestamp" >= v."{{ featureview.timestamp_field }}") + ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}"{{ featureview.name }}__subquery"{% endif %} v + MATCH_CONDITION (e."entity_timestamp" >= v."event_timestamp") {% if featureview.entities %} USING({{ featureview.entities | map('tojson') | join(', ')}}) {% endif %} ), /* -4. If TTL is configured filter the CTE to remove rows where the feature values are older than the configured ttl. +5. If TTL is configured filter the CTE to remove rows where the feature values are older than the configured ttl. */ "{{ featureview.name }}__ttl" AS ( From 6b6c48e6bed9e2c51b8d29f1c0ee4341ddfb851f Mon Sep 17 00:00:00 2001 From: hkuepers Date: Thu, 19 Dec 2024 15:06:45 +0100 Subject: [PATCH 5/6] Fix deduplication in Snowflake query string Signed-off-by: hkuepers --- .../feast/infra/offline_stores/snowflake.py | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index a6527856d7..f32621e729 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -771,20 +771,26 @@ def _get_entity_df_event_timestamp_range( ), /* - 3. 
If the `created_timestamp_column` has been set, we need to - deduplicate the data first. This is done by calculating the - `MAX(created_at_timestamp)` for each event_timestamp. - Otherwise, the ASOF JOIN can have unstable side effects - https://docs.snowflake.com/en/sql-reference/constructs/asof-join#expected-behavior-when-ties-exist-in-the-right-table +3. If the `created_timestamp_column` has been set, we need to +deduplicate the data first. This is done by calculating the +`MAX(created_timestamp)` for each entity key and event_timestamp and joining back on the subquery. +Otherwise, the ASOF JOIN can return nondeterministic results when ties exist in the right table +https://docs.snowflake.com/en/sql-reference/constructs/asof-join#expected-behavior-when-ties-exist-in-the-right-table */ {% if featureview.created_timestamp_column %} "{{ featureview.name }}__dedup" AS ( - SELECT - *, - MAX("created_timestamp") AS "created_timestamp" + SELECT * FROM "{{ featureview.name }}__subquery" - GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp" + INNER JOIN ( + SELECT + {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} + "event_timestamp", + MAX("created_timestamp") AS "created_timestamp" + FROM "{{ featureview.name }}__subquery" + GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp" + ) + USING({{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp", "created_timestamp") ), {% endif %} From 089794ce04427745e06664b2f26317f80a1d10b9 Mon Sep 17 00:00:00 2001 From: hkuepers Date: Thu, 19 Dec 2024 22:04:41 +0100 Subject: [PATCH 6/6] Use event_timestamp in ttl cte Signed-off-by: hkuepers --- sdk/python/feast/infra/offline_stores/snowflake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index f32621e729..101685cec6 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -816,7 +816,7 @@ def _get_entity_df_event_timestamp_range( SELECT * FROM "{{ featureview.name }}__asof_join" {% if featureview.ttl == 0 %}{% else %} - WHERE "{{ featureview.timestamp_field }}" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_timestamp") + WHERE "event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_timestamp") {% endif %} ){% if loop.last %}{% else %}, {% endif %}