From d03a895f24be36babfbc2113ea2831cf9faacedb Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 23 Sep 2024 23:09:43 -0400 Subject: [PATCH 01/45] merged changes Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 1b75d23ed4..f4c4cc8257 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -128,6 +128,7 @@ def __init__( # noqa: C901 description=description, tags=tags, owner=owner, + write_to_online_store=write_to_online_store, ) schema = schema or [] @@ -246,7 +247,10 @@ def __eq__(self, other): or self.mode != other.mode or self.feature_transformation != other.feature_transformation or self.write_to_online_store != other.write_to_online_store +<<<<<<< HEAD or sorted(self.entity_columns) != sorted(other.entity_columns) +======= +>>>>>>> ac077c6b (feat: Adding write capabability to online store to on demand feature views) ): return False From e0f42c6a6938082f99298928dc05b289c0e93318 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 17 Aug 2024 05:45:02 -0400 Subject: [PATCH 02/45] saving progress Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 1 - .../tests/unit/test_on_demand_feature_view.py | 54 ++++++++++++++++--- 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index f4c4cc8257..5c7fc79cd5 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -128,7 +128,6 @@ def __init__( # noqa: C901 description=description, tags=tags, owner=owner, - write_to_online_store=write_to_online_store, ) schema = schema or [] diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 6073891aba..35e2525e1b 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -185,14 +185,14 @@ def test_python_native_transformation_mode(): ) assert ( - on_demand_feature_view_python_native.feature_transformation - == PythonTransformation(python_native_udf, "python native udf source code") + on_demand_feature_view_python_native.feature_transformation + == PythonTransformation(python_native_udf, "python native udf source code") ) with pytest.raises(TypeError): assert ( - on_demand_feature_view_python_native_err.feature_transformation - == PythonTransformation(python_native_udf, "python native udf source code") + on_demand_feature_view_python_native_err.feature_transformation + == PythonTransformation(python_native_udf, "python native udf source code") ) assert on_demand_feature_view_python_native.transform_dict( @@ -232,8 +232,8 @@ def test_from_proto_backwards_compatible_udf(): # and to populate it in feature_transformation proto = on_demand_feature_view.to_proto() assert ( - on_demand_feature_view.feature_transformation.udf_string - == proto.spec.feature_transformation.user_defined_function.body_text + on_demand_feature_view.feature_transformation.udf_string + == proto.spec.feature_transformation.user_defined_function.body_text ) # Because of the current set of code this is just confirming it is empty assert proto.spec.user_defined_function.body_text == "" @@ -258,6 +258,44 @@ def test_from_proto_backwards_compatible_udf(): # And now we expect the to get the same object back under feature_transformation reserialized_proto = OnDemandFeatureView.from_proto(proto) assert ( - reserialized_proto.feature_transformation.udf_string - == on_demand_feature_view.feature_transformation.udf_string + reserialized_proto.feature_transformation.udf_string + == on_demand_feature_view.feature_transformation.udf_string ) + + +def test_on_demand_feature_view_writes_protos(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + sources = [feature_view] + on_demand_feature_view = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + feature_transformation=PandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), + write_to_online_store=True, + ) + + proto = on_demand_feature_view.to_proto() + reserialized_proto = OnDemandFeatureView.from_proto(proto) + + + assert on_demand_feature_view.write_to_online_store + assert proto.spec.write_to_online_store + assert reserialized_proto.write_to_online_store + + proto.spec.write_to_online_store = False + reserialized_proto = OnDemandFeatureView.from_proto(proto) + assert not reserialized_proto.write_to_online_store \ No newline at end of file From 856cea2c0682be5b82ed75a2883028dd4e12cab1 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 23 Sep 2024 23:16:53 -0400 Subject: [PATCH 03/45] merged changes to odfv Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 5c7fc79cd5..d4c7924b9e 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -162,7 +162,6 @@ def __init__( # noqa: C901 self.source_request_sources[odfv_source.name] = odfv_source elif isinstance(odfv_source, FeatureViewProjection): self.source_feature_view_projections[odfv_source.name] = odfv_source - else: self.source_feature_view_projections[odfv_source.name] = ( odfv_source.projection @@ -246,10 +245,7 @@ def __eq__(self, other): or self.mode != other.mode or self.feature_transformation != other.feature_transformation or self.write_to_online_store != other.write_to_online_store -<<<<<<< HEAD or sorted(self.entity_columns) != sorted(other.entity_columns) -======= ->>>>>>> ac077c6b (feat: Adding write capabability to online store to on demand feature views) ): return False From f6a7133b6792b7f3476f6b8c9ce723a01bf5b30a Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 18 Aug 2024 06:01:20 -0400 Subject: [PATCH 04/45] linted Signed-off-by: Francisco Javier Arceo --- .../tests/unit/test_on_demand_feature_view.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 35e2525e1b..4a0f346784 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -185,14 +185,14 @@ def test_python_native_transformation_mode(): ) assert ( - on_demand_feature_view_python_native.feature_transformation - == PythonTransformation(python_native_udf, "python native udf source code") + on_demand_feature_view_python_native.feature_transformation + == PythonTransformation(python_native_udf, "python native udf source code") ) with pytest.raises(TypeError): assert ( - on_demand_feature_view_python_native_err.feature_transformation - == PythonTransformation(python_native_udf, "python native udf source code") + on_demand_feature_view_python_native_err.feature_transformation + == PythonTransformation(python_native_udf, "python native udf source code") ) assert on_demand_feature_view_python_native.transform_dict( @@ -232,8 +232,8 @@ def test_from_proto_backwards_compatible_udf(): # and to populate it in feature_transformation proto = on_demand_feature_view.to_proto() assert ( - on_demand_feature_view.feature_transformation.udf_string - == proto.spec.feature_transformation.user_defined_function.body_text + on_demand_feature_view.feature_transformation.udf_string + == proto.spec.feature_transformation.user_defined_function.body_text ) # Because of the current set of code this is just confirming it is empty assert proto.spec.user_defined_function.body_text == "" @@ -258,8 +258,8 @@ def test_from_proto_backwards_compatible_udf(): # And now we expect the to get the same object back under feature_transformation reserialized_proto = OnDemandFeatureView.from_proto(proto) assert ( - reserialized_proto.feature_transformation.udf_string - == on_demand_feature_view.feature_transformation.udf_string + reserialized_proto.feature_transformation.udf_string + == on_demand_feature_view.feature_transformation.udf_string ) @@ -291,11 +291,10 @@ def test_on_demand_feature_view_writes_protos(): proto = on_demand_feature_view.to_proto() reserialized_proto = OnDemandFeatureView.from_proto(proto) - assert on_demand_feature_view.write_to_online_store assert proto.spec.write_to_online_store assert reserialized_proto.write_to_online_store proto.spec.write_to_online_store = False reserialized_proto = OnDemandFeatureView.from_proto(proto) - assert not reserialized_proto.write_to_online_store \ No newline at end of file + assert not reserialized_proto.write_to_online_store From fe387fc37021da450e66a99cebcb39d691b90102 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 18 Aug 2024 06:13:02 -0400 Subject: [PATCH 05/45] adding the test needed to show the expected behavior Signed-off-by: Francisco Javier Arceo --- .../tests/unit/test_on_demand_feature_view.py | 69 ++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 4a0f346784..ec6fd3d7ce 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import datetime from typing import Any, Dict, List import pandas as pd @@ -50,6 +50,15 @@ def python_native_udf(features_dict: Dict[str, Any]) -> Dict[str, Any]: return output_dict +def python_writes_test_udf(features_dict: Dict[str, Any]) -> Dict[str, Any]: + output_dict: Dict[str, List[Any]] = { + "output1": features_dict["feature1"] + 100, + "output2": features_dict["feature2"] + 101, + "output3": datetime.datetime.now(), + } + return output_dict + + @pytest.mark.filterwarnings("ignore:udf and udf_string parameters are deprecated") def test_hash(): file_source = FileSource(name="my-file-source", path="test.parquet") @@ -298,3 +307,61 @@ def test_on_demand_feature_view_writes_protos(): proto.spec.write_to_online_store = False reserialized_proto = OnDemandFeatureView.from_proto(proto) assert not reserialized_proto.write_to_online_store + + +def test_on_demand_feature_view_stored_writes(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + sources = [feature_view] + + on_demand_feature_view = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + feature_transformation=PythonTransformation( + udf=python_writes_test_udf, udf_string="python native udf source code" + ), + description="testing on demand feature view stored writes", + mode="python", + write_to_online_store=True, + ) + transformed_output = on_demand_feature_view.transform_dict( + { + "feature1": 0, + "feature2": 1, + } + ) + expected_output = {"feature1": 0, "feature2": 1, "output1": 100, "output2": 102} + keys_to_validate = [ + "feature1", + "feature2", + "output1", + "output2", + ] + for k in keys_to_validate: + assert transformed_output[k] == expected_output[k] + + assert transformed_output["output3"] is not None and isinstance( + transformed_output["output3"], datetime.datetime + ) + + # Now this is where we need to test the stored writes, this should return the same output as the previous + twice_transformed_output = on_demand_feature_view.transform_dict( + { + "feature1": 0, + "feature2": 1, + } + ) + for k in twice_transformed_output: + assert twice_transformed_output[k] == transformed_output[k] From 646a1246b4cada0007ce4720340db545dbd23080 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 20 Aug 2024 22:32:59 -0400 Subject: [PATCH 06/45] updated test case Signed-off-by: Francisco Javier Arceo --- .../tests/unit/test_on_demand_feature_view.py | 11 +--- .../test_on_demand_python_transformation.py | 55 ++++++++++++++++++- 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index ec6fd3d7ce..4b30bd6be9 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -336,6 +336,7 @@ def test_on_demand_feature_view_stored_writes(): mode="python", write_to_online_store=True, ) + transformed_output = on_demand_feature_view.transform_dict( { "feature1": 0, @@ -355,13 +356,3 @@ def test_on_demand_feature_view_stored_writes(): assert transformed_output["output3"] is not None and isinstance( transformed_output["output3"], datetime.datetime ) - - # Now this is where we need to test the stored writes, this should return the same output as the previous - twice_transformed_output = on_demand_feature_view.transform_dict( - { - "feature1": 0, - "feature2": 1, - } - ) - for k in twice_transformed_output: - assert twice_transformed_output[k] == transformed_output[k] diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index ff7ad494ca..3fd9484791 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -20,7 +20,7 @@ from feast.field import Field from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig from feast.on_demand_feature_view import on_demand_feature_view -from feast.types import Array, Bool, Float32, Float64, Int64, String +from feast.types import Array, Bool, Float32, Float64, Int64, String, UnixTimestamp class TestOnDemandPythonTransformation(unittest.TestCase): @@ -140,6 +140,29 @@ def python_singleton_view(inputs: dict[str, Any]) -> dict[str, Any]: ) return output + @on_demand_feature_view( + sources=[driver_stats_fv[["conv_rate", "acc_rate"]]], + schema=[ + Field(name="conv_rate_plus_acc", dtype=Float64), + Field(name="current_datetime", dtype=UnixTimestamp), + ], + mode="python", + write_to_online_store=True, + ) + def python_stored_writes_feature_view( + inputs: dict[str, Any], + ) -> dict[str, Any]: + output: dict[str, Any] = { + "conv_rate_plus_acc": [ + conv_rate + acc_rate + for conv_rate, acc_rate in zip( + inputs["conv_rate"], inputs["acc_rate"] + ) + ], + "current_datetime": [datetime.now() for _ in inputs["conv_rate"]], + } + return output + with pytest.raises(TypeError): # Note the singleton view will fail as the type is # expected to be a list which can be confirmed in _infer_features_dict @@ -162,14 +185,15 @@ def python_singleton_view(inputs: dict[str, Any]) -> dict[str, Any]: pandas_view, python_view, python_demo_view, + python_stored_writes_feature_view, ] ) self.store.write_to_online_store( feature_view_name="driver_hourly_stats", df=driver_df ) - assert len(self.store.list_all_feature_views()) == 4 + assert len(self.store.list_all_feature_views()) == 5 assert len(self.store.list_feature_views()) == 1 - assert len(self.store.list_on_demand_feature_views()) == 3 + assert len(self.store.list_on_demand_feature_views()) == 4 assert len(self.store.list_stream_feature_views()) == 0 def test_python_pandas_parity(self): @@ -257,6 +281,31 @@ def test_python_docs_demo(self): == online_python_response["conv_rate_plus_val2_python"][0] ) + def test_stored_writes(self): + entity_rows = [ + { + "driver_id": 1001, + } + ] + + online_python_response = self.store.get_online_features( + entity_rows=entity_rows, + features=[ + "python_stored_writes_feature_view:conv_rate_plus_acc", + "python_stored_writes_feature_view:current_datetime", + ], + ).to_dict() + + assert sorted(list(online_python_response.keys())) == sorted( + [ + "driver_id", + "conv_rate_plus_acc", + "current_datetime", + ] + ) + print(online_python_response) + # Now this is where we need to test the stored writes, this should return the same output as the previous + class TestOnDemandPythonTransformationAllDataTypes(unittest.TestCase): def setUp(self): From b345e7e22e98949da0f92ff9f70a1e50d0ee7a83 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 20 Aug 2024 23:38:02 -0400 Subject: [PATCH 07/45] saving progress Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 22 ++++++++++++------- .../feast/infra/passthrough_provider.py | 4 ++++ .../test_on_demand_python_transformation.py | 8 +++++++ 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 52556eda15..2e5e438d4f 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1444,19 +1444,18 @@ def write_to_online_store( inputs: Optional the dictionary object to be written allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry. """ - # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type + feature_view_dict = { + fv_proto.name: fv_proto + for fv_proto in self.list_all_feature_views(allow_registry_cache) + } try: - feature_view: FeatureView = self.get_stream_feature_view( - feature_view_name, allow_registry_cache=allow_registry_cache - ) + feature_view = feature_view_dict[feature_view_name] except FeatureViewNotFoundException: - feature_view = self.get_feature_view( - feature_view_name, allow_registry_cache=allow_registry_cache - ) + raise FeatureViewNotFoundException(feature_view_name, self.project) if df is not None and inputs is not None: raise ValueError("Both df and inputs cannot be provided at the same time.") if df is None and inputs is not None: - if isinstance(inputs, dict): + if isinstance(inputs, dict) or isinstance(inputs, List): try: df = pd.DataFrame(inputs) except Exception as _: @@ -1465,6 +1464,13 @@ def write_to_online_store( pass else: raise ValueError("inputs must be a dictionary or a pandas DataFrame.") + if df is not None and inputs is None: + if isinstance(df, dict) or isinstance(df, List): + try: + df = pd.DataFrame(df) + except Exception as _: + raise DataFrameSerializationError + provider = self._get_provider() provider.ingest_df(feature_view, df) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index c3c3048a89..24861e6390 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -279,6 +279,10 @@ def ingest_df( ): table = pa.Table.from_pandas(df) + # TODO: Update this to support On Demand Feature Views. + # Note: A dictionary mapping of column names in this data + # source to feature names in a feature table or view. Only used for feature + # columns, not entity or timestamp columns. if feature_view.batch_source.field_mapping is not None: table = _run_pyarrow_field_mapping( table, feature_view.batch_source.field_mapping diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 3fd9484791..285cf2572a 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -1,6 +1,7 @@ import os import re import tempfile +import time import unittest from datetime import datetime, timedelta from typing import Any @@ -285,9 +286,16 @@ def test_stored_writes(self): entity_rows = [ { "driver_id": 1001, + "conv_rate": 0.25, + "acc_rate": 0.25, } ] + self.store.write_to_online_store( + feature_view_name="python_stored_writes_feature_view", + df=entity_rows, + ) + time.sleep(1) online_python_response = self.store.get_online_features( entity_rows=entity_rows, features=[ From 16e68f096806c5e616689c0b6719f3bd58c8b497 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 23 Sep 2024 23:21:02 -0400 Subject: [PATCH 08/45] merging Signed-off-by: Francisco Javier Arceo --- .../feast/infra/passthrough_provider.py | 38 ++++---- sdk/python/feast/on_demand_feature_view.py | 2 +- sdk/python/feast/types.py | 30 ++++++- sdk/python/feast/utils.py | 87 +++++++++++++++++++ .../test_on_demand_python_transformation.py | 2 - 5 files changed, 136 insertions(+), 23 deletions(-) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 24861e6390..1ae034cb12 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -5,7 +5,7 @@ import pyarrow as pa from tqdm import tqdm -from feast import importer +from feast import OnDemandFeatureView, importer from feast.batch_feature_view import BatchFeatureView from feast.data_source import DataSource from feast.entity import Entity @@ -278,25 +278,27 @@ def ingest_df( df: pd.DataFrame, ): table = pa.Table.from_pandas(df) + if isinstance(feature_view, OnDemandFeatureView): + # TODO: Update this to support On Demand Feature Views. + pass + else: + # Note: A dictionary mapping of column names in this data + # source to feature names in a feature table or view. Only used for feature + # columns, not entity or timestamp columns. + if feature_view.batch_source.field_mapping is not None: + table = _run_pyarrow_field_mapping( + table, feature_view.batch_source.field_mapping + ) - # TODO: Update this to support On Demand Feature Views. - # Note: A dictionary mapping of column names in this data - # source to feature names in a feature table or view. Only used for feature - # columns, not entity or timestamp columns. - if feature_view.batch_source.field_mapping is not None: - table = _run_pyarrow_field_mapping( - table, feature_view.batch_source.field_mapping - ) - - join_keys = { - entity.name: entity.dtype.to_value_type() - for entity in feature_view.entity_columns - } - rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) + join_keys = { + entity.name: entity.dtype.to_value_type() + for entity in feature_view.entity_columns + } + rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) - self.online_write_batch( - self.repo_config, feature_view, rows_to_write, progress=None - ) + self.online_write_batch( + self.repo_config, feature_view, rows_to_write, progress=None + ) def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table): if feature_view.batch_source.field_mapping is not None: diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index d4c7924b9e..96805e6c68 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -3,7 +3,7 @@ import inspect import warnings from types import FunctionType -from typing import Any, List, Optional, Union, get_type_hints +from typing import Any, Optional, Union, get_type_hints, List import dill import pandas as pd diff --git a/sdk/python/feast/types.py b/sdk/python/feast/types.py index 4b07c58d19..26e6d20a13 100644 --- a/sdk/python/feast/types.py +++ b/sdk/python/feast/types.py @@ -15,6 +15,8 @@ from enum import Enum from typing import Dict, Union +import pyarrow + from feast.value_type import ValueType PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES = { @@ -103,7 +105,6 @@ def __hash__(self): Float64 = PrimitiveFeastType.FLOAT64 UnixTimestamp = PrimitiveFeastType.UNIX_TIMESTAMP - SUPPORTED_BASE_TYPES = [ Invalid, String, @@ -159,7 +160,6 @@ def __str__(self): FeastType = Union[ComplexFeastType, PrimitiveFeastType] - VALUE_TYPES_TO_FEAST_TYPES: Dict["ValueType", FeastType] = { ValueType.UNKNOWN: Invalid, ValueType.BYTES: Bytes, @@ -180,6 +180,32 @@ def __str__(self): ValueType.UNIX_TIMESTAMP_LIST: Array(UnixTimestamp), } +FEAST_TYPES_TO_PYARROW_TYPES = { + String: pyarrow.string(), + Bool: pyarrow.bool_(), + Int32: pyarrow.int32(), + Int64: pyarrow.int64(), + Float32: pyarrow.float32(), + Float64: pyarrow.float64(), + UnixTimestamp: pyarrow.timestamp(), +} + + +def from_feast_to_pyarrow_type(feast_type: FeastType) -> pyarrow.DataType: + """ + Converts a Feast type to a PyArrow type. + + Args: + feast_type: The Feast type to be converted. + + Raises: + ValueError: The conversion could not be performed. + """ + if feast_type in FEAST_TYPES_TO_PYARROW_TYPES: + return FEAST_TYPES_TO_PYARROW_TYPES[feast_type] + + raise ValueError(f"Could not convert Feast type {feast_type} to PyArrow type.") + def from_value_type( value_type: ValueType, diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 8a9f1fadae..76d6684ff8 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -43,6 +43,7 @@ from feast.protos.feast.types.Value_pb2 import RepeatedValue as RepeatedValueProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.type_map import python_values_to_proto_values +from feast.types import from_feast_to_pyarrow_type from feast.value_type import ValueType from feast.version import get_version @@ -230,6 +231,78 @@ def _convert_arrow_to_proto( table: Union[pyarrow.Table, pyarrow.RecordBatch], feature_view: "FeatureView", join_keys: Dict[str, ValueType], +) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: + if isinstance(feature_view, OnDemandFeatureView): + return _convert_arrow_odfv_to_proto(table, feature_view, join_keys) + else: + return _convert_arrow_fv_to_proto(table, feature_view, join_keys) + + +def _convert_arrow_fv_to_proto( + table: Union[pyarrow.Table, pyarrow.RecordBatch], + feature_view: "FeatureView", + join_keys: Dict[str, ValueType], +) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: + # Avoid ChunkedArrays which guarantees `zero_copy_only` available. + if isinstance(table, pyarrow.Table): + table = table.to_batches()[0] + + columns = [ + (field.name, field.dtype.to_value_type()) for field in feature_view.features + ] + list(join_keys.items()) + + proto_values_by_column = { + column: python_values_to_proto_values( + table.column(column).to_numpy(zero_copy_only=False), value_type + ) + for column, value_type in columns + } + + entity_keys = [ + EntityKeyProto( + join_keys=join_keys, + entity_values=[proto_values_by_column[k][idx] for k in join_keys], + ) + for idx in range(table.num_rows) + ] + + # Serialize the features per row + feature_dict = { + feature.name: proto_values_by_column[feature.name] + for feature in feature_view.features + } + features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())] + + # Convert event_timestamps + event_timestamps = [ + _coerce_datetime(val) + for val in pd.to_datetime( + table.column(feature_view.batch_source.timestamp_field).to_numpy( + zero_copy_only=False + ) + ) + ] + + # Convert created_timestamps if they exist + if feature_view.batch_source.created_timestamp_column: + created_timestamps = [ + _coerce_datetime(val) + for val in pd.to_datetime( + table.column( + feature_view.batch_source.created_timestamp_column + ).to_numpy(zero_copy_only=False) + ) + ] + else: + created_timestamps = [None] * table.num_rows + + return list(zip(entity_keys, features, event_timestamps, created_timestamps)) + + +def _convert_arrow_odfv_to_proto( + table: Union[pyarrow.Table, pyarrow.RecordBatch], + feature_view: "FeatureView", + join_keys: Dict[str, ValueType], ) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: # Avoid ChunkedArrays which guarantees `zero_copy_only` available. if isinstance(table, pyarrow.Table): @@ -244,7 +317,21 @@ def _convert_arrow_to_proto( table.column(column).to_numpy(zero_copy_only=False), value_type ) for column, value_type in columns + if column in table.column_names } + # Adding On Demand Features + for feature in feature_view.features: + if feature.name in [c[0] for c in columns]: + # initializing the column as null + proto_values_by_column[feature.name] = python_values_to_proto_values( + table.append_column( + feature.name, + pyarrow.array( + [None] * table.shape[0], + type=from_feast_to_pyarrow_type(feature.dtype), + ), + ), + ) entity_keys = [ EntityKeyProto( diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 285cf2572a..df512b3e67 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -311,8 +311,6 @@ def test_stored_writes(self): "current_datetime", ] ) - print(online_python_response) - # Now this is where we need to test the stored writes, this should return the same output as the previous class TestOnDemandPythonTransformationAllDataTypes(unittest.TestCase): From 24c4ae258b8da9ac380d0307d3e1180b578a031c Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 23 Sep 2024 23:25:23 -0400 Subject: [PATCH 09/45] merged Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/types.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/types.py b/sdk/python/feast/types.py index 26e6d20a13..5287fee55c 100644 --- a/sdk/python/feast/types.py +++ b/sdk/python/feast/types.py @@ -18,6 +18,7 @@ import pyarrow from feast.value_type import ValueType +from feast.utils import _utc_now PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES = { "INVALID": "UNKNOWN", @@ -187,7 +188,8 @@ def __str__(self): Int64: pyarrow.int64(), Float32: pyarrow.float32(), Float64: pyarrow.float64(), - UnixTimestamp: pyarrow.timestamp(), + # Note: datetime only supports microseconds https://github.com/python/cpython/blob/3.8/Lib/datetime.py#L1559 + UnixTimestamp: pyarrow.timestamp('us', tz=_utc_now().tzname()), } From 3d2188154029ba1e5d86e7ce92aff67b766a4ed4 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 23 Sep 2024 23:27:20 -0400 Subject: [PATCH 10/45] merged Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/types.py | 8 ++++++-- sdk/python/feast/utils.py | 6 +++++- .../test_on_demand_python_transformation.py | 19 ++++++++++++------- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/types.py b/sdk/python/feast/types.py index 5287fee55c..b934f12a86 100644 --- a/sdk/python/feast/types.py +++ b/sdk/python/feast/types.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. from abc import ABC, abstractmethod +from datetime import datetime, timezone from enum import Enum from typing import Dict, Union import pyarrow from feast.value_type import ValueType -from feast.utils import _utc_now PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES = { "INVALID": "UNKNOWN", @@ -33,6 +33,10 @@ } +def _utc_now() -> datetime: + return datetime.now(tz=timezone.utc) + + class ComplexFeastType(ABC): """ A ComplexFeastType represents a structured type that is recognized by Feast. @@ -189,7 +193,7 @@ def __str__(self): Float32: pyarrow.float32(), Float64: pyarrow.float64(), # Note: datetime only supports microseconds https://github.com/python/cpython/blob/3.8/Lib/datetime.py#L1559 - UnixTimestamp: pyarrow.timestamp('us', tz=_utc_now().tzname()), + UnixTimestamp: pyarrow.timestamp("us", tz=_utc_now().tzname()), } diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 76d6684ff8..e89d7e97bc 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -232,7 +232,8 @@ def _convert_arrow_to_proto( feature_view: "FeatureView", join_keys: Dict[str, ValueType], ) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: - if isinstance(feature_view, OnDemandFeatureView): + # This is a workaround for isinstance(feature_view, OnDemandFeatureView), which triggers a circular import + if getattr(feature_view, "source_request_sources", None): return _convert_arrow_odfv_to_proto(table, feature_view, join_keys) else: return _convert_arrow_fv_to_proto(table, feature_view, join_keys) @@ -1140,6 +1141,7 @@ def tags_str_to_dict(tags: str = "") -> dict[str, str]: cast(tuple[str, str], tag.split(":", 1)) for tag in tags_list if ":" in tag ).items() } +<<<<<<< HEAD def _utc_now() -> datetime: @@ -1192,3 +1194,5 @@ def _build_retrieve_online_document_record( vector_value_proto, distance_value_proto, ) +======= +>>>>>>> f97a28ca (checking in progress) diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index df512b3e67..4d17ae6be8 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -1,7 +1,6 @@ import os import re import tempfile -import time import unittest from datetime import datetime, timedelta from typing import Any @@ -162,6 +161,7 @@ def python_stored_writes_feature_view( ], "current_datetime": [datetime.now() for _ in inputs["conv_rate"]], } + print("running odfv transform") return output with pytest.raises(TypeError): @@ -283,27 +283,32 @@ def test_python_docs_demo(self): ) def test_stored_writes(self): - entity_rows = [ + entity_rows_to_write = [ { "driver_id": 1001, "conv_rate": 0.25, "acc_rate": 0.25, } ] - + entity_rows_to_read = [ + { + "driver_id": 1001, + } + ] + print("storing odfv features") self.store.write_to_online_store( feature_view_name="python_stored_writes_feature_view", - df=entity_rows, + df=entity_rows_to_write, ) - time.sleep(1) + print("reading odfv features") online_python_response = self.store.get_online_features( - entity_rows=entity_rows, + entity_rows=entity_rows_to_read, features=[ "python_stored_writes_feature_view:conv_rate_plus_acc", "python_stored_writes_feature_view:current_datetime", ], ).to_dict() - + print(online_python_response) assert sorted(list(online_python_response.keys())) == sorted( [ "driver_id", From 7afaa84dc5cd1adce13938ab0af38483f42d86a3 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 23 Sep 2024 23:30:50 -0400 Subject: [PATCH 11/45] merging Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/utils.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index e89d7e97bc..560a5d997e 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -1141,7 +1141,6 @@ def tags_str_to_dict(tags: str = "") -> dict[str, str]: cast(tuple[str, str], tag.split(":", 1)) for tag in tags_list if ":" in tag ).items() } -<<<<<<< HEAD def _utc_now() -> datetime: @@ -1194,5 +1193,3 @@ def _build_retrieve_online_document_record( vector_value_proto, distance_value_proto, ) -======= ->>>>>>> f97a28ca (checking in progress) From 436478aca1aa939e20c7bc67265f3e96eac50634 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 28 Aug 2024 22:54:42 -0400 Subject: [PATCH 12/45] adding the entity keys for now to do retrieval Signed-off-by: Francisco Javier Arceo --- .../test_on_demand_python_transformation.py | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 4d17ae6be8..74b996dab5 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -20,7 +20,16 @@ from feast.field import Field from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig from feast.on_demand_feature_view import on_demand_feature_view -from feast.types import Array, Bool, Float32, Float64, Int64, String, UnixTimestamp +from feast.types import ( + Array, + Bool, + Float32, + Float64, + Int64, + String, + UnixTimestamp, + _utc_now, +) class TestOnDemandPythonTransformation(unittest.TestCase): @@ -59,6 +68,13 @@ def setUp(self): timestamp_field="event_timestamp", created_timestamp_column="created", ) + input_request_source = RequestSource( + name="counter_source", + schema=[ + Field(name="counter", dtype=Int64), + Field(name="input_datetime", dtype=UnixTimestamp), + ], + ) driver_stats_fv = FeatureView( name="driver_hourly_stats", @@ -141,10 +157,15 @@ def python_singleton_view(inputs: dict[str, Any]) -> dict[str, Any]: return output @on_demand_feature_view( - sources=[driver_stats_fv[["conv_rate", "acc_rate"]]], + sources=[ + driver_stats_fv[["conv_rate", "acc_rate"]], + input_request_source, + ], schema=[ Field(name="conv_rate_plus_acc", dtype=Float64), Field(name="current_datetime", dtype=UnixTimestamp), + Field(name="counter", dtype=Int64), + Field(name="input_datetime", dtype=UnixTimestamp), ], mode="python", write_to_online_store=True, @@ -160,6 +181,8 @@ def python_stored_writes_feature_view( ) ], "current_datetime": [datetime.now() for _ in inputs["conv_rate"]], + "counter": [c + 1 for c in inputs["counter"]], + "input_datetime": [d for d in inputs["input_datetime"]], } print("running odfv transform") return output @@ -201,6 +224,8 @@ def test_python_pandas_parity(self): entity_rows = [ { "driver_id": 1001, + "counter": 0, + "input_datetime": _utc_now(), } ] @@ -283,13 +308,18 @@ def test_python_docs_demo(self): ) def test_stored_writes(self): + current_datetime = _utc_now() entity_rows_to_write = [ { "driver_id": 1001, "conv_rate": 0.25, "acc_rate": 0.25, + "counter": 0, + "input_datetime": current_datetime, } ] + # Note that here we shouldn't have to pass the request source features for reading + # because they should have already been written to the online store entity_rows_to_read = [ { "driver_id": 1001, @@ -306,6 +336,8 @@ def test_stored_writes(self): features=[ "python_stored_writes_feature_view:conv_rate_plus_acc", "python_stored_writes_feature_view:current_datetime", + "python_stored_writes_feature_view:counter", + "python_stored_writes_feature_view:input_datetime", ], ).to_dict() print(online_python_response) @@ -313,7 +345,9 @@ def test_stored_writes(self): [ "driver_id", "conv_rate_plus_acc", + "counter", "current_datetime", + "input_datetime", ] ) From c78aeadf6bf62652a64878abb3baa56c82580cab Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 28 Aug 2024 23:04:06 -0400 Subject: [PATCH 13/45] adding entity to odfv Signed-off-by: Francisco Javier Arceo --- .../tests/unit/test_on_demand_python_transformation.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 74b996dab5..6285c56664 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -157,6 +157,7 @@ def python_singleton_view(inputs: dict[str, Any]) -> dict[str, Any]: return output @on_demand_feature_view( + entities=[driver], sources=[ driver_stats_fv[["conv_rate", "acc_rate"]], input_request_source, @@ -224,8 +225,6 @@ def test_python_pandas_parity(self): entity_rows = [ { "driver_id": 1001, - "counter": 0, - "input_datetime": _utc_now(), } ] @@ -312,8 +311,6 @@ def test_stored_writes(self): entity_rows_to_write = [ { "driver_id": 1001, - "conv_rate": 0.25, - "acc_rate": 0.25, "counter": 0, "input_datetime": current_datetime, } @@ -323,6 +320,10 @@ def test_stored_writes(self): entity_rows_to_read = [ { "driver_id": 1001, + "conv_rate": 0.25, + "acc_rate": 0.25, + "counter": 0, + "input_datetime": current_datetime, } ] print("storing odfv features") From fbebc634c3d271531c0004357f7d6a4de358279d Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 28 Aug 2024 23:30:05 -0400 Subject: [PATCH 14/45] checking in progress...getting closer Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/utils.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 560a5d997e..c1fdc3e640 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -1019,6 +1019,17 @@ def _prepare_entities_to_read_from_online_store( num_rows = _validate_entity_values(entity_proto_values) + odfv_entities = [] + request_source_keys = [] + for on_demand_feature_view in requested_on_demand_feature_views: + odfv_entities.append(*getattr(on_demand_feature_view, "entities", None)) + for source in on_demand_feature_view.source_request_sources: + source_schema = on_demand_feature_view.source_request_sources[source].schema + for column in source_schema: + request_source_keys.append(column.name) + + join_keys_set.update(set(odfv_entities)) + join_key_values: Dict[str, List[ValueProto]] = {} request_data_features: Dict[str, List[ValueProto]] = {} # Entity rows may be either entities or request data. @@ -1031,7 +1042,8 @@ def _prepare_entities_to_read_from_online_store( join_key = join_key_or_entity_name else: try: - join_key = entity_name_to_join_key_map[join_key_or_entity_name] + if join_key_or_entity_name in request_source_keys: + join_key = entity_name_to_join_key_map[join_key_or_entity_name] except KeyError: raise EntityNotFoundException(join_key_or_entity_name, project) else: From 5b3e2f57e67b6ae90e816b4a26d8f42c73b1801a Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 31 Aug 2024 06:26:05 -0400 Subject: [PATCH 15/45] may have to revert some of this...looks like the challenge is getting the entities correct when storing writes. just checking in progress Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 26 ++++++++++++++++++- sdk/python/feast/feature_view.py | 1 + .../feast/infra/passthrough_provider.py | 18 +++++++++++-- sdk/python/feast/on_demand_feature_view.py | 1 + 4 files changed, 43 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 2e5e438d4f..2d019b3b17 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1472,7 +1472,31 @@ def write_to_online_store( raise DataFrameSerializationError provider = self._get_provider() - provider.ingest_df(feature_view, df) + if isinstance(feature_view, OnDemandFeatureView): + # projection_mapping = {} + # source_projections = feature_view.source_feature_view_projections + # for projection in source_projections: + # try: + # field_mapping = self.get_feature_view(projection).batch_source.field_mapping + # except: + # print(f'feature view {feature_view} broke') + # raise ValueError( + # "Field mapping not found for source feature view. Please check the source feature view configuration." + # ) + # projection_mapping.update(self.get_feature_view(field_mapping)) + # + # request_source_projections = feature_view.source_request_projections + # for projection in request_source_projections: + # try: + # field_mapping = projection.field_mapping + # except: + # raise ValueError( + # "Request sources not found for on demand feature view. Please check the source feature view configuration." + # ) + # projection_mapping.update(self.get_feature_view(projection)) + provider.ingest_df(feature_view, df, {}) + else: + provider.ingest_df(feature_view, df) def write_to_offline_store( self, diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 33ea761158..3e3775b345 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -132,6 +132,7 @@ def __init__( """ self.name = name self.entities = [e.name for e in entities] if entities else [DUMMY_ENTITY_NAME] + print('fv', self.entities) self.ttl = ttl schema = schema or [] diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 1ae034cb12..87cf9c98db 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -276,11 +276,25 @@ def ingest_df( self, feature_view: FeatureView, df: pd.DataFrame, + field_mapping: Optional[Dict] = None, ): table = pa.Table.from_pandas(df) if isinstance(feature_view, OnDemandFeatureView): - # TODO: Update this to support On Demand Feature Views. - pass + table = _run_pyarrow_field_mapping(table, field_mapping) + join_keys = { + entity.name: entity.dtype.to_value_type() + for entity in feature_view.entity_columns + } + rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) + + self.online_write_batch( + self.repo_config, feature_view, rows_to_write, progress=None + ) + # We get these two sets of feature view projections, which are similar to the batch source for regular feature_views + # We need to extract these dependencies to get the data types and field mappings to run the _convert_arrow_to_proto function + #print(feature_view.source_feature_view_projections) + #print(feature_view.source_request_sources) + #print('something should happen here') else: # Note: A dictionary mapping of column names in this data # source to feature names in a feature table or view. Only used for feature diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 96805e6c68..c6f1d6e1c1 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -132,6 +132,7 @@ def __init__( # noqa: C901 schema = schema or [] self.entities = [e.name for e in entities] if entities else [DUMMY_ENTITY_NAME] + print('odfv', self.entities) self.mode = mode.lower() if self.mode not in {"python", "pandas", "substrait"}: From 7879e21b9d38c856a25743edcc4c0be7969a97d5 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 1 Sep 2024 06:41:19 -0400 Subject: [PATCH 16/45] moving things around to make it easier to debug Signed-off-by: Francisco Javier Arceo --- .../test_on_demand_python_transformation.py | 330 +++++++++++++----- 1 file changed, 250 insertions(+), 80 deletions(-) diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 6285c56664..1b097eb588 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -156,38 +156,6 @@ def python_singleton_view(inputs: dict[str, Any]) -> dict[str, Any]: ) return output - @on_demand_feature_view( - entities=[driver], - sources=[ - driver_stats_fv[["conv_rate", "acc_rate"]], - input_request_source, - ], - schema=[ - Field(name="conv_rate_plus_acc", dtype=Float64), - Field(name="current_datetime", dtype=UnixTimestamp), - Field(name="counter", dtype=Int64), - Field(name="input_datetime", dtype=UnixTimestamp), - ], - mode="python", - write_to_online_store=True, - ) - def python_stored_writes_feature_view( - inputs: dict[str, Any], - ) -> dict[str, Any]: - output: dict[str, Any] = { - "conv_rate_plus_acc": [ - conv_rate + acc_rate - for conv_rate, acc_rate in zip( - inputs["conv_rate"], inputs["acc_rate"] - ) - ], - "current_datetime": [datetime.now() for _ in inputs["conv_rate"]], - "counter": [c + 1 for c in inputs["counter"]], - "input_datetime": [d for d in inputs["input_datetime"]], - } - print("running odfv transform") - return output - with pytest.raises(TypeError): # Note the singleton view will fail as the type is # expected to be a list which can be confirmed in _infer_features_dict @@ -210,7 +178,6 @@ def python_stored_writes_feature_view( pandas_view, python_view, python_demo_view, - python_stored_writes_feature_view, ] ) self.store.write_to_online_store( @@ -306,53 +273,6 @@ def test_python_docs_demo(self): == online_python_response["conv_rate_plus_val2_python"][0] ) - def test_stored_writes(self): - current_datetime = _utc_now() - entity_rows_to_write = [ - { - "driver_id": 1001, - "counter": 0, - "input_datetime": current_datetime, - } - ] - # Note that here we shouldn't have to pass the request source features for reading - # because they should have already been written to the online store - entity_rows_to_read = [ - { - "driver_id": 1001, - "conv_rate": 0.25, - "acc_rate": 0.25, - "counter": 0, - "input_datetime": current_datetime, - } - ] - print("storing odfv features") - self.store.write_to_online_store( - feature_view_name="python_stored_writes_feature_view", - df=entity_rows_to_write, - ) - print("reading odfv features") - online_python_response = self.store.get_online_features( - entity_rows=entity_rows_to_read, - features=[ - "python_stored_writes_feature_view:conv_rate_plus_acc", - "python_stored_writes_feature_view:current_datetime", - "python_stored_writes_feature_view:counter", - "python_stored_writes_feature_view:input_datetime", - ], - ).to_dict() - print(online_python_response) - assert sorted(list(online_python_response.keys())) == sorted( - [ - "driver_id", - "conv_rate_plus_acc", - "counter", - "current_datetime", - "input_datetime", - ] - ) - - class TestOnDemandPythonTransformationAllDataTypes(unittest.TestCase): def setUp(self): with tempfile.TemporaryDirectory() as data_dir: @@ -589,3 +509,253 @@ def python_view(inputs: dict[str, Any]) -> dict[str, Any]: ), ): store.apply([request_source, python_view]) + + + +class TestOnDemandTransformationsWithWrites(unittest.TestCase): + def setUp(self): + with tempfile.TemporaryDirectory() as data_dir: + self.store = FeatureStore( + config=RepoConfig( + project="test_on_demand_python_transformation", + registry=os.path.join(data_dir, "registry.db"), + provider="local", + entity_key_serialization_version=2, + online_store=SqliteOnlineStoreConfig( + path=os.path.join(data_dir, "online.db") + ), + ) + ) + + # Generate test data. + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df( + driver_entities, start_date, end_date + ) + driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") + driver_df.to_parquet( + path=driver_stats_path, allow_truncated_timestamps=True + ) + + driver = Entity(name="driver", join_keys=["driver_id"]) + + driver_stats_source = FileSource( + name="driver_hourly_stats_source", + path=driver_stats_path, + timestamp_field="event_timestamp", + created_timestamp_column="created", + ) + input_request_source = RequestSource( + name="counter_source", + schema=[ + Field(name="counter", dtype=Int64), + Field(name="input_datetime", dtype=UnixTimestamp), + ], + ) + + driver_stats_fv = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=0), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_stats_source, + ) + + @on_demand_feature_view( + entities=[driver], + sources=[ + driver_stats_fv[["conv_rate", "acc_rate"]], + input_request_source, + ], + schema=[ + Field(name="conv_rate_plus_acc", dtype=Float64), + Field(name="current_datetime", dtype=UnixTimestamp), + Field(name="counter", dtype=Int64), + Field(name="input_datetime", dtype=UnixTimestamp), + ], + mode="python", + write_to_online_store=True, + ) + def python_stored_writes_feature_view( + inputs: dict[str, Any], + ) -> dict[str, Any]: + output: dict[str, Any] = { + "conv_rate_plus_acc": [ + conv_rate + acc_rate + for conv_rate, acc_rate in zip( + inputs["conv_rate"], inputs["acc_rate"] + ) + ], + "current_datetime": [datetime.now() for _ in inputs["conv_rate"]], + "counter": [c + 1 for c in inputs["counter"]], + "input_datetime": [d for d in inputs["input_datetime"]], + } + print("running odfv transform") + return output + + with pytest.raises(TypeError): + # Note the singleton view will fail as the type is + # expected to be a list which can be confirmed in _infer_features_dict + self.store.apply( + [ + driver, + driver_stats_source, + driver_stats_fv, + ] + ) + + self.store.apply( + [ + driver, + driver_stats_source, + driver_stats_fv, + python_stored_writes_feature_view, + ] + ) + self.store.write_to_online_store( + feature_view_name="driver_hourly_stats", df=driver_df + ) + assert len(self.store.list_all_feature_views()) == 5 + assert len(self.store.list_feature_views()) == 1 + assert len(self.store.list_on_demand_feature_views()) == 4 + assert len(self.store.list_stream_feature_views()) == 0 + + def test_python_pandas_parity(self): + entity_rows = [ + { + "driver_id": 1001, + } + ] + + online_python_response = self.store.get_online_features( + entity_rows=entity_rows, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "python_view:conv_rate_plus_acc_python", + ], + ).to_dict() + + online_pandas_response = self.store.get_online_features( + entity_rows=entity_rows, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "pandas_view:conv_rate_plus_acc_pandas", + ], + ).to_df() + + assert len(online_python_response) == 4 + assert all( + key in online_python_response.keys() + for key in [ + "driver_id", + "acc_rate", + "conv_rate", + "conv_rate_plus_acc_python", + ] + ) + assert len(online_python_response["conv_rate_plus_acc_python"]) == 1 + assert ( + online_python_response["conv_rate_plus_acc_python"][0] + == online_pandas_response["conv_rate_plus_acc_pandas"][0] + == online_python_response["conv_rate"][0] + + online_python_response["acc_rate"][0] + ) + + def test_python_docs_demo(self): + entity_rows = [ + { + "driver_id": 1001, + } + ] + + online_python_response = self.store.get_online_features( + entity_rows=entity_rows, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "python_demo_view:conv_rate_plus_val1_python", + "python_demo_view:conv_rate_plus_val2_python", + ], + ).to_dict() + + assert sorted(list(online_python_response.keys())) == sorted( + [ + "driver_id", + "acc_rate", + "conv_rate", + "conv_rate_plus_val1_python", + "conv_rate_plus_val2_python", + ] + ) + + assert ( + online_python_response["conv_rate_plus_val1_python"][0] + == online_python_response["conv_rate_plus_val2_python"][0] + ) + assert ( + online_python_response["conv_rate"][0] + + online_python_response["acc_rate"][0] + == online_python_response["conv_rate_plus_val1_python"][0] + ) + assert ( + online_python_response["conv_rate"][0] + + online_python_response["acc_rate"][0] + == online_python_response["conv_rate_plus_val2_python"][0] + ) + + def test_stored_writes(self): + current_datetime = _utc_now() + entity_rows_to_write = [ + { + "driver_id": 1001, + "counter": 0, + "input_datetime": current_datetime, + } + ] + # Note that here we shouldn't have to pass the request source features for reading + # because they should have already been written to the online store + entity_rows_to_read = [ + { + "driver_id": 1001, + "conv_rate": 0.25, + "acc_rate": 0.25, + "counter": 0, + "input_datetime": current_datetime, + } + ] + print("storing odfv features") + self.store.write_to_online_store( + feature_view_name="python_stored_writes_feature_view", + df=entity_rows_to_write, + ) + print("reading odfv features") + online_python_response = self.store.get_online_features( + entity_rows=entity_rows_to_read, + features=[ + "python_stored_writes_feature_view:conv_rate_plus_acc", + "python_stored_writes_feature_view:current_datetime", + "python_stored_writes_feature_view:counter", + "python_stored_writes_feature_view:input_datetime", + ], + ).to_dict() + print(online_python_response) + assert sorted(list(online_python_response.keys())) == sorted( + [ + "driver_id", + "conv_rate_plus_acc", + "counter", + "current_datetime", + "input_datetime", + ] + ) + From 72caa151771b1f0a3e60c40fe1407b69843e24a9 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 1 Sep 2024 14:45:26 -0400 Subject: [PATCH 17/45] debugging Signed-off-by: Francisco Javier Arceo --- .../test_on_demand_python_transformation.py | 101 +----------------- 1 file changed, 2 insertions(+), 99 deletions(-) diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 1b097eb588..b4c387c10d 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -601,17 +601,6 @@ def python_stored_writes_feature_view( print("running odfv transform") return output - with pytest.raises(TypeError): - # Note the singleton view will fail as the type is - # expected to be a list which can be confirmed in _infer_features_dict - self.store.apply( - [ - driver, - driver_stats_source, - driver_stats_fv, - ] - ) - self.store.apply( [ driver, @@ -623,96 +612,10 @@ def python_stored_writes_feature_view( self.store.write_to_online_store( feature_view_name="driver_hourly_stats", df=driver_df ) - assert len(self.store.list_all_feature_views()) == 5 + assert len(self.store.list_all_feature_views()) == 2 assert len(self.store.list_feature_views()) == 1 - assert len(self.store.list_on_demand_feature_views()) == 4 + assert len(self.store.list_on_demand_feature_views()) == 1 assert len(self.store.list_stream_feature_views()) == 0 - - def test_python_pandas_parity(self): - entity_rows = [ - { - "driver_id": 1001, - } - ] - - online_python_response = self.store.get_online_features( - entity_rows=entity_rows, - features=[ - "driver_hourly_stats:conv_rate", - "driver_hourly_stats:acc_rate", - "python_view:conv_rate_plus_acc_python", - ], - ).to_dict() - - online_pandas_response = self.store.get_online_features( - entity_rows=entity_rows, - features=[ - "driver_hourly_stats:conv_rate", - "driver_hourly_stats:acc_rate", - "pandas_view:conv_rate_plus_acc_pandas", - ], - ).to_df() - - assert len(online_python_response) == 4 - assert all( - key in online_python_response.keys() - for key in [ - "driver_id", - "acc_rate", - "conv_rate", - "conv_rate_plus_acc_python", - ] - ) - assert len(online_python_response["conv_rate_plus_acc_python"]) == 1 - assert ( - online_python_response["conv_rate_plus_acc_python"][0] - == online_pandas_response["conv_rate_plus_acc_pandas"][0] - == online_python_response["conv_rate"][0] - + online_python_response["acc_rate"][0] - ) - - def test_python_docs_demo(self): - entity_rows = [ - { - "driver_id": 1001, - } - ] - - online_python_response = self.store.get_online_features( - entity_rows=entity_rows, - features=[ - "driver_hourly_stats:conv_rate", - "driver_hourly_stats:acc_rate", - "python_demo_view:conv_rate_plus_val1_python", - "python_demo_view:conv_rate_plus_val2_python", - ], - ).to_dict() - - assert sorted(list(online_python_response.keys())) == sorted( - [ - "driver_id", - "acc_rate", - "conv_rate", - "conv_rate_plus_val1_python", - "conv_rate_plus_val2_python", - ] - ) - - assert ( - online_python_response["conv_rate_plus_val1_python"][0] - == online_python_response["conv_rate_plus_val2_python"][0] - ) - assert ( - online_python_response["conv_rate"][0] - + online_python_response["acc_rate"][0] - == online_python_response["conv_rate_plus_val1_python"][0] - ) - assert ( - online_python_response["conv_rate"][0] - + online_python_response["acc_rate"][0] - == online_python_response["conv_rate_plus_val2_python"][0] - ) - def test_stored_writes(self): current_datetime = _utc_now() entity_rows_to_write = [ From 6d2388997f707fa8431a5942dad1dcd21ea55889 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 23 Sep 2024 23:34:21 -0400 Subject: [PATCH 18/45] merged Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 3 +++ sdk/python/feast/feature_view.py | 1 - sdk/python/feast/inference.py | 17 +++++++++++++++-- sdk/python/feast/on_demand_feature_view.py | 1 - .../test_on_demand_python_transformation.py | 9 +++++++-- 5 files changed, 25 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 2d019b3b17..3eb051a3d2 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -608,6 +608,9 @@ def _make_inferences( update_feature_views_with_inferred_features_and_entities( sfvs_to_update, entities + entities_to_update, self.config ) + update_feature_views_with_inferred_features_and_entities( + odfvs_to_update, entities + entities_to_update, self.config + ) # TODO(kevjumba): Update schema inferrence for sfv in sfvs_to_update: if not sfv.schema: diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 3e3775b345..33ea761158 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -132,7 +132,6 @@ def __init__( """ self.name = name self.entities = [e.name for e in entities] if entities else [DUMMY_ENTITY_NAME] - print('fv', self.entities) self.ttl = ttl schema = schema or [] diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index b9fb9b694d..195b437c25 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -16,6 +16,7 @@ from feast.on_demand_feature_view import OnDemandFeatureView from feast.repo_config import RepoConfig from feast.stream_feature_view import StreamFeatureView +from feast.on_demand_feature_view import OnDemandFeatureView from feast.types import String from feast.value_type import ValueType @@ -161,8 +162,13 @@ def update_feature_views_with_inferred_features_and_entities( # Infer a dummy entity column for entityless feature views. if ( +<<<<<<< HEAD len(fv_entities) == 1 and fv_entities[0] == DUMMY_ENTITY_NAME +======= + len(fv.entities) == 1 + and fv.entities[0] == DUMMY_ENTITY_NAME +>>>>>>> 743ae513 (storing current progress...things are getting overriden in the _infer_features_and_entities() method in FeatureStore...that is another thing we have to chnage) and not entity_columns ): entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String)) @@ -204,11 +210,15 @@ def _infer_features_and_entities( run_inference_for_features: Whether to run inference for features. config: The config for the current feature store. """ +<<<<<<< HEAD if isinstance(fv, OnDemandFeatureView): return _infer_on_demand_features_and_entities( fv, join_keys, run_inference_for_features, config ) +======= + entity_columns = [] +>>>>>>> 743ae513 (storing current progress...things are getting overriden in the _infer_features_and_entities() method in FeatureStore...that is another thing we have to chnage) columns_to_exclude = { fv.batch_source.timestamp_field, fv.batch_source.created_timestamp_column, @@ -233,9 +243,9 @@ def _infer_features_and_entities( ), ) if field.name not in [ - entity_column.name for entity_column in fv.entity_columns + entity_column.name for entity_column in entity_columns ]: - fv.entity_columns.append(field) + entity_columns.append(field) elif not re.match( "^__|__$", col_name ): # double underscores often signal an internal-use column @@ -256,6 +266,7 @@ def _infer_features_and_entities( if field.name not in [feature.name for feature in fv.features]: fv.features.append(field) +<<<<<<< HEAD def _infer_on_demand_features_and_entities( fv: OnDemandFeatureView, @@ -331,4 +342,6 @@ def _infer_on_demand_features_and_entities( feature.name for feature in source_feature_view.features ]: source_feature_view.features.append(field) +======= +>>>>>>> 743ae513 (storing current progress...things are getting overriden in the _infer_features_and_entities() method in FeatureStore...that is another thing we have to chnage) fv.entity_columns = entity_columns diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index c6f1d6e1c1..96805e6c68 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -132,7 +132,6 @@ def __init__( # noqa: C901 schema = schema or [] self.entities = [e.name for e in entities] if entities else [DUMMY_ENTITY_NAME] - print('odfv', self.entities) self.mode = mode.lower() if self.mode not in {"python", "pandas", "substrait"}: diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index b4c387c10d..366dff7523 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -183,9 +183,9 @@ def python_singleton_view(inputs: dict[str, Any]) -> dict[str, Any]: self.store.write_to_online_store( feature_view_name="driver_hourly_stats", df=driver_df ) - assert len(self.store.list_all_feature_views()) == 5 + assert len(self.store.list_all_feature_views()) == 4 assert len(self.store.list_feature_views()) == 1 - assert len(self.store.list_on_demand_feature_views()) == 4 + assert len(self.store.list_on_demand_feature_views()) == 3 assert len(self.store.list_stream_feature_views()) == 0 def test_python_pandas_parity(self): @@ -616,6 +616,11 @@ def python_stored_writes_feature_view( assert len(self.store.list_feature_views()) == 1 assert len(self.store.list_on_demand_feature_views()) == 1 assert len(self.store.list_stream_feature_views()) == 0 + assert driver_stats_fv.entity_columns == \ + self.store.get_feature_view('driver_hourly_stats').entity_columns + assert self.store.get_on_demand_feature_view('python_stored_writes_feature_view').entity_columns == \ + self.store.get_feature_view('driver_hourly_stats').entity_columns + def test_stored_writes(self): current_datetime = _utc_now() entity_rows_to_write = [ From b76bf4e77e9814e280db7e16cf878fca7d65690e Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 23 Sep 2024 23:38:46 -0400 Subject: [PATCH 19/45] merging Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 2 ++ sdk/python/feast/inference.py | 11 ----------- sdk/python/feast/on_demand_feature_view.py | 1 + 3 files changed, 3 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 3eb051a3d2..854ee81b73 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -608,6 +608,8 @@ def _make_inferences( update_feature_views_with_inferred_features_and_entities( sfvs_to_update, entities + entities_to_update, self.config ) + # We need to attach the time stamp fields to the underlying data sources + # and cascade the dependencies update_feature_views_with_inferred_features_and_entities( odfvs_to_update, entities + entities_to_update, self.config ) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 195b437c25..42804137cc 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -162,13 +162,8 @@ def update_feature_views_with_inferred_features_and_entities( # Infer a dummy entity column for entityless feature views. if ( -<<<<<<< HEAD len(fv_entities) == 1 and fv_entities[0] == DUMMY_ENTITY_NAME -======= - len(fv.entities) == 1 - and fv.entities[0] == DUMMY_ENTITY_NAME ->>>>>>> 743ae513 (storing current progress...things are getting overriden in the _infer_features_and_entities() method in FeatureStore...that is another thing we have to chnage) and not entity_columns ): entity_columns.append(Field(name=DUMMY_ENTITY_ID, dtype=String)) @@ -210,15 +205,12 @@ def _infer_features_and_entities( run_inference_for_features: Whether to run inference for features. config: The config for the current feature store. """ -<<<<<<< HEAD if isinstance(fv, OnDemandFeatureView): return _infer_on_demand_features_and_entities( fv, join_keys, run_inference_for_features, config ) -======= entity_columns = [] ->>>>>>> 743ae513 (storing current progress...things are getting overriden in the _infer_features_and_entities() method in FeatureStore...that is another thing we have to chnage) columns_to_exclude = { fv.batch_source.timestamp_field, fv.batch_source.created_timestamp_column, @@ -266,7 +258,6 @@ def _infer_features_and_entities( if field.name not in [feature.name for feature in fv.features]: fv.features.append(field) -<<<<<<< HEAD def _infer_on_demand_features_and_entities( fv: OnDemandFeatureView, @@ -342,6 +333,4 @@ def _infer_on_demand_features_and_entities( feature.name for feature in source_feature_view.features ]: source_feature_view.features.append(field) -======= ->>>>>>> 743ae513 (storing current progress...things are getting overriden in the _infer_features_and_entities() method in FeatureStore...that is another thing we have to chnage) fv.entity_columns = entity_columns diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 96805e6c68..a55eb4bbc5 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -162,6 +162,7 @@ def __init__( # noqa: C901 self.source_request_sources[odfv_source.name] = odfv_source elif isinstance(odfv_source, FeatureViewProjection): self.source_feature_view_projections[odfv_source.name] = odfv_source + else: self.source_feature_view_projections[odfv_source.name] = ( odfv_source.projection From 3cf03691cce8afcc4df78237604f6fe2499ea850 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 6 Sep 2024 08:07:21 -0400 Subject: [PATCH 20/45] Rebasing and merging changes from other PR Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/inference.py | 1 - sdk/python/feast/utils.py | 18 ++++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 42804137cc..b5efdd338a 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -13,7 +13,6 @@ from feast.infra.offline_stores.file_source import FileSource from feast.infra.offline_stores.redshift_source import RedshiftSource from feast.infra.offline_stores.snowflake_source import SnowflakeSource -from feast.on_demand_feature_view import OnDemandFeatureView from feast.repo_config import RepoConfig from feast.stream_feature_view import StreamFeatureView from feast.on_demand_feature_view import OnDemandFeatureView diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index c1fdc3e640..3f9c8ee927 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -322,16 +322,18 @@ def _convert_arrow_odfv_to_proto( } # Adding On Demand Features for feature in feature_view.features: - if feature.name in [c[0] for c in columns]: + if feature.name in [c[0] for c in columns] and feature.name not in proto_values_by_column: # initializing the column as null - proto_values_by_column[feature.name] = python_values_to_proto_values( - table.append_column( - feature.name, - pyarrow.array( - [None] * table.shape[0], + null_column = pyarrow.array( + [None] * table.num_rows, type=from_feast_to_pyarrow_type(feature.dtype), - ), - ), + ) + updated_table = pyarrow.RecordBatch.from_arrays( + table.columns + [null_column], + schema=table.schema.append(pyarrow.field(feature.name, null_column.type)) + ) + proto_values_by_column[feature.name] = python_values_to_proto_values( + updated_table.column(feature.name).to_numpy(zero_copy_only=False), feature.dtype.to_value_type(), ) entity_keys = [ From 3a6dfc40d28377c80559bf81580c478010acfe26 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 6 Sep 2024 23:26:44 -0400 Subject: [PATCH 21/45] Merging changes continued Signed-off-by: Francisco Javier Arceo --- .../feast/infra/passthrough_provider.py | 5 -- sdk/python/feast/utils.py | 47 ++++++++----------- .../test_on_demand_python_transformation.py | 24 +++++----- 3 files changed, 31 insertions(+), 45 deletions(-) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 87cf9c98db..b0c67bcf15 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -290,11 +290,6 @@ def ingest_df( self.online_write_batch( self.repo_config, feature_view, rows_to_write, progress=None ) - # We get these two sets of feature view projections, which are similar to the batch source for regular feature_views - # We need to extract these dependencies to get the data types and field mappings to run the _convert_arrow_to_proto function - #print(feature_view.source_feature_view_projections) - #print(feature_view.source_request_sources) - #print('something should happen here') else: # Note: A dictionary mapping of column names in this data # source to feature names in a feature table or view. Only used for feature diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 3f9c8ee927..dd37559762 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -322,18 +322,24 @@ def _convert_arrow_odfv_to_proto( } # Adding On Demand Features for feature in feature_view.features: - if feature.name in [c[0] for c in columns] and feature.name not in proto_values_by_column: + if ( + feature.name in [c[0] for c in columns] + and feature.name not in proto_values_by_column + ): # initializing the column as null null_column = pyarrow.array( - [None] * table.num_rows, - type=from_feast_to_pyarrow_type(feature.dtype), - ) + [None] * table.num_rows, + type=from_feast_to_pyarrow_type(feature.dtype), + ) updated_table = pyarrow.RecordBatch.from_arrays( table.columns + [null_column], - schema=table.schema.append(pyarrow.field(feature.name, null_column.type)) + schema=table.schema.append( + pyarrow.field(feature.name, null_column.type) + ), ) proto_values_by_column[feature.name] = python_values_to_proto_values( - updated_table.column(feature.name).to_numpy(zero_copy_only=False), feature.dtype.to_value_type(), + updated_table.column(feature.name).to_numpy(zero_copy_only=False), + feature.dtype.to_value_type(), ) entity_keys = [ @@ -351,28 +357,15 @@ def _convert_arrow_odfv_to_proto( } features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())] - # Convert event_timestamps - event_timestamps = [ - _coerce_datetime(val) - for val in pd.to_datetime( - table.column(feature_view.batch_source.timestamp_field).to_numpy( - zero_copy_only=False - ) - ) - ] + # We need to artificially add event_timestamps and created_timestamps + event_timestamps = [] + timestamp_values = pd.to_datetime([_utc_now() for i in range(table.num_rows)]) - # Convert created_timestamps if they exist - if feature_view.batch_source.created_timestamp_column: - created_timestamps = [ - _coerce_datetime(val) - for val in pd.to_datetime( - table.column( - feature_view.batch_source.created_timestamp_column - ).to_numpy(zero_copy_only=False) - ) - ] - else: - created_timestamps = [None] * table.num_rows + for val in timestamp_values: + event_timestamps.append(_coerce_datetime(val)) + + # setting them equivalent + created_timestamps = event_timestamps return list(zip(entity_keys, features, event_timestamps, created_timestamps)) diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 366dff7523..95b8933476 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -68,13 +68,6 @@ def setUp(self): timestamp_field="event_timestamp", created_timestamp_column="created", ) - input_request_source = RequestSource( - name="counter_source", - schema=[ - Field(name="counter", dtype=Int64), - Field(name="input_datetime", dtype=UnixTimestamp), - ], - ) driver_stats_fv = FeatureView( name="driver_hourly_stats", @@ -273,6 +266,7 @@ def test_python_docs_demo(self): == online_python_response["conv_rate_plus_val2_python"][0] ) + class TestOnDemandPythonTransformationAllDataTypes(unittest.TestCase): def setUp(self): with tempfile.TemporaryDirectory() as data_dir: @@ -511,7 +505,6 @@ def python_view(inputs: dict[str, Any]) -> dict[str, Any]: store.apply([request_source, python_view]) - class TestOnDemandTransformationsWithWrites(unittest.TestCase): def setUp(self): with tempfile.TemporaryDirectory() as data_dir: @@ -616,10 +609,16 @@ def python_stored_writes_feature_view( assert len(self.store.list_feature_views()) == 1 assert len(self.store.list_on_demand_feature_views()) == 1 assert len(self.store.list_stream_feature_views()) == 0 - assert driver_stats_fv.entity_columns == \ - self.store.get_feature_view('driver_hourly_stats').entity_columns - assert self.store.get_on_demand_feature_view('python_stored_writes_feature_view').entity_columns == \ - self.store.get_feature_view('driver_hourly_stats').entity_columns + assert ( + driver_stats_fv.entity_columns + == self.store.get_feature_view("driver_hourly_stats").entity_columns + ) + assert ( + self.store.get_on_demand_feature_view( + "python_stored_writes_feature_view" + ).entity_columns + == self.store.get_feature_view("driver_hourly_stats").entity_columns + ) def test_stored_writes(self): current_datetime = _utc_now() @@ -666,4 +665,3 @@ def test_stored_writes(self): "input_datetime", ] ) - From 1c37e544bf19a66706a4150f2954ada4f93564fc Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 7 Sep 2024 06:13:23 -0400 Subject: [PATCH 22/45] update the _make_inference to include odfvs with writes in the update map Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 854ee81b73..bcd65daac0 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -623,8 +623,13 @@ def _make_inferences( for odfv in odfvs_to_update: odfv.infer_features() + odfvs_to_write = [ + odfv for odfv in odfvs_to_update if odfv.write_to_online_store + ] + # Update to include ODFVs with write to online store fvs_to_update_map = { - view.name: view for view in [*views_to_update, *sfvs_to_update] + view.name: view + for view in [*views_to_update, *sfvs_to_update, *odfvs_to_write] } for feature_service in feature_services_to_update: feature_service.infer_features(fvs_to_update=fvs_to_update_map) @@ -1478,7 +1483,8 @@ def write_to_online_store( provider = self._get_provider() if isinstance(feature_view, OnDemandFeatureView): - # projection_mapping = {} + # TODO: add projection mapping + projection_mapping = {} # source_projections = feature_view.source_feature_view_projections # for projection in source_projections: # try: @@ -1499,7 +1505,7 @@ def write_to_online_store( # "Request sources not found for on demand feature view. Please check the source feature view configuration." # ) # projection_mapping.update(self.get_feature_view(projection)) - provider.ingest_df(feature_view, df, {}) + provider.ingest_df(feature_view, df, projection_mapping) else: provider.ingest_df(feature_view, df) From 2f9546fad30ede5b55c4699be864465ee49e8e42 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 7 Sep 2024 23:36:11 -0400 Subject: [PATCH 23/45] have the table being written now...the create table happens in the SqliteOnlineStore.update() method Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index bcd65daac0..c08fd28249 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -857,6 +857,11 @@ def apply( ] sfvs_to_update = [ob for ob in objects if isinstance(ob, StreamFeatureView)] odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)] + odfvs_with_writes_to_update = [ + ob + for ob in objects + if isinstance(ob, OnDemandFeatureView) and ob.write_to_online_store + ] services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)] data_sources_set_to_update = { ob for ob in objects if isinstance(ob, DataSource) @@ -878,10 +883,20 @@ def apply( for batch_source in batch_sources_to_add: data_sources_set_to_update.add(batch_source) - for fv in itertools.chain(views_to_update, sfvs_to_update): - data_sources_set_to_update.add(fv.batch_source) - if fv.stream_source: + for fv in itertools.chain( + views_to_update, sfvs_to_update, odfvs_with_writes_to_update + ): + if isinstance(fv, FeatureView): + data_sources_set_to_update.add(fv.batch_source) + if isinstance(fv, StreamFeatureView): data_sources_set_to_update.add(fv.stream_source) + if isinstance(fv, OnDemandFeatureView): + for source_fvp in fv.source_feature_view_projections: + data_sources_set_to_update.add( + fv.source_feature_view_projections[source_fvp].batch_source + ) + else: + pass for odfv in odfvs_to_update: for v in odfv.source_request_sources.values(): @@ -999,7 +1014,9 @@ def apply( tables_to_delete: List[FeatureView] = ( views_to_delete + sfvs_to_delete if not partial else [] # type: ignore ) - tables_to_keep: List[FeatureView] = views_to_update + sfvs_to_update # type: ignore + tables_to_keep: List[FeatureView] = ( + views_to_update + sfvs_to_update + odfvs_with_writes_to_update + ) # type: ignore self._get_provider().update_infra( project=self.project, From c46e1575629099e8ef9578e8a6ad241d94a63f96 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 9 Sep 2024 05:11:30 -0400 Subject: [PATCH 24/45] checking in progress Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 20 ------------------- .../feast/infra/online_stores/sqlite.py | 5 ++++- 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c08fd28249..e65ab0b605 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1502,26 +1502,6 @@ def write_to_online_store( if isinstance(feature_view, OnDemandFeatureView): # TODO: add projection mapping projection_mapping = {} - # source_projections = feature_view.source_feature_view_projections - # for projection in source_projections: - # try: - # field_mapping = self.get_feature_view(projection).batch_source.field_mapping - # except: - # print(f'feature view {feature_view} broke') - # raise ValueError( - # "Field mapping not found for source feature view. Please check the source feature view configuration." - # ) - # projection_mapping.update(self.get_feature_view(field_mapping)) - # - # request_source_projections = feature_view.source_request_projections - # for projection in request_source_projections: - # try: - # field_mapping = projection.field_mapping - # except: - # raise ValueError( - # "Request sources not found for on demand feature view. Please check the source feature view configuration." - # ) - # projection_mapping.update(self.get_feature_view(projection)) provider.ingest_df(feature_view, df, projection_mapping) else: provider.ingest_df(feature_view, df) diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 1b79b1a94b..09f61f4f55 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -170,7 +170,8 @@ def online_write_batch( ), ) - conn.execute( + try: + conn.execute( f"""INSERT OR IGNORE INTO {table_name} (entity_key, feature_name, value, event_ts, created_ts) VALUES (?, ?, ?, ?, ?)""", @@ -182,6 +183,8 @@ def online_write_batch( created_ts, ), ) + except Exception as e: + pass if progress: progress(1) From 580b77f2570ca5e68d24a555d8ae67071616e619 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 10 Sep 2024 05:50:55 -0400 Subject: [PATCH 25/45] adding logs Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/infra/online_stores/sqlite.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 09f61f4f55..ceb773ce1e 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -153,6 +153,8 @@ def online_write_batch( ) else: + if table_name == 'test_on_demand_python_transformation_python_stored_writes_feature_view': + print(f"writing online batch for {table_name} - {feature_name} = {val}") conn.execute( f""" UPDATE {table_name} @@ -184,7 +186,8 @@ def online_write_batch( ), ) except Exception as e: - pass + print(f"error writing online batch for {table_name} - {feature_name} = {val}\n {e}") + print(f'querying all records for table: {conn.execute(f"select * from {table_name}").fetchall()}') if progress: progress(1) @@ -251,6 +254,7 @@ def update( project = config.project for table in tables_to_keep: + print(f'updating {_table_id(project, table)}') conn.execute( f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))" ) From 815a35203e82b6cab7d5eed2fa82ca275b7fe6e9 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 10 Sep 2024 05:51:12 -0400 Subject: [PATCH 26/45] updating permissions Signed-off-by: Francisco Javier Arceo --- .../feast/infra/online_stores/sqlite.py | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index ceb773ce1e..322b7242b0 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -153,8 +153,13 @@ def online_write_batch( ) else: - if table_name == 'test_on_demand_python_transformation_python_stored_writes_feature_view': - print(f"writing online batch for {table_name} - {feature_name} = {val}") + if ( + table_name + == "test_on_demand_python_transformation_python_stored_writes_feature_view" + ): + print( + f"writing online batch for {table_name} - {feature_name} = {val}" + ) conn.execute( f""" UPDATE {table_name} @@ -174,20 +179,24 @@ def online_write_batch( try: conn.execute( - f"""INSERT OR IGNORE INTO {table_name} + f"""INSERT OR IGNORE INTO {table_name} (entity_key, feature_name, value, event_ts, created_ts) VALUES (?, ?, ?, ?, ?)""", - ( - entity_key_bin, - feature_name, - val.SerializeToString(), - timestamp, - created_ts, - ), - ) + ( + entity_key_bin, + feature_name, + val.SerializeToString(), + timestamp, + created_ts, + ), + ) except Exception as e: - print(f"error writing online batch for {table_name} - {feature_name} = {val}\n {e}") - print(f'querying all records for table: {conn.execute(f"select * from {table_name}").fetchall()}') + print( + f"error writing online batch for {table_name} - {feature_name} = {val}\n {e}" + ) + print( + f'querying all records for table: {conn.execute(f"select * from {table_name}").fetchall()}' + ) if progress: progress(1) @@ -254,7 +263,7 @@ def update( project = config.project for table in tables_to_keep: - print(f'updating {_table_id(project, table)}') + print(f"updating {_table_id(project, table)}") conn.execute( f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))" ) From 2eda92ca016e21ec24ca58e21c80c0402ca20ab1 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 10 Sep 2024 06:12:35 -0400 Subject: [PATCH 27/45] going to error out on purpose Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/infra/online_stores/sqlite.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 322b7242b0..e0c0a43853 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -177,7 +177,8 @@ def online_write_batch( ), ) - try: + # try: + if True: conn.execute( f"""INSERT OR IGNORE INTO {table_name} (entity_key, feature_name, value, event_ts, created_ts) @@ -190,13 +191,17 @@ def online_write_batch( created_ts, ), ) - except Exception as e: + else: + # except Exception as e: print( f"error writing online batch for {table_name} - {feature_name} = {val}\n {e}" ) print( f'querying all records for table: {conn.execute(f"select * from {table_name}").fetchall()}' ) + r = conn.execute(""" + SELECT * FROM sqlite_master WHERE type='table' and name = 'test_on_demand_python_transformation_python_stored_writes_feature_view'; + """) if progress: progress(1) From e326e9bf4647efae6794763700e44ef9a4480514 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 18 Sep 2024 08:34:44 -0400 Subject: [PATCH 28/45] adding unit test and merging changes Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index a55eb4bbc5..49c871ccd0 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -120,7 +120,7 @@ def __init__( # noqa: C901 owner (optional): The owner of the on demand feature view, typically the email of the primary maintainer. write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to - the online store for faster retrieval. + the online store for faster retrieval. """ super().__init__( name=name, @@ -162,7 +162,6 @@ def __init__( # noqa: C901 self.source_request_sources[odfv_source.name] = odfv_source elif isinstance(odfv_source, FeatureViewProjection): self.source_feature_view_projections[odfv_source.name] = odfv_source - else: self.source_feature_view_projections[odfv_source.name] = ( odfv_source.projection From a54b5f813dec638c7ff29e9d3013c7501ce4418b Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 18 Sep 2024 14:41:08 -0400 Subject: [PATCH 29/45] almost got everything working and type validation behaving Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/base_feature_view.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index d7dc2237bd..a4eb3d6aec 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -161,6 +161,7 @@ def __eq__(self, other): or self.description != other.description or self.tags != other.tags or self.owner != other.owner + or self.source != other.source ): # This is meant to ignore the File Source change to Push Source if isinstance(type(self.source), type(other.source)): From 029c9cfcb5d4d0adc63e3393f9f1307df9344809 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 18 Sep 2024 16:16:18 -0400 Subject: [PATCH 30/45] cleaned up and have tests behaving Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/base_feature_view.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index a4eb3d6aec..d7dc2237bd 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -161,7 +161,6 @@ def __eq__(self, other): or self.description != other.description or self.tags != other.tags or self.owner != other.owner - or self.source != other.source ): # This is meant to ignore the File Source change to Push Source if isinstance(type(self.source), type(other.source)): From e2c6b35f062936d8d0854514e491128958fcc013 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 20 Sep 2024 20:35:15 -0400 Subject: [PATCH 31/45] adding print Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/infra/materialization/snowflake_engine.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/materialization/snowflake_engine.py b/sdk/python/feast/infra/materialization/snowflake_engine.py index 600e1b20d8..d8662f179e 100644 --- a/sdk/python/feast/infra/materialization/snowflake_engine.py +++ b/sdk/python/feast/infra/materialization/snowflake_engine.py @@ -307,6 +307,7 @@ def _materialize_one( """ with GetSnowflakeConnection(self.repo_config.offline_store) as conn: + print(query) entities_to_write = conn.cursor().execute(query).fetchall()[0][0] if feature_view.batch_source.field_mapping is not None: From adf147fc72d40038c0e7abc191d5b000f91fab5f Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 21 Sep 2024 13:55:32 -0400 Subject: [PATCH 32/45] removing print Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/infra/materialization/snowflake_engine.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/feast/infra/materialization/snowflake_engine.py b/sdk/python/feast/infra/materialization/snowflake_engine.py index d8662f179e..600e1b20d8 100644 --- a/sdk/python/feast/infra/materialization/snowflake_engine.py +++ b/sdk/python/feast/infra/materialization/snowflake_engine.py @@ -307,7 +307,6 @@ def _materialize_one( """ with GetSnowflakeConnection(self.repo_config.offline_store) as conn: - print(query) entities_to_write = conn.cursor().execute(query).fetchall()[0][0] if feature_view.batch_source.field_mapping is not None: From d34db1d132a3aedcc78aee90a4c86e488db6c66b Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 22 Sep 2024 22:47:44 -0400 Subject: [PATCH 33/45] checking in progress Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/inference.py | 21 +++++++++---------- .../feast/infra/online_stores/sqlite.py | 8 +++---- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index b5efdd338a..e625500e54 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -283,18 +283,17 @@ def _infer_on_demand_features_and_entities( batch_source = getattr(source_feature_view, "batch_source") batch_field_mapping = getattr(batch_source or None, "field_mapping") - if batch_field_mapping: - for ( - original_col, - mapped_col, - ) in batch_field_mapping.items(): - if mapped_col in columns_to_exclude: - columns_to_exclude.remove(mapped_col) - columns_to_exclude.add(original_col) + for ( + original_col, + mapped_col, + ) in batch_field_mapping.items(): + if mapped_col in columns_to_exclude: + columns_to_exclude.remove(mapped_col) + columns_to_exclude.add(original_col) - table_column_names_and_types = ( - batch_source.get_table_column_names_and_types(config) - ) + table_column_names_and_types = batch_source.get_table_column_names_and_types( + config + ) for col_name, col_datatype in table_column_names_and_types: if col_name in columns_to_exclude: continue diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index e0c0a43853..7afa072657 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -192,16 +192,16 @@ def online_write_batch( ), ) else: - # except Exception as e: - print( - f"error writing online batch for {table_name} - {feature_name} = {val}\n {e}" - ) + # print( + # f"error writing online batch for {table_name} - {feature_name} = {val}\n {e}" + # ) print( f'querying all records for table: {conn.execute(f"select * from {table_name}").fetchall()}' ) r = conn.execute(""" SELECT * FROM sqlite_master WHERE type='table' and name = 'test_on_demand_python_transformation_python_stored_writes_feature_view'; """) + print(f"table exists: {r.fetchall()}") if progress: progress(1) From 731bacba7c954d021f5891d6fced6a546588313c Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 24 Sep 2024 22:10:03 -0400 Subject: [PATCH 34/45] updating test Signed-off-by: Francisco Javier Arceo --- .../test_local_feature_store.py | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py index cc48295b20..e5a0c8ca07 100644 --- a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py +++ b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py @@ -11,7 +11,7 @@ from feast.entity import Entity from feast.feast_object import ALL_RESOURCE_TYPES from feast.feature_store import FeatureStore -from feast.feature_view import DUMMY_ENTITY_ID, FeatureView +from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_NAME, FeatureView from feast.field import Field from feast.infra.offline_stores.file_source import FileSource from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig @@ -347,7 +347,7 @@ def test_apply_entities_and_feature_views(test_feature_store): "test_feature_store", [lazy_fixture("feature_store_with_local_registry")], ) -def test_apply_dummuy_entity_and_feature_view_columns(test_feature_store): +def test_apply_dummy_entity_and_feature_view_columns(test_feature_store): assert isinstance(test_feature_store, FeatureStore) # Create Feature Views batch_source = FileSource( @@ -359,14 +359,25 @@ def test_apply_dummuy_entity_and_feature_view_columns(test_feature_store): e1 = Entity(name="fs1_my_entity_1", description="something") - fv = FeatureView( - name="my_feature_view_no_entity", + fv_with_entity = FeatureView( + name="my_feature_view_with_entity", schema=[ Field(name="fs1_my_feature_1", dtype=Int64), Field(name="fs1_my_feature_2", dtype=String), Field(name="fs1_my_feature_3", dtype=Array(String)), Field(name="fs1_my_feature_4", dtype=Array(Bytes)), - Field(name="fs1_my_entity_2", dtype=Int64), + Field(name="fs1_my_entity_1", dtype=Int64), + ], + entities=[e1], + tags={"team": "matchmaking"}, + source=batch_source, + ttl=timedelta(minutes=5), + ) + + fv_no_entity = FeatureView( + name="my_feature_view_no_entity", + schema=[ + Field(name="fs1_my_feature_1", dtype=Int64), ], entities=[], tags={"team": "matchmaking"}, @@ -375,16 +386,22 @@ def test_apply_dummuy_entity_and_feature_view_columns(test_feature_store): ) # Check that the entity_columns are empty before applying - assert fv.entity_columns == [] + assert fv_no_entity.entities == [DUMMY_ENTITY_NAME] + assert fv_no_entity.entity_columns == [] + assert fv_with_entity.entity_columns[0].name == e1.name # Register Feature View - test_feature_store.apply([fv, e1]) - fv_actual = test_feature_store.get_feature_view("my_feature_view_no_entity") + test_feature_store.apply([e1, fv_no_entity, fv_with_entity]) + fv_from_online_store = test_feature_store.get_feature_view("my_feature_view_no_entity") # Note that after the apply() the feature_view serializes the Dummy Entity ID - assert fv.entity_columns[0].name == DUMMY_ENTITY_ID - assert fv_actual.entity_columns[0].name == DUMMY_ENTITY_ID + assert fv_no_entity.entity_columns[0].name == DUMMY_ENTITY_ID + assert fv_from_online_store.entity_columns[0].name == DUMMY_ENTITY_ID + assert fv_from_online_store.entities == [] + assert fv_no_entity.entities == [DUMMY_ENTITY_NAME] + assert fv_with_entity.entity_columns[0].name == e1.name + assert fv_with_entity.entities == [e1.name] test_feature_store.teardown() From 8479296fe0cc4eb966756a2cc3608fcd10f3e044 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 24 Sep 2024 22:21:38 -0400 Subject: [PATCH 35/45] adding test Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/infra/passthrough_provider.py | 2 ++ .../tests/unit/test_on_demand_python_transformation.py | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index b0c67bcf15..bd3dac8a78 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -286,6 +286,8 @@ def ingest_df( for entity in feature_view.entity_columns } rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) + print(rows_to_write) + # List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]] self.online_write_batch( self.repo_config, feature_view, rows_to_write, progress=None diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 95b8933476..d8532eade5 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -561,6 +561,8 @@ def setUp(self): online=True, source=driver_stats_source, ) + assert driver_stats_fv.entities == [driver.name] + assert driver_stats_fv.entity_columns == [driver.name] @on_demand_feature_view( entities=[driver], @@ -602,6 +604,10 @@ def python_stored_writes_feature_view( python_stored_writes_feature_view, ] ) + applied_fv = self.store.get_feature_view("driver_hourly_stats") + assert applied_fv.entities[0] == driver.name + assert applied_fv.entity_columns[0].name == driver.name + self.store.write_to_online_store( feature_view_name="driver_hourly_stats", df=driver_df ) From 3068c3b760cedeef1475b38b08e2467a25cfbbee Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 24 Sep 2024 23:26:01 -0400 Subject: [PATCH 36/45] linted and updated Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/inference.py | 4 +++- sdk/python/feast/on_demand_feature_view.py | 2 +- .../unit/local_feast_tests/test_local_feature_store.py | 4 +++- .../tests/unit/test_on_demand_python_transformation.py | 8 ++++---- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index e625500e54..96c410c6f2 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -13,9 +13,9 @@ from feast.infra.offline_stores.file_source import FileSource from feast.infra.offline_stores.redshift_source import RedshiftSource from feast.infra.offline_stores.snowflake_source import SnowflakeSource +from feast.on_demand_feature_view import OnDemandFeatureView from feast.repo_config import RepoConfig from feast.stream_feature_view import StreamFeatureView -from feast.on_demand_feature_view import OnDemandFeatureView from feast.types import String from feast.value_type import ValueType @@ -257,6 +257,8 @@ def _infer_features_and_entities( if field.name not in [feature.name for feature in fv.features]: fv.features.append(field) + fv.entity_columns = entity_columns + def _infer_on_demand_features_and_entities( fv: OnDemandFeatureView, diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 49c871ccd0..16bdaa676f 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -3,7 +3,7 @@ import inspect import warnings from types import FunctionType -from typing import Any, Optional, Union, get_type_hints, List +from typing import Any, List, Optional, Union, get_type_hints import dill import pandas as pd diff --git a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py index e5a0c8ca07..0d48a4aa24 100644 --- a/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py +++ b/sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py @@ -392,7 +392,9 @@ def test_apply_dummy_entity_and_feature_view_columns(test_feature_store): # Register Feature View test_feature_store.apply([e1, fv_no_entity, fv_with_entity]) - fv_from_online_store = test_feature_store.get_feature_view("my_feature_view_no_entity") + fv_from_online_store = test_feature_store.get_feature_view( + "my_feature_view_no_entity" + ) # Note that after the apply() the feature_view serializes the Dummy Entity ID assert fv_no_entity.entity_columns[0].name == DUMMY_ENTITY_ID diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index d8532eade5..cf92b6dbcb 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -380,15 +380,15 @@ def python_view(inputs: dict[str, Any]) -> dict[str, Any]: self.store.apply( [driver, driver_stats_source, driver_stats_fv, python_view] ) - self.store.write_to_online_store( - feature_view_name="driver_hourly_stats", df=driver_df - ) - fv_applied = self.store.get_feature_view("driver_hourly_stats") assert fv_applied.entities == [driver.name] # Note here that after apply() is called, the entity_columns are populated with the join_key assert fv_applied.entity_columns[0].name == driver.join_key + self.store.write_to_online_store( + feature_view_name="driver_hourly_stats", df=driver_df + ) + def test_python_transformation_returning_all_data_types(self): entity_rows = [ { From 238dc290531e182f43607129a8bd1151d216e0dd Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 24 Sep 2024 23:30:00 -0400 Subject: [PATCH 37/45] removed print Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/infra/passthrough_provider.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index bd3dac8a78..b0c67bcf15 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -286,8 +286,6 @@ def ingest_df( for entity in feature_view.entity_columns } rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) - print(rows_to_write) - # List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]] self.online_write_batch( self.repo_config, feature_view, rows_to_write, progress=None From 06315b9420a5373f75cb54065e7a1a9080dfc19c Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 24 Sep 2024 23:32:55 -0400 Subject: [PATCH 38/45] updated tests to test actual behavior Signed-off-by: Francisco Javier Arceo --- .../tests/unit/test_on_demand_python_transformation.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index cf92b6dbcb..0e7e254587 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -562,7 +562,7 @@ def setUp(self): source=driver_stats_source, ) assert driver_stats_fv.entities == [driver.name] - assert driver_stats_fv.entity_columns == [driver.name] + assert driver_stats_fv.entity_columns == [] @on_demand_feature_view( entities=[driver], @@ -604,9 +604,10 @@ def python_stored_writes_feature_view( python_stored_writes_feature_view, ] ) - applied_fv = self.store.get_feature_view("driver_hourly_stats") - assert applied_fv.entities[0] == driver.name - assert applied_fv.entity_columns[0].name == driver.name + fv_applied = self.store.get_feature_view("driver_hourly_stats") + assert fv_applied.entities == [driver.name] + # Note here that after apply() is called, the entity_columns are populated with the join_key + assert fv_applied.entity_columns[0].name == driver.join_key self.store.write_to_online_store( feature_view_name="driver_hourly_stats", df=driver_df From 05efa37236c2779820cde896dcf637150121ab10 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 27 Sep 2024 23:34:13 -0400 Subject: [PATCH 39/45] checking in progress Signed-off-by: Francisco Javier Arceo --- .../feast/infra/online_stores/sqlite.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 7afa072657..6b5607ae9f 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -21,6 +21,7 @@ from pathlib import Path from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union +import pandas as pd from google.protobuf.internal.containers import RepeatedScalarFieldContainer from pydantic import StrictStr @@ -153,13 +154,6 @@ def online_write_batch( ) else: - if ( - table_name - == "test_on_demand_python_transformation_python_stored_writes_feature_view" - ): - print( - f"writing online batch for {table_name} - {feature_name} = {val}" - ) conn.execute( f""" UPDATE {table_name} @@ -177,8 +171,7 @@ def online_write_batch( ), ) - # try: - if True: + try: conn.execute( f"""INSERT OR IGNORE INTO {table_name} (entity_key, feature_name, value, event_ts, created_ts) @@ -191,13 +184,23 @@ def online_write_batch( created_ts, ), ) - else: + except Exception as e: # print( # f"error writing online batch for {table_name} - {feature_name} = {val}\n {e}" # ) print( f'querying all records for table: {conn.execute(f"select * from {table_name}").fetchall()}' ) + def get_table_data(conn): + x = conn.execute(f"select * from sqlite_master").fetchall() + y = conn.execute(f"select * from sqlite_master") + names = list(map(lambda x: x[0], y.description)) + return pd.DataFrame(x, columns=names) + + df = get_table_data(conn) + tmp = [ conn.execute(f"select count(*) from {table_name}").fetchall() for table_name in df['name'].values if table_name not in ['sqlite_autoindex_test_on_demand_python_transformation_driver_hourly_stats_1', 'test_on_demand_python_transformation_driver_hourly_stats_ek', 'sqlite_autoindex_test_on_demand_python_transformation_python_stored_writes_feature_view_1', 'test_on_demand_python_transformation_python_stored_writes_feature_view_ek']] + print(tmp) + r = conn.execute(""" SELECT * FROM sqlite_master WHERE type='table' and name = 'test_on_demand_python_transformation_python_stored_writes_feature_view'; """) From 8c559c87c0eead6f344ae0a55c14c86b09fa7796 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 28 Sep 2024 05:37:06 -0400 Subject: [PATCH 40/45] changing typo Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index e65ab0b605..308d55b140 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -613,7 +613,7 @@ def _make_inferences( update_feature_views_with_inferred_features_and_entities( odfvs_to_update, entities + entities_to_update, self.config ) - # TODO(kevjumba): Update schema inferrence + # TODO(kevjumba): Update schema inference for sfv in sfvs_to_update: if not sfv.schema: raise ValueError( @@ -909,7 +909,7 @@ def apply( # Validate all feature views and make inferences. self._validate_all_feature_views( - views_to_update, odfvs_to_update, sfvs_to_update + views_to_update, odfvs_to_update, sfvs_to_update, ) self._make_inferences( data_sources_to_update, From 8b8aa16a4ab8e64955c02485e1057eba90cea5c1 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 28 Sep 2024 13:46:06 -0400 Subject: [PATCH 41/45] updating test Signed-off-by: Francisco Javier Arceo --- .../test_on_demand_python_transformation.py | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index 0e7e254587..c0312f61f2 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -629,16 +629,31 @@ def python_stored_writes_feature_view( def test_stored_writes(self): current_datetime = _utc_now() - entity_rows_to_write = [ + fv_entity_rows_to_write = [ + { + "driver_id": 1001, + "conv_rate": 0.25, + "acc_rate": 0.25, + "avg_daily_trips": 2, + "event_timestamp": current_datetime, + "created": current_datetime, + } + ] + odfv_entity_rows_to_write = [ { "driver_id": 1001, "counter": 0, "input_datetime": current_datetime, } ] + fv_entity_rows_to_read = [ + { + "driver_id": 1001, + } + ] # Note that here we shouldn't have to pass the request source features for reading # because they should have already been written to the online store - entity_rows_to_read = [ + odfv_entity_rows_to_read = [ { "driver_id": 1001, "conv_rate": 0.25, @@ -647,14 +662,29 @@ def test_stored_writes(self): "input_datetime": current_datetime, } ] + print("storing fv features") + self.store.write_to_online_store( + feature_view_name="driver_hourly_stats", + df=fv_entity_rows_to_write, + ) + print("reading fv features") + online_python_response = self.store.get_online_features( + entity_rows=fv_entity_rows_to_read, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ], + ).to_dict() + print(online_python_response) print("storing odfv features") self.store.write_to_online_store( feature_view_name="python_stored_writes_feature_view", - df=entity_rows_to_write, + df=odfv_entity_rows_to_write, ) print("reading odfv features") online_python_response = self.store.get_online_features( - entity_rows=entity_rows_to_read, + entity_rows=odfv_entity_rows_to_read, features=[ "python_stored_writes_feature_view:conv_rate_plus_acc", "python_stored_writes_feature_view:current_datetime", From 4562a23ba25d5bcf9f5f8d2d8df4d29da157b4e0 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 28 Sep 2024 14:38:23 -0400 Subject: [PATCH 42/45] testing changes Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 14 +++-- sdk/python/feast/inference.py | 59 ++++++++++--------- .../feast/infra/online_stores/sqlite.py | 44 ++++---------- sdk/python/feast/on_demand_feature_view.py | 1 + .../test_on_demand_python_transformation.py | 10 ++++ 5 files changed, 61 insertions(+), 67 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 308d55b140..063dda4f53 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -889,12 +889,14 @@ def apply( if isinstance(fv, FeatureView): data_sources_set_to_update.add(fv.batch_source) if isinstance(fv, StreamFeatureView): - data_sources_set_to_update.add(fv.stream_source) + if fv.stream_source: + data_sources_set_to_update.add(fv.stream_source) if isinstance(fv, OnDemandFeatureView): for source_fvp in fv.source_feature_view_projections: - data_sources_set_to_update.add( - fv.source_feature_view_projections[source_fvp].batch_source - ) + if fv.source_feature_view_projections[source_fvp].batch_source: + data_sources_set_to_update.add( + fv.source_feature_view_projections[source_fvp].batch_source + ) else: pass @@ -909,7 +911,9 @@ def apply( # Validate all feature views and make inferences. self._validate_all_feature_views( - views_to_update, odfvs_to_update, sfvs_to_update, + views_to_update, + odfvs_to_update, + sfvs_to_update, ) self._make_inferences( data_sources_to_update, diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 96c410c6f2..5d5b6626c0 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -296,33 +296,13 @@ def _infer_on_demand_features_and_entities( table_column_names_and_types = batch_source.get_table_column_names_and_types( config ) - for col_name, col_datatype in table_column_names_and_types: - if col_name in columns_to_exclude: - continue - elif col_name in join_keys: - field = Field( - name=col_name, - dtype=from_value_type( - batch_source.source_datatype_to_feast_value_type()(col_datatype) - ), - ) - if field.name not in [ - entity_column.name - for entity_column in entity_columns - if hasattr(entity_column, "name") - ]: - entity_columns.append(field) - elif not re.match( - "^__|__$", col_name - ): # double underscores often signal an internal-use column - if run_inference_for_features: - feature_name = ( - batch_field_mapping[col_name] - if col_name in batch_field_mapping - else col_name - ) + if batch_field_mapping: + for col_name, col_datatype in table_column_names_and_types: + if col_name in columns_to_exclude: + continue + elif col_name in join_keys: field = Field( - name=feature_name, + name=col_name, dtype=from_value_type( batch_source.source_datatype_to_feast_value_type()( col_datatype @@ -330,7 +310,30 @@ def _infer_on_demand_features_and_entities( ), ) if field.name not in [ - feature.name for feature in source_feature_view.features + entity_column.name + for entity_column in entity_columns + if hasattr(entity_column, "name") ]: - source_feature_view.features.append(field) + entity_columns.append(field) + elif not re.match( + "^__|__$", col_name + ): # double underscores often signal an internal-use column + if run_inference_for_features: + feature_name = ( + batch_field_mapping[col_name] + if col_name in batch_field_mapping + else col_name + ) + field = Field( + name=feature_name, + dtype=from_value_type( + batch_source.source_datatype_to_feast_value_type()( + col_datatype + ) + ), + ) + if field.name not in [ + feature.name for feature in source_feature_view.features + ]: + source_feature_view.features.append(field) fv.entity_columns = entity_columns diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 6b5607ae9f..1b79b1a94b 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -21,7 +21,6 @@ from pathlib import Path from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union -import pandas as pd from google.protobuf.internal.containers import RepeatedScalarFieldContainer from pydantic import StrictStr @@ -171,40 +170,18 @@ def online_write_batch( ), ) - try: - conn.execute( - f"""INSERT OR IGNORE INTO {table_name} + conn.execute( + f"""INSERT OR IGNORE INTO {table_name} (entity_key, feature_name, value, event_ts, created_ts) VALUES (?, ?, ?, ?, ?)""", - ( - entity_key_bin, - feature_name, - val.SerializeToString(), - timestamp, - created_ts, - ), - ) - except Exception as e: - # print( - # f"error writing online batch for {table_name} - {feature_name} = {val}\n {e}" - # ) - print( - f'querying all records for table: {conn.execute(f"select * from {table_name}").fetchall()}' - ) - def get_table_data(conn): - x = conn.execute(f"select * from sqlite_master").fetchall() - y = conn.execute(f"select * from sqlite_master") - names = list(map(lambda x: x[0], y.description)) - return pd.DataFrame(x, columns=names) - - df = get_table_data(conn) - tmp = [ conn.execute(f"select count(*) from {table_name}").fetchall() for table_name in df['name'].values if table_name not in ['sqlite_autoindex_test_on_demand_python_transformation_driver_hourly_stats_1', 'test_on_demand_python_transformation_driver_hourly_stats_ek', 'sqlite_autoindex_test_on_demand_python_transformation_python_stored_writes_feature_view_1', 'test_on_demand_python_transformation_python_stored_writes_feature_view_ek']] - print(tmp) - - r = conn.execute(""" - SELECT * FROM sqlite_master WHERE type='table' and name = 'test_on_demand_python_transformation_python_stored_writes_feature_view'; - """) - print(f"table exists: {r.fetchall()}") + ( + entity_key_bin, + feature_name, + val.SerializeToString(), + timestamp, + created_ts, + ), + ) if progress: progress(1) @@ -271,7 +248,6 @@ def update( project = config.project for table in tables_to_keep: - print(f"updating {_table_id(project, table)}") conn.execute( f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))" ) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 16bdaa676f..8f190d2c0a 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -328,6 +328,7 @@ def to_proto(self) -> OnDemandFeatureViewProto: owner=self.owner, write_to_online_store=self.write_to_online_store, ) + print("*" * 40, "\n", spec) return OnDemandFeatureViewProto(spec=spec, meta=meta) diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index c0312f61f2..c998975d87 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -596,6 +596,9 @@ def python_stored_writes_feature_view( print("running odfv transform") return output + assert python_stored_writes_feature_view.entities == [driver.name] + assert python_stored_writes_feature_view.entity_columns == [] + self.store.apply( [ driver, @@ -605,9 +608,16 @@ def python_stored_writes_feature_view( ] ) fv_applied = self.store.get_feature_view("driver_hourly_stats") + odfv_applied = self.store.get_on_demand_feature_view( + "python_stored_writes_feature_view" + ) + assert fv_applied.entities == [driver.name] + assert odfv_applied.entites == [driver.name] + # Note here that after apply() is called, the entity_columns are populated with the join_key assert fv_applied.entity_columns[0].name == driver.join_key + assert odfv_applied.entity_columns[0].name == driver.join_key self.store.write_to_online_store( feature_view_name="driver_hourly_stats", df=driver_df From 14f837d7d6362f466dc12d6284a5a39e3d2857b7 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 28 Sep 2024 23:46:30 -0400 Subject: [PATCH 43/45] checking to see if thing still working Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/inference.py | 66 +++---- sdk/python/feast/on_demand_feature_view.py | 1 - .../test_on_demand_python_transformation.py | 171 +++++++++--------- 3 files changed, 117 insertions(+), 121 deletions(-) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 5d5b6626c0..4c879dada4 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -219,10 +219,13 @@ def _infer_features_and_entities( columns_to_exclude.remove(mapped_col) columns_to_exclude.add(original_col) + # this is what gets the right stuff table_column_names_and_types = fv.batch_source.get_table_column_names_and_types( config ) + print('\n', "*" * 50, '\ncolumn names and types\n', "*" * 50) + print(fv.name, list(zip(table_column_names_and_types))) for col_name, col_datatype in table_column_names_and_types: if col_name in columns_to_exclude: continue @@ -296,13 +299,37 @@ def _infer_on_demand_features_and_entities( table_column_names_and_types = batch_source.get_table_column_names_and_types( config ) - if batch_field_mapping: - for col_name, col_datatype in table_column_names_and_types: - if col_name in columns_to_exclude: - continue - elif col_name in join_keys: + batch_field_mapping = getattr(batch_source, "field_mapping", {}) + + for col_name, col_datatype in table_column_names_and_types: + if col_name in columns_to_exclude: + continue + elif col_name in join_keys: + field = Field( + name=col_name, + dtype=from_value_type( + batch_source.source_datatype_to_feast_value_type()( + col_datatype + ) + ), + ) + if field.name not in [ + entity_column.name + for entity_column in entity_columns + if hasattr(entity_column, "name") + ]: + entity_columns.append(field) + elif not re.match( + "^__|__$", col_name + ): # double underscores often signal an internal-use column + if run_inference_for_features: + feature_name = ( + batch_field_mapping[col_name] + if col_name in batch_field_mapping + else col_name + ) field = Field( - name=col_name, + name=feature_name, dtype=from_value_type( batch_source.source_datatype_to_feast_value_type()( col_datatype @@ -310,30 +337,7 @@ def _infer_on_demand_features_and_entities( ), ) if field.name not in [ - entity_column.name - for entity_column in entity_columns - if hasattr(entity_column, "name") + feature.name for feature in source_feature_view.features ]: - entity_columns.append(field) - elif not re.match( - "^__|__$", col_name - ): # double underscores often signal an internal-use column - if run_inference_for_features: - feature_name = ( - batch_field_mapping[col_name] - if col_name in batch_field_mapping - else col_name - ) - field = Field( - name=feature_name, - dtype=from_value_type( - batch_source.source_datatype_to_feast_value_type()( - col_datatype - ) - ), - ) - if field.name not in [ - feature.name for feature in source_feature_view.features - ]: - source_feature_view.features.append(field) + source_feature_view.features.append(field) fv.entity_columns = entity_columns diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 8f190d2c0a..16bdaa676f 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -328,7 +328,6 @@ def to_proto(self) -> OnDemandFeatureViewProto: owner=self.owner, write_to_online_store=self.write_to_online_store, ) - print("*" * 40, "\n", spec) return OnDemandFeatureViewProto(spec=spec, meta=meta) diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index c998975d87..635b23b80c 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -493,8 +493,7 @@ def test_invalid_python_transformation_raises_type_error_on_apply(): schema=[Field(name="driver_name_lower", dtype=String)], mode="python", ) - def python_view(inputs: dict[str, Any]) -> dict[str, Any]: - return {"driver_name_lower": []} + def python_view(inputs: dict[str, Any]) -> dict[str, Any]: return {"driver_name_lower": []} with pytest.raises( TypeError, @@ -506,7 +505,7 @@ def python_view(inputs: dict[str, Any]) -> dict[str, Any]: class TestOnDemandTransformationsWithWrites(unittest.TestCase): - def setUp(self): + def test_stored_writes(self): with tempfile.TemporaryDirectory() as data_dir: self.store = FeatureStore( config=RepoConfig( @@ -593,7 +592,6 @@ def python_stored_writes_feature_view( "counter": [c + 1 for c in inputs["counter"]], "input_datetime": [d for d in inputs["input_datetime"]], } - print("running odfv transform") return output assert python_stored_writes_feature_view.entities == [driver.name] @@ -613,15 +611,13 @@ def python_stored_writes_feature_view( ) assert fv_applied.entities == [driver.name] - assert odfv_applied.entites == [driver.name] + assert odfv_applied.entities == [driver.name] # Note here that after apply() is called, the entity_columns are populated with the join_key - assert fv_applied.entity_columns[0].name == driver.join_key + # assert fv_applied.entity_columns[0].name == driver.join_key + assert fv_applied.entity_columns == [] assert odfv_applied.entity_columns[0].name == driver.join_key - self.store.write_to_online_store( - feature_view_name="driver_hourly_stats", df=driver_df - ) assert len(self.store.list_all_feature_views()) == 2 assert len(self.store.list_feature_views()) == 1 assert len(self.store.list_on_demand_feature_views()) == 1 @@ -631,84 +627,81 @@ def python_stored_writes_feature_view( == self.store.get_feature_view("driver_hourly_stats").entity_columns ) assert ( - self.store.get_on_demand_feature_view( - "python_stored_writes_feature_view" - ).entity_columns - == self.store.get_feature_view("driver_hourly_stats").entity_columns - ) - - def test_stored_writes(self): - current_datetime = _utc_now() - fv_entity_rows_to_write = [ - { - "driver_id": 1001, - "conv_rate": 0.25, - "acc_rate": 0.25, - "avg_daily_trips": 2, - "event_timestamp": current_datetime, - "created": current_datetime, - } - ] - odfv_entity_rows_to_write = [ - { - "driver_id": 1001, - "counter": 0, - "input_datetime": current_datetime, - } - ] - fv_entity_rows_to_read = [ - { - "driver_id": 1001, - } - ] - # Note that here we shouldn't have to pass the request source features for reading - # because they should have already been written to the online store - odfv_entity_rows_to_read = [ - { - "driver_id": 1001, - "conv_rate": 0.25, - "acc_rate": 0.25, - "counter": 0, - "input_datetime": current_datetime, - } - ] - print("storing fv features") - self.store.write_to_online_store( - feature_view_name="driver_hourly_stats", - df=fv_entity_rows_to_write, - ) - print("reading fv features") - online_python_response = self.store.get_online_features( - entity_rows=fv_entity_rows_to_read, - features=[ - "driver_hourly_stats:conv_rate", - "driver_hourly_stats:acc_rate", - "driver_hourly_stats:avg_daily_trips", - ], - ).to_dict() - print(online_python_response) - print("storing odfv features") - self.store.write_to_online_store( - feature_view_name="python_stored_writes_feature_view", - df=odfv_entity_rows_to_write, - ) - print("reading odfv features") - online_python_response = self.store.get_online_features( - entity_rows=odfv_entity_rows_to_read, - features=[ - "python_stored_writes_feature_view:conv_rate_plus_acc", - "python_stored_writes_feature_view:current_datetime", - "python_stored_writes_feature_view:counter", - "python_stored_writes_feature_view:input_datetime", - ], - ).to_dict() - print(online_python_response) - assert sorted(list(online_python_response.keys())) == sorted( - [ - "driver_id", - "conv_rate_plus_acc", - "counter", - "current_datetime", - "input_datetime", + python_stored_writes_feature_view.entity_columns + == self.store.get_on_demand_feature_view("python_stored_writes_feature_view").entity_columns + ) + + current_datetime = _utc_now() + fv_entity_rows_to_write = [ + { + "driver_id": 1001, + "conv_rate": 0.25, + "acc_rate": 0.25, + "avg_daily_trips": 2, + "event_timestamp": current_datetime, + "created": current_datetime, + } ] - ) + odfv_entity_rows_to_write = [ + { + "driver_id": 1001, + "counter": 0, + "input_datetime": current_datetime, + } + ] + fv_entity_rows_to_read = [ + { + "driver_id": 1001, + } + ] + # Note that here we shouldn't have to pass the request source features for reading + # because they should have already been written to the online store + odfv_entity_rows_to_read = [ + { + "driver_id": 1001, + "conv_rate": 0.25, + "acc_rate": 0.25, + "counter": 0, + "input_datetime": current_datetime, + } + ] + print("storing fv features") + self.store.write_to_online_store( + feature_view_name="driver_hourly_stats", + df=fv_entity_rows_to_write, + ) + print("reading fv features") + online_python_response = self.store.get_online_features( + entity_rows=fv_entity_rows_to_read, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ], + ).to_dict() + print(online_python_response) + print("storing odfv features") + self.store.write_to_online_store( + feature_view_name="python_stored_writes_feature_view", + df=odfv_entity_rows_to_write, + ) + print("reading odfv features") + online_python_response = self.store.get_online_features( + entity_rows=odfv_entity_rows_to_read, + features=[ + "python_stored_writes_feature_view:conv_rate_plus_acc", + "python_stored_writes_feature_view:current_datetime", + "python_stored_writes_feature_view:counter", + "python_stored_writes_feature_view:input_datetime", + ], + ).to_dict() + print(online_python_response) + assert sorted(list(online_python_response.keys())) == sorted( + [ + "driver_id", + "conv_rate_plus_acc", + "counter", + "current_datetime", + "input_datetime", + ] + ) From 082a8604ddfe428d868c5a306e7669fb15a808c6 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 29 Sep 2024 15:09:59 -0400 Subject: [PATCH 44/45] removed print Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/inference.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 4c879dada4..3858691376 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -224,8 +224,6 @@ def _infer_features_and_entities( config ) - print('\n', "*" * 50, '\ncolumn names and types\n', "*" * 50) - print(fv.name, list(zip(table_column_names_and_types))) for col_name, col_datatype in table_column_names_and_types: if col_name in columns_to_exclude: continue From 1d52d2941c6f21dede910cc3446ad68730a7a9fd Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 29 Sep 2024 15:11:58 -0400 Subject: [PATCH 45/45] undo change for odfv file Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 16bdaa676f..1b75d23ed4 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -120,7 +120,7 @@ def __init__( # noqa: C901 owner (optional): The owner of the on demand feature view, typically the email of the primary maintainer. write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to - the online store for faster retrieval. + the online store for faster retrieval. """ super().__init__( name=name, @@ -162,6 +162,7 @@ def __init__( # noqa: C901 self.source_request_sources[odfv_source.name] = odfv_source elif isinstance(odfv_source, FeatureViewProjection): self.source_feature_view_projections[odfv_source.name] = odfv_source + else: self.source_feature_view_projections[odfv_source.name] = ( odfv_source.projection