diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 25475fcb4c..bc8ebbbd8b 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -41,11 +41,13 @@ def __init__( message_format: StreamFormat, topic: str, watermark_delay_threshold: Optional[timedelta] = None, + kafka_settings: Optional[Dict[str, str]] = None, ): self.kafka_bootstrap_servers = kafka_bootstrap_servers self.message_format = message_format self.topic = topic self.watermark_delay_threshold = watermark_delay_threshold or None + self.kafka_settings = kafka_settings or None @classmethod def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions): @@ -65,11 +67,16 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions): if kafka_options_proto.watermark_delay_threshold.ToNanoseconds() == 0 else kafka_options_proto.watermark_delay_threshold.ToTimedelta() ) + kafka_settings = None + if kafka_options_proto.HasField("kafka_settings"): + kafka_settings = kafka_options_proto.kafka_settings + kafka_options = cls( kafka_bootstrap_servers=kafka_options_proto.kafka_bootstrap_servers, message_format=StreamFormat.from_proto(kafka_options_proto.message_format), topic=kafka_options_proto.topic, watermark_delay_threshold=watermark_delay_threshold, + kafka_settings=kafka_settings, ) return kafka_options @@ -85,12 +92,17 @@ def to_proto(self) -> DataSourceProto.KafkaOptions: if self.watermark_delay_threshold is not None: watermark_delay_threshold = Duration() watermark_delay_threshold.FromTimedelta(self.watermark_delay_threshold) + + kafka_settings = None + if self.kafka_settings is not None: + kafka_settings = self.kafka_settings kafka_options_proto = DataSourceProto.KafkaOptions( kafka_bootstrap_servers=self.kafka_bootstrap_servers, message_format=self.message_format.to_proto(), topic=self.topic, watermark_delay_threshold=watermark_delay_threshold, + kafka_settings=kafka_settings, ) return kafka_options_proto @@ -388,7 +400,7 @@ def __init__( if bootstrap_servers: warnings.warn( ( - "The 'bootstrap_servers' parameter has been deprecated in favor of 'kafka_bootstrap_servers'. " + "The 'bootstrap_servers' parameter has been deprecated in favour of 'kafka_bootstrap_servers'. " "Feast 0.25 and onwards will not support the 'bootstrap_servers' parameter." ), DeprecationWarning, diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index c51c0869f5..874fb403b4 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -90,8 +90,9 @@ def _ingest_stream_data(self) -> StreamTable: .option("startingOffsets", "latest") # Query start ) - # for k,v in self.data_source.kafka_options.kafka_settings.items(): - # stream_df = stream_df.option(k,v) + if self.data_source.kafka_options.kafka_settings is not None: + for k,v in self.data_source.kafka_options.kafka_settings.items(): + stream_df = stream_df.option(k,v) stream_df = stream_df.load().selectExpr("CAST(value AS STRING)") @@ -99,7 +100,7 @@ def _ingest_stream_data(self) -> StreamTable: if not isinstance( self.data_source.kafka_options.message_format, JsonFormat ): - raise ValueError("kafka source message format is not jsonformat") + raise ValueError("kafka source message format is not json format") stream_df = stream_df.select( from_json( col("value"),