Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adding write capability to online store to on demand feature … #4418

Draft
wants to merge 38 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
ac9907a
merged changes
franciscojavierarceo Sep 24, 2024
75bf9b0
saving progress
franciscojavierarceo Aug 17, 2024
6d19458
merged changes to odfv
franciscojavierarceo Sep 24, 2024
f5ba043
linted
franciscojavierarceo Aug 18, 2024
f06bca3
adding the test needed to show the expected behavior
franciscojavierarceo Aug 18, 2024
bf8bed9
updated test case
franciscojavierarceo Aug 21, 2024
163e940
saving progress
franciscojavierarceo Aug 21, 2024
0673caa
merging
franciscojavierarceo Sep 24, 2024
abc345b
merged
franciscojavierarceo Sep 24, 2024
7f7b56b
merged
franciscojavierarceo Sep 24, 2024
27d47ce
merging
franciscojavierarceo Sep 24, 2024
46c9534
adding the entity keys for now to do retrieval
franciscojavierarceo Aug 29, 2024
001f1df
adding entity to odfv
franciscojavierarceo Aug 29, 2024
746a4e0
checking in progress...getting closer
franciscojavierarceo Aug 29, 2024
78b9e60
may have to revert some of this...looks like the challenge is getting…
franciscojavierarceo Aug 31, 2024
a6aedd1
moving things around to make it easier to debug
franciscojavierarceo Sep 1, 2024
7d7a35c
debugging
franciscojavierarceo Sep 1, 2024
35dc3ad
merged
franciscojavierarceo Sep 24, 2024
a67f46b
merging
franciscojavierarceo Sep 24, 2024
a4dcd3c
Rebasing and merging changes from other PR
franciscojavierarceo Sep 6, 2024
05d1cfa
Merging changes continued
franciscojavierarceo Sep 7, 2024
bc36ca8
update the _make_inference to include odfvs with writes in the update…
franciscojavierarceo Sep 7, 2024
c8502c5
have the table being written now...the create table happens in the Sq…
franciscojavierarceo Sep 8, 2024
90c92ee
checking in progress
franciscojavierarceo Sep 9, 2024
85a640a
adding logs
franciscojavierarceo Sep 10, 2024
8b19e84
updating permissions
franciscojavierarceo Sep 10, 2024
463f2a8
going to error out on purpose
franciscojavierarceo Sep 10, 2024
81d27ae
adding unit test and merging changes
franciscojavierarceo Sep 18, 2024
f64c5e7
almost got everything working and type validation behaving
franciscojavierarceo Sep 18, 2024
5451380
cleaned up and have tests behaving
franciscojavierarceo Sep 18, 2024
47fdd51
adding print
franciscojavierarceo Sep 21, 2024
7804125
removing print
franciscojavierarceo Sep 21, 2024
2efbd88
checking in progress
franciscojavierarceo Sep 23, 2024
140a1e2
updating test
franciscojavierarceo Sep 25, 2024
7cce27d
adding test
franciscojavierarceo Sep 25, 2024
fd5beda
linted and updated
franciscojavierarceo Sep 25, 2024
4c941a9
removed print
franciscojavierarceo Sep 25, 2024
272b0f2
updated tests to test actual behavior
franciscojavierarceo Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 52 additions & 14 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,11 @@ def _make_inferences(
update_feature_views_with_inferred_features_and_entities(
sfvs_to_update, entities + entities_to_update, self.config
)
# We need to attach the time stamp fields to the underlying data sources
# and cascade the dependencies
update_feature_views_with_inferred_features_and_entities(
odfvs_to_update, entities + entities_to_update, self.config
)
# TODO(kevjumba): Update schema inferrence
for sfv in sfvs_to_update:
if not sfv.schema:
Expand All @@ -618,8 +623,13 @@ def _make_inferences(
for odfv in odfvs_to_update:
odfv.infer_features()

odfvs_to_write = [
odfv for odfv in odfvs_to_update if odfv.write_to_online_store
]
# Update to include ODFVs with write to online store
fvs_to_update_map = {
view.name: view for view in [*views_to_update, *sfvs_to_update]
view.name: view
for view in [*views_to_update, *sfvs_to_update, *odfvs_to_write]
}
for feature_service in feature_services_to_update:
feature_service.infer_features(fvs_to_update=fvs_to_update_map)
Expand Down Expand Up @@ -847,6 +857,11 @@ def apply(
]
sfvs_to_update = [ob for ob in objects if isinstance(ob, StreamFeatureView)]
odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)]
odfvs_with_writes_to_update = [
ob
for ob in objects
if isinstance(ob, OnDemandFeatureView) and ob.write_to_online_store
]
services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)]
data_sources_set_to_update = {
ob for ob in objects if isinstance(ob, DataSource)
Expand All @@ -868,10 +883,20 @@ def apply(
for batch_source in batch_sources_to_add:
data_sources_set_to_update.add(batch_source)

for fv in itertools.chain(views_to_update, sfvs_to_update):
data_sources_set_to_update.add(fv.batch_source)
if fv.stream_source:
for fv in itertools.chain(
views_to_update, sfvs_to_update, odfvs_with_writes_to_update
):
if isinstance(fv, FeatureView):
data_sources_set_to_update.add(fv.batch_source)
if isinstance(fv, StreamFeatureView):
data_sources_set_to_update.add(fv.stream_source)
if isinstance(fv, OnDemandFeatureView):
for source_fvp in fv.source_feature_view_projections:
data_sources_set_to_update.add(
fv.source_feature_view_projections[source_fvp].batch_source
)
else:
pass

for odfv in odfvs_to_update:
for v in odfv.source_request_sources.values():
Expand Down Expand Up @@ -989,7 +1014,9 @@ def apply(
tables_to_delete: List[FeatureView] = (
views_to_delete + sfvs_to_delete if not partial else [] # type: ignore
)
tables_to_keep: List[FeatureView] = views_to_update + sfvs_to_update # type: ignore
tables_to_keep: List[FeatureView] = (
views_to_update + sfvs_to_update + odfvs_with_writes_to_update
) # type: ignore

self._get_provider().update_infra(
project=self.project,
Expand Down Expand Up @@ -1444,19 +1471,18 @@ def write_to_online_store(
inputs: Optional the dictionary object to be written
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
"""
# TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
feature_view_dict = {
fv_proto.name: fv_proto
for fv_proto in self.list_all_feature_views(allow_registry_cache)
}
try:
feature_view: FeatureView = self.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
feature_view = feature_view_dict[feature_view_name]
except FeatureViewNotFoundException:
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
raise FeatureViewNotFoundException(feature_view_name, self.project)
if df is not None and inputs is not None:
raise ValueError("Both df and inputs cannot be provided at the same time.")
if df is None and inputs is not None:
if isinstance(inputs, dict):
if isinstance(inputs, dict) or isinstance(inputs, List):
try:
df = pd.DataFrame(inputs)
except Exception as _:
Expand All @@ -1465,8 +1491,20 @@ def write_to_online_store(
pass
else:
raise ValueError("inputs must be a dictionary or a pandas DataFrame.")
if df is not None and inputs is None:
if isinstance(df, dict) or isinstance(df, List):
try:
df = pd.DataFrame(df)
except Exception as _:
raise DataFrameSerializationError

provider = self._get_provider()
provider.ingest_df(feature_view, df)
if isinstance(feature_view, OnDemandFeatureView):
# TODO: add projection mapping
projection_mapping = {}
provider.ingest_df(feature_view, df, projection_mapping)
else:
provider.ingest_df(feature_view, df)

def write_to_offline_store(
self,
Expand Down
28 changes: 15 additions & 13 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def _infer_features_and_entities(
fv, join_keys, run_inference_for_features, config
)

entity_columns = []
columns_to_exclude = {
fv.batch_source.timestamp_field,
fv.batch_source.created_timestamp_column,
Expand All @@ -233,9 +234,9 @@ def _infer_features_and_entities(
),
)
if field.name not in [
entity_column.name for entity_column in fv.entity_columns
entity_column.name for entity_column in entity_columns
]:
fv.entity_columns.append(field)
entity_columns.append(field)
elif not re.match(
"^__|__$", col_name
): # double underscores often signal an internal-use column
Expand All @@ -256,6 +257,8 @@ def _infer_features_and_entities(
if field.name not in [feature.name for feature in fv.features]:
fv.features.append(field)

fv.entity_columns = entity_columns


def _infer_on_demand_features_and_entities(
fv: OnDemandFeatureView,
Expand All @@ -282,18 +285,17 @@ def _infer_on_demand_features_and_entities(

batch_source = getattr(source_feature_view, "batch_source")
batch_field_mapping = getattr(batch_source or None, "field_mapping")
if batch_field_mapping:
for (
original_col,
mapped_col,
) in batch_field_mapping.items():
if mapped_col in columns_to_exclude:
columns_to_exclude.remove(mapped_col)
columns_to_exclude.add(original_col)
for (
original_col,
mapped_col,
) in batch_field_mapping.items():
if mapped_col in columns_to_exclude:
columns_to_exclude.remove(mapped_col)
columns_to_exclude.add(original_col)

table_column_names_and_types = (
batch_source.get_table_column_names_and_types(config)
)
table_column_names_and_types = batch_source.get_table_column_names_and_types(
config
)
for col_name, col_datatype in table_column_names_and_types:
if col_name in columns_to_exclude:
continue
Expand Down
41 changes: 31 additions & 10 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ def online_write_batch(
)

else:
if (
table_name
== "test_on_demand_python_transformation_python_stored_writes_feature_view"
):
print(
f"writing online batch for {table_name} - {feature_name} = {val}"
)
conn.execute(
f"""
UPDATE {table_name}
Expand All @@ -175,18 +182,31 @@ def online_write_batch(
),
)

conn.execute(
f"""INSERT OR IGNORE INTO {table_name}
# try:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this once testing is done

if True:
conn.execute(
f"""INSERT OR IGNORE INTO {table_name}
(entity_key, feature_name, value, event_ts, created_ts)
VALUES (?, ?, ?, ?, ?)""",
(
entity_key_bin,
feature_name,
val.SerializeToString(),
timestamp,
created_ts,
),
)
(
entity_key_bin,
feature_name,
val.SerializeToString(),
timestamp,
created_ts,
),
)
else:
# print(
# f"error writing online batch for {table_name} - {feature_name} = {val}\n {e}"
# )
print(
f'querying all records for table: {conn.execute(f"select * from {table_name}").fetchall()}'
)
r = conn.execute("""
SELECT * FROM sqlite_master WHERE type='table' and name = 'test_on_demand_python_transformation_python_stored_writes_feature_view';
""")
print(f"table exists: {r.fetchall()}")
if progress:
progress(1)

Expand Down Expand Up @@ -253,6 +273,7 @@ def update(
project = config.project

for table in tables_to_keep:
print(f"updating {_table_id(project, table)}")
conn.execute(
f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
)
Expand Down
41 changes: 28 additions & 13 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pyarrow as pa
from tqdm import tqdm

from feast import importer
from feast import OnDemandFeatureView, importer
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import DataSource
from feast.entity import Entity
Expand Down Expand Up @@ -276,23 +276,38 @@ def ingest_df(
self,
feature_view: FeatureView,
df: pd.DataFrame,
field_mapping: Optional[Dict] = None,
):
table = pa.Table.from_pandas(df)

if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
if isinstance(feature_view, OnDemandFeatureView):
table = _run_pyarrow_field_mapping(table, field_mapping)
join_keys = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(
self.repo_config, feature_view, rows_to_write, progress=None
)
else:
# Note: A dictionary mapping of column names in this data
# source to feature names in a feature table or view. Only used for feature
# columns, not entity or timestamp columns.
if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)

join_keys = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
join_keys = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

self.online_write_batch(
self.repo_config, feature_view, rows_to_write, progress=None
)
self.online_write_batch(
self.repo_config, feature_view, rows_to_write, progress=None
)

def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table):
if feature_view.batch_source.field_mapping is not None:
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def __init__( # noqa: C901
owner (optional): The owner of the on demand feature view, typically the email
of the primary maintainer.
write_to_online_store (optional): A boolean that indicates whether to write the on demand feature view to
the online store for faster retrieval.
the online store for faster retrieval.
"""
super().__init__(
name=name,
Expand Down Expand Up @@ -162,7 +162,6 @@ def __init__( # noqa: C901
self.source_request_sources[odfv_source.name] = odfv_source
elif isinstance(odfv_source, FeatureViewProjection):
self.source_feature_view_projections[odfv_source.name] = odfv_source

else:
self.source_feature_view_projections[odfv_source.name] = (
odfv_source.projection
Expand Down
36 changes: 34 additions & 2 deletions sdk/python/feast/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, Union

import pyarrow

from feast.value_type import ValueType

PRIMITIVE_FEAST_TYPES_TO_VALUE_TYPES = {
Expand All @@ -30,6 +33,10 @@
}


def _utc_now() -> datetime:
return datetime.now(tz=timezone.utc)


class ComplexFeastType(ABC):
"""
A ComplexFeastType represents a structured type that is recognized by Feast.
Expand Down Expand Up @@ -103,7 +110,6 @@ def __hash__(self):
Float64 = PrimitiveFeastType.FLOAT64
UnixTimestamp = PrimitiveFeastType.UNIX_TIMESTAMP


SUPPORTED_BASE_TYPES = [
Invalid,
String,
Expand Down Expand Up @@ -159,7 +165,6 @@ def __str__(self):

FeastType = Union[ComplexFeastType, PrimitiveFeastType]


VALUE_TYPES_TO_FEAST_TYPES: Dict["ValueType", FeastType] = {
ValueType.UNKNOWN: Invalid,
ValueType.BYTES: Bytes,
Expand All @@ -180,6 +185,33 @@ def __str__(self):
ValueType.UNIX_TIMESTAMP_LIST: Array(UnixTimestamp),
}

# Mapping from Feast primitive types to their PyArrow equivalents.
# Consumed by from_feast_to_pyarrow_type; types absent here cannot be
# converted and will raise there.
FEAST_TYPES_TO_PYARROW_TYPES = {
    String: pyarrow.string(),
    Bool: pyarrow.bool_(),
    Int32: pyarrow.int32(),
    Int64: pyarrow.int64(),
    Float32: pyarrow.float32(),
    Float64: pyarrow.float64(),
    # Note: datetime only supports microseconds https://github.com/python/cpython/blob/3.8/Lib/datetime.py#L1559
    # tzname() of a UTC-aware datetime is always the constant "UTC", so use
    # the literal directly instead of constructing a datetime at import time.
    UnixTimestamp: pyarrow.timestamp("us", tz="UTC"),
}


def from_feast_to_pyarrow_type(feast_type: FeastType) -> pyarrow.DataType:
    """
    Converts a Feast type to a PyArrow type.

    Args:
        feast_type: The Feast type to be converted.

    Raises:
        ValueError: The conversion could not be performed.
    """
    # EAFP: look the type up directly and translate a miss into ValueError.
    try:
        return FEAST_TYPES_TO_PYARROW_TYPES[feast_type]
    except KeyError:
        raise ValueError(
            f"Could not convert Feast type {feast_type} to PyArrow type."
        ) from None


def from_value_type(
value_type: ValueType,
Expand Down
Loading
Loading