Skip to content

Commit

Permalink
Merge branch 'master' into odfv-pluggable
Browse files Browse the repository at this point in the history
  • Loading branch information
tokoko authored Feb 17, 2024
2 parents f0180e6 + dd96150 commit b3221fb
Show file tree
Hide file tree
Showing 61 changed files with 876 additions and 791 deletions.
3 changes: 1 addition & 2 deletions sdk/python/feast/importer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import importlib
from typing import Optional

from feast.errors import (
FeastClassImportError,
Expand All @@ -8,7 +7,7 @@
)


def import_class(module_name: str, class_name: str, class_type: Optional[str] = None):
def import_class(module_name: str, class_name: str, class_type: str = ""):
"""
Dynamically loads and returns a class from a module.
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from types import MethodType
from typing import List, Optional
from typing import List, Optional, no_type_check

import pandas as pd
from pyspark.sql import DataFrame, SparkSession
Expand Down Expand Up @@ -76,6 +76,8 @@ def ingest_stream_feature_view(
online_store_query = self._write_stream_data(transformed_df, to)
return online_store_query

# In line 64 of __init__(), "data_source" is assigned a stream source (which has to be a KafkaSource, as on line 40).
@no_type_check
def _ingest_stream_data(self) -> StreamTable:
"""Only supports json and avro formats currently."""
if self.format == "json":
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/infra/contrib/stream_processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from abc import ABC
from abc import ABC, abstractmethod
from types import MethodType
from typing import TYPE_CHECKING, Optional

Expand Down Expand Up @@ -50,19 +50,22 @@ def __init__(
self.sfv = sfv
self.data_source = data_source

@abstractmethod
def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
"""
Ingests data from the stream source attached to the stream feature view; transforms the data
and then persists it to the online store and/or offline store, depending on the 'to' parameter.
"""
raise NotImplementedError

@abstractmethod
def _ingest_stream_data(self) -> StreamTable:
"""
Ingests data into a StreamTable.
"""
raise NotImplementedError

@abstractmethod
def _construct_transformation_plan(self, table: StreamTable) -> StreamTable:
"""
Applies transformations on top of StreamTable object. Since stream engines use lazy
Expand All @@ -71,6 +74,7 @@ def _construct_transformation_plan(self, table: StreamTable) -> StreamTable:
"""
raise NotImplementedError

@abstractmethod
def _write_stream_data(self, table: StreamTable, to: PushMode) -> None:
"""
Launches a job to persist stream data to the online store and/or offline store, depending
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/feast/infra/feature_servers/aws_lambda/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Literal

from pydantic import StrictBool, StrictStr
from pydantic.typing import Literal

from feast.infra.feature_servers.base_config import BaseFeatureServerConfig

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/feature_servers/base_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ class BaseFeatureServerConfig(FeastConfigBaseModel):
enabled: StrictBool = False
"""Whether the feature server should be launched."""

feature_logging: Optional[FeatureLoggingConfig]
feature_logging: Optional[FeatureLoggingConfig] = None
""" Feature logging configuration """
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Literal

from pydantic import StrictBool
from pydantic.typing import Literal

from feast.infra.feature_servers.base_config import BaseFeatureServerConfig

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pydantic.typing import Literal
from typing import Literal

from feast.infra.feature_servers.base_config import BaseFeatureServerConfig

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Callable, List, Literal, Optional, Sequence, Union, cast

import dill
import pandas
import pandas as pd
import pyarrow
from tqdm import tqdm
Expand Down Expand Up @@ -178,9 +179,9 @@ def _materialize_one(
self.repo_config.batch_engine.partitions
)

spark_df.foreachPartition(
lambda x: _process_by_partition(x, spark_serialized_artifacts)
)
spark_df.mapInPandas(
lambda x: _map_by_partition(x, spark_serialized_artifacts), "status int"
).count() # dummy action to force evaluation

return SparkMaterializationJob(
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
Expand Down Expand Up @@ -225,38 +226,40 @@ def unserialize(self):
return feature_view, online_store, repo_config


def _process_by_partition(rows, spark_serialized_artifacts: _SparkSerializedArtifacts):
"""Load pandas df to online store"""

# convert to pyarrow table
dicts = []
for row in rows:
dicts.append(row.asDict())
def _map_by_partition(iterator, spark_serialized_artifacts: _SparkSerializedArtifacts):
for pdf in iterator:
if pdf.shape[0] == 0:
print("Skipping")
return

df = pd.DataFrame.from_records(dicts)
if df.shape[0] == 0:
print("Skipping")
return
table = pyarrow.Table.from_pandas(pdf)

table = pyarrow.Table.from_pandas(df)
(
feature_view,
online_store,
repo_config,
) = spark_serialized_artifacts.unserialize()

if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)

# unserialize artifacts
feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize()
join_key_to_value_type = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}

if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
rows_to_write = _convert_arrow_to_proto(
table, feature_view, join_key_to_value_type
)
online_store.online_write_batch(
repo_config,
feature_view,
rows_to_write,
lambda x: None,
)

join_key_to_value_type = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}

rows_to_write = _convert_arrow_to_proto(table, feature_view, join_key_to_value_type)
online_store.online_write_batch(
repo_config,
feature_view,
rows_to_write,
lambda x: None,
)
yield pd.DataFrame(
[pd.Series(range(1, 2))]
) # dummy result because mapInPandas needs to return something
6 changes: 2 additions & 4 deletions sdk/python/feast/infra/materialization/snowflake_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import click
import pandas as pd
from colorama import Fore, Style
from pydantic import Field, StrictStr
from pydantic import ConfigDict, Field, StrictStr
from pytz import utc
from tqdm import tqdm

Expand Down Expand Up @@ -72,9 +72,7 @@ class SnowflakeMaterializationEngineConfig(FeastConfigBaseModel):

schema_: Optional[str] = Field("PUBLIC", alias="schema")
""" Snowflake schema name """

class Config:
allow_population_by_field_name = True
model_config = ConfigDict(populate_by_name=True)


@dataclass
Expand Down
22 changes: 10 additions & 12 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Dict,
Iterator,
List,
Literal,
Optional,
Tuple,
Union,
Expand All @@ -19,8 +20,7 @@
import pandas as pd
import pyarrow
import pyarrow.parquet
from pydantic import ConstrainedStr, StrictStr, validator
from pydantic.typing import Literal
from pydantic import StrictStr, field_validator
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

from feast import flags_helper
Expand Down Expand Up @@ -72,13 +72,6 @@ def get_http_client_info():
return http_client_info.ClientInfo(user_agent=get_user_agent())


class BigQueryTableCreateDisposition(ConstrainedStr):
"""Custom constraint for table_create_disposition. To understand more, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition"""

values = {"CREATE_NEVER", "CREATE_IF_NEEDED"}


class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
"""Offline store config for GCP BigQuery"""

Expand All @@ -102,10 +95,15 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
gcs_staging_location: Optional[str] = None
""" (optional) GCS location used for offloading BigQuery results as parquet files."""

table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
""" (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""
table_create_disposition: Literal[
"CREATE_NEVER", "CREATE_IF_NEEDED"
] = "CREATE_IF_NEEDED"
""" (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED.
Custom constraint for table_create_disposition. To understand more, see:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition
"""

@validator("billing_project_id")
@field_validator("billing_project_id")
def project_id_exists(cls, v, values, **kwargs):
if v and not values["project_id"]:
raise ValueError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
Dict,
Iterator,
List,
Literal,
Optional,
Tuple,
Union,
Expand All @@ -18,7 +19,6 @@
import pyarrow
import pyarrow as pa
from pydantic import StrictStr
from pydantic.typing import Literal
from pytz import utc

from feast import OnDemandFeatureView
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
suffix: Optional[str] = None,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
timestamp_field: Optional[str] = "ts",
) -> DataSource:

table_name = destination_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
import warnings
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union

import numpy as np
import pandas
import pyarrow
import pyarrow as pa
import sqlalchemy
from pydantic.types import StrictStr
from pydantic.typing import Literal
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
Expand All @@ -32,7 +31,7 @@
from feast.infra.provider import RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import FeastBaseModel, RepoConfig
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.type_map import pa_to_mssql_type
from feast.usage import log_exceptions_and_usage
Expand All @@ -43,7 +42,7 @@
EntitySchema = Dict[str, np.dtype]


class MsSqlServerOfflineStoreConfig(FeastBaseModel):
class MsSqlServerOfflineStoreConfig(FeastConfigBaseModel):
"""Offline store config for SQL Server"""

type: Literal["mssql"] = "mssql"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
**kwargs,
timestamp_field: Optional[str] = "ts",
) -> DataSource:
# Make sure the field mapping is correct and convert the datetime datasources.
if timestamp_field in df:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
Iterator,
KeysView,
List,
Literal,
Optional,
Tuple,
Union,
Expand All @@ -19,7 +20,6 @@
import pyarrow as pa
from jinja2 import BaseLoader, Environment
from psycopg2 import sql
from pydantic.typing import Literal
from pytz import utc

from feast.data_source import DataSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
suffix: Optional[str] = None,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
timestamp_field: Optional[str] = "ts",
) -> DataSource:
destination_name = self.get_prefixed_table_name(destination_name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pyspark.sql import SparkSession

from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination
from feast.infra.offline_stores.contrib.spark_offline_store.spark import (
SparkOfflineStoreConfig,
)
Expand Down Expand Up @@ -68,10 +69,10 @@ def create_data_source(
self,
df: pd.DataFrame,
destination_name: str,
timestamp_field="ts",
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Optional[Dict[str, str]] = None,
**kwargs,
timestamp_field: Optional[str] = "ts",
) -> DataSource:
if timestamp_field in df:
df[timestamp_field] = pd.to_datetime(df[timestamp_field], utc=True)
Expand Down Expand Up @@ -119,3 +120,7 @@ def create_saved_dataset_destination(self) -> SavedDatasetSparkStorage:

def get_prefixed_table_name(self, suffix: str) -> str:
return f"{self.project_name}_{suffix}"

def create_logged_features_destination(self) -> LoggingDestination:
# No implementation of LoggingDestination for Spark offline store.
return None # type: ignore
Loading

0 comments on commit b3221fb

Please sign in to comment.