Skip to content

Commit

Permalink
modify spark kafka processor to remove repeated code
Browse files Browse the repository at this point in the history
Signed-off-by: zerafachris PERSONAL <[email protected]>
  • Loading branch information
zerafachris committed Jan 6, 2025
1 parent b97da6c commit 29d13d0
Showing 1 changed file with 31 additions and 35 deletions.
66 changes: 31 additions & 35 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,52 +80,48 @@ def ingest_stream_feature_view(
@no_type_check
def _ingest_stream_data(self) -> StreamTable:
    """Read the Kafka topic into a streaming DataFrame of decoded records.

    Only supports json and avro formats currently.

    Builds a single shared Kafka reader (bootstrap servers, topic,
    latest starting offsets), casts the raw message ``value`` to a
    string, then parses it with ``from_json`` or ``from_avro``
    depending on ``self.format``, validating that the configured
    ``message_format`` matches.

    Returns:
        StreamTable: a streaming DataFrame whose columns are the
        fields of the parsed message schema (``table.*``).

    Raises:
        ValueError: if ``self.format`` does not match the configured
            message format class, or is neither ``"json"`` nor ``"avro"``.
    """
    # Common Kafka source configuration, shared by both formats.
    stream_df = (
        self.spark.readStream.format("kafka")
        .option(
            "kafka.bootstrap.servers",
            self.data_source.kafka_options.kafka_bootstrap_servers,
        )
        .option("subscribe", self.data_source.kafka_options.topic)
        .option("startingOffsets", "latest")  # Query start
    )

    # TODO(review): extra entries in kafka_options.kafka_settings are not
    # forwarded to the reader here — confirm whether they should be applied
    # via stream_df.option(k, v) before load().

    # The Kafka `value` column is binary; cast it to a string for parsing.
    stream_df = stream_df.load().selectExpr("CAST(value AS STRING)")

    if self.format == "json":
        if not isinstance(
            self.data_source.kafka_options.message_format, JsonFormat
        ):
            raise ValueError("kafka source message format is not jsonformat")
        stream_df = stream_df.select(
            from_json(
                col("value"),
                self.data_source.kafka_options.message_format.schema_json,
            ).alias("table")
        )
    elif self.format == "avro":
        if not isinstance(
            self.data_source.kafka_options.message_format, AvroFormat
        ):
            raise ValueError("kafka source message format is not avro format")
        stream_df = stream_df.select(
            from_avro(
                col("value"),
                self.data_source.kafka_options.message_format.schema_json,
            ).alias("table")
        )
    else:
        raise ValueError("kafka source message format is not currently supported")

    # Flatten the parsed struct so each schema field becomes a top-level column.
    stream_df = stream_df.select("table.*")

    return stream_df

def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
Expand Down

0 comments on commit 29d13d0

Please sign in to comment.