Skip to content

Commit

Permalink
introduce kafka_settings to KafkaOptions
Browse files: browse the repository at this point in the history
Signed-off-by: zerafachris PERSONAL <[email protected]>
  • Loading branch information
zerafachris committed Jan 6, 2025
1 parent 29d13d0 commit e75d483
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
14 changes: 13 additions & 1 deletion sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ def __init__(
message_format: StreamFormat,
topic: str,
watermark_delay_threshold: Optional[timedelta] = None,
kafka_settings: Optional[Dict[str, str]] = None,
):
self.kafka_bootstrap_servers = kafka_bootstrap_servers
self.message_format = message_format
self.topic = topic
self.watermark_delay_threshold = watermark_delay_threshold or None
self.kafka_settings = kafka_settings or None

@classmethod
def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
Expand All @@ -65,11 +67,16 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
if kafka_options_proto.watermark_delay_threshold.ToNanoseconds() == 0
else kafka_options_proto.watermark_delay_threshold.ToTimedelta()
)
kafka_settings = None
if kafka_options_proto.HasField("kafka_settings"):
kafka_settings = kafka_options_proto.kafka_settings

kafka_options = cls(
kafka_bootstrap_servers=kafka_options_proto.kafka_bootstrap_servers,
message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
topic=kafka_options_proto.topic,
watermark_delay_threshold=watermark_delay_threshold,
kafka_settings=kafka_settings,
)

return kafka_options
Expand All @@ -85,12 +92,17 @@ def to_proto(self) -> DataSourceProto.KafkaOptions:
if self.watermark_delay_threshold is not None:
watermark_delay_threshold = Duration()
watermark_delay_threshold.FromTimedelta(self.watermark_delay_threshold)

kafka_settings = None
if self.kafka_settings is not None:
kafka_settings = self.kafka_settings

kafka_options_proto = DataSourceProto.KafkaOptions(
kafka_bootstrap_servers=self.kafka_bootstrap_servers,
message_format=self.message_format.to_proto(),
topic=self.topic,
watermark_delay_threshold=watermark_delay_threshold,
kafka_settings=kafka_settings,
)

return kafka_options_proto
Expand Down Expand Up @@ -388,7 +400,7 @@ def __init__(
if bootstrap_servers:
warnings.warn(
(
"The 'bootstrap_servers' parameter has been deprecated in favor of 'kafka_bootstrap_servers'. "
"The 'bootstrap_servers' parameter has been deprecated in favour of 'kafka_bootstrap_servers'. "
"Feast 0.25 and onwards will not support the 'bootstrap_servers' parameter."
),
DeprecationWarning,
Expand Down
7 changes: 4 additions & 3 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,17 @@ def _ingest_stream_data(self) -> StreamTable:
.option("startingOffsets", "latest") # Query start
)

# for k,v in self.data_source.kafka_options.kafka_settings.items():
# stream_df = stream_df.option(k,v)
if self.data_source.kafka_options.kafka_settings is not None:
for k,v in self.data_source.kafka_options.kafka_settings.items():
stream_df = stream_df.option(k,v)

stream_df = stream_df.load().selectExpr("CAST(value AS STRING)")

if self.format == "json":
if not isinstance(
self.data_source.kafka_options.message_format, JsonFormat
):
raise ValueError("kafka source message format is not jsonformat")
raise ValueError("kafka source message format is not json format")
stream_df = stream_df.select(
from_json(
col("value"),
Expand Down

0 comments on commit e75d483

Please sign in to comment.