Skip to content

Commit

Permalink
Run background check for Kinesis if it is made unhealthy and SQS buff…
Browse files Browse the repository at this point in the history
…er is activated (close #315)
  • Loading branch information
benjben committed Aug 1, 2023
1 parent d9ee385 commit 8c650cd
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,20 @@ class KinesisSink private (
)
scheduleRetryToKinesis(failedRecords, nextBackoff, retriesLeft - 1)
} else {
kinesisHealthy = false
log.error(s"Maximum number of retries reached for Kinesis stream $streamName for ${failedRecords.size} records")
maybeSqs match {
case Some(sqs) =>
log.error(
s"SQS buffer ${sqs.sqsBufferName} defined for stream $streamName. Retrying to send the events to SQS"
)
// If Kinesis was already unhealthy, the background check is already running.
// It can happen when the collector switches back and forth between Kinesis and SQS.
if (kinesisHealthy) checkKinesisHealth()
kinesisHealthy = false
writeBatchToSqsWithRetries(failedRecords, sqs, minBackoff, maxRetries)
case None =>
log.error(s"No SQS buffer defined for stream $streamName. Retrying to send the events to Kinesis")
kinesisHealthy = false
writeBatchToKinesisWithRetries(failedRecords, maxBackoff, maxRetries)
}
}
Expand All @@ -234,6 +238,9 @@ class KinesisSink private (
)
scheduleRetryToSqs(failedRecords, sqs, nextBackoff, retriesLeft - 1)
} else {
// If SQS was already unhealthy, the background check is already running.
// It can happen when the collector switches back and forth between Kinesis and SQS.
if (sqsHealthy) checkSqsHealth()
sqsHealthy = false
log.error(
s"Maximum number of retries reached for SQS buffer ${sqs.sqsBufferName} for ${failedRecords.size} records. Retrying in Kinesis"
Expand Down Expand Up @@ -353,6 +360,7 @@ class KinesisSink private (
private def checkKinesisHealth(): Unit = {
val healthRunnable = new Runnable {
override def run() {
log.info(s"Starting background check for Kinesis stream $streamName")
while (!kinesisHealthy) {
Try {
val describeRequest = new DescribeStreamSummaryRequest()
Expand All @@ -379,6 +387,7 @@ class KinesisSink private (
private def checkSqsHealth(): Unit = maybeSqs.foreach { sqs =>
val healthRunnable = new Runnable {
override def run() {
log.info(s"Starting background check for SQS buffer ${sqs.sqsBufferName}")
while (!sqsHealthy) {
Try {
sqs.sqsClient.getQueueUrl(sqs.sqsBufferName)
Expand Down

0 comments on commit 8c650cd

Please sign in to comment.