From 29d13d0544889d1e030447aedc263d60e1193554 Mon Sep 17 00:00:00 2001 From: zerafachris PERSONAL Date: Mon, 6 Jan 2025 21:15:30 +0100 Subject: [PATCH] modify spark kafka processor to remove repeated code Signed-off-by: zerafachris PERSONAL --- .../infra/contrib/spark_kafka_processor.py | 66 +++++++++---------- 1 file changed, 31 insertions(+), 35 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index e148000bc9..c51c0869f5 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -80,52 +80,48 @@ def ingest_stream_feature_view( @no_type_check def _ingest_stream_data(self) -> StreamTable: """Only supports json and avro formats currently.""" + stream_df = ( + self.spark.readStream.format("kafka") + .option( + "kafka.bootstrap.servers", + self.data_source.kafka_options.kafka_bootstrap_servers, + ) + .option("subscribe", self.data_source.kafka_options.topic) + .option("startingOffsets", "latest") # Query start + ) + + # for k,v in self.data_source.kafka_options.kafka_settings.items(): + # stream_df = stream_df.option(k,v) + + stream_df = stream_df.load().selectExpr("CAST(value AS STRING)") + if self.format == "json": if not isinstance( self.data_source.kafka_options.message_format, JsonFormat ): raise ValueError("kafka source message format is not jsonformat") - stream_df = ( - self.spark.readStream.format("kafka") - .option( - "kafka.bootstrap.servers", - self.data_source.kafka_options.kafka_bootstrap_servers, - ) - .option("subscribe", self.data_source.kafka_options.topic) - .option("startingOffsets", "latest") # Query start - .load() - .selectExpr("CAST(value AS STRING)") - .select( - from_json( - col("value"), - self.data_source.kafka_options.message_format.schema_json, - ).alias("table") - ) - .select("table.*") + stream_df = stream_df.select( + from_json( + col("value"), + self.data_source.kafka_options.message_format.schema_json, + ).alias("table") ) - else: + elif self.format == "avro": if not isinstance( self.data_source.kafka_options.message_format, AvroFormat ): raise ValueError("kafka source message format is not avro format") - stream_df = ( - self.spark.readStream.format("kafka") - .option( - "kafka.bootstrap.servers", - self.data_source.kafka_options.kafka_bootstrap_servers, - ) - .option("subscribe", self.data_source.kafka_options.topic) - .option("startingOffsets", "latest") # Query start - .load() - .selectExpr("CAST(value AS STRING)") - .select( - from_avro( - col("value"), - self.data_source.kafka_options.message_format.schema_json, - ).alias("table") - ) - .select("table.*") + stream_df = stream_df.select( + from_avro( + col("value"), + self.data_source.kafka_options.message_format.schema_json, + ).alias("table") ) + else: + raise ValueError("kafka source message format is not currently supported") + + stream_df = stream_df.select("table.*") + return stream_df def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: