Hi,
I am using this repo to build a Structured Streaming job on Kinesis data streams with Spark 2.4.0.
I wanted to understand whether there is support for the foreachBatch sink.
https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
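From the linked guide, the basic shape of a foreachBatch query in Scala is roughly the following (a minimal sketch only; `streamingDf`, the output path, and the checkpoint path are placeholders, not my actual job):

```scala
import org.apache.spark.sql.DataFrame

// Minimal sketch of the foreachBatch sink from the Spark 2.4 guide.
// streamingDf is assumed to be a streaming DataFrame built elsewhere via readStream;
// the parquet and checkpoint paths are placeholders.
def startQuery(streamingDf: DataFrame): Unit = {
  streamingDf.writeStream
    .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
      // Each micro-batch arrives here as a normal (non-streaming) DataFrame,
      // so any batch writer can be used.
      batchDf.write.mode("append").parquet(s"/tmp/output/batch-$batchId")
    }
    .option("checkpointLocation", "/tmp/checkpoint")
    .start()
    .awaitTermination()
}
```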
To explain further,
My use case is:

1. Batch the data using foreachBatch and then write it into S3.
2. Use S3 as the checkpoint location (HDFS is also possible).
The problem I am facing is that when I use foreachBatch, the shard-commit folder in the checkpoint is not updated for each batch; nothing is written beyond shard-commit/0.
Attached is the checkpoint dump generated when writing to the local filesystem.
The behaviour is the same when I write the checkpoint to S3.
checkpoint.zip
Sample code is below
```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType

// Read the Kinesis stream (sc is the SparkSession used throughout the job).
val roundStreamDf = sc.readStream
  .format("kinesis")
  .option("streamName", streamName)
  .option("endpointUrl", s"https://kinesis.${region}.amazonaws.com")
  .option("awsAccessKeyId", acceskeyId)
  .option("awsSecretKey", secretAccessKey)
  .option("startingposition", START_POS)
  .option("kinesis.client.describeShardInterval", "3600")
  .option("kinesis.client.avoidEmptyBatches", true)
  .load()

// Parse the Kinesis payload into the Round schema.
val roundStreamIntermediateDf = roundStreamDf
  .selectExpr("cast (data as STRING) jsonData")
  .select(from_json(col("jsonData"), ScalaReflection.schemaFor[Round].dataType.asInstanceOf[StructType]).as("round"))
  .select("round.*")

roundStreamIntermediateDf
  .writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    val RoundDf = commonRoundDataProcess(batchDf)
    RoundDf.persist()
    // opNameIterator, accessDataS3 and the other helpers are defined elsewhere in the job.
    while (opNameIterator.hasNext) {
      val opName = opNameIterator.next()
      val finalRoundDf = RoundDf.filter(col("OperatorShortName") === opName)
      if (!finalRoundDf.head(1).isEmpty) {
        accessDataS3.writeDataToRefinedHudiS3(sc, finalRoundDf, extraOptions, opName, ROUND_TYPE_OF_DATA)
      }
    }
    RoundDf.unpersist()
  }
  .outputMode("Update")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .option("checkpointLocation", s"${checkPointBucket}/checkpoint/${ROUND_TYPE_OF_DATA}/")
  .start()
  .awaitTermination()
```
The code basically reads the stream, carries out some processing in the foreachBatch sink, and then pushes the result to S3.
Attached is the error log.
error.log
I have tried increasing the number of attempts to 20, thinking it might be a problem with the eventual consistency of S3, but I see the same problem when writing the checkpoint to my local filesystem or to HDFS.
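For reference, by "number of attempts" I mean the S3 client retry count; a rough sketch of that kind of setting (assuming the checkpoint path goes through the Hadoop S3A connector, the exact property I tuned in my job may differ):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assuming the checkpoint path uses the Hadoop S3A connector,
// fs.s3a.attempts.maximum controls how many times a failed S3 request is retried.
// This just illustrates raising the retry count to 20.
val spark = SparkSession.builder().getOrCreate()
spark.sparkContext.hadoopConfiguration.set("fs.s3a.attempts.maximum", "20")
```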
Quick help will be much appreciated.