From 8c650cdfba85b90650c5490ee8f5a215237cbf07 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Tue, 1 Aug 2023 17:25:43 +0200 Subject: [PATCH] Run background check for Kinesis if it is made unhealthy and SQS buffer is activated (close #315) --- .../sinks/KinesisSink.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index 8c7f4396b..21c4f7957 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -213,16 +213,20 @@ class KinesisSink private ( ) scheduleRetryToKinesis(failedRecords, nextBackoff, retriesLeft - 1) } else { - kinesisHealthy = false log.error(s"Maximum number of retries reached for Kinesis stream $streamName for ${failedRecords.size} records") maybeSqs match { case Some(sqs) => log.error( s"SQS buffer ${sqs.sqsBufferName} defined for stream $streamName. Retrying to send the events to SQS" ) + // If Kinesis was already unhealthy, the background check is already running. + // It can happen when the collector switches back and forth between Kinesis and SQS. + if (kinesisHealthy) checkKinesisHealth() + kinesisHealthy = false writeBatchToSqsWithRetries(failedRecords, sqs, minBackoff, maxRetries) case None => log.error(s"No SQS buffer defined for stream $streamName. Retrying to send the events to Kinesis") + kinesisHealthy = false writeBatchToKinesisWithRetries(failedRecords, maxBackoff, maxRetries) } } @@ -234,6 +238,9 @@ class KinesisSink private ( ) scheduleRetryToSqs(failedRecords, sqs, nextBackoff, retriesLeft - 1) } else { + // If SQS was already unhealthy, the background check is already running. + // It can happen when the collector switches back and forth between Kinesis and SQS. + if (sqsHealthy) checkSqsHealth() sqsHealthy = false log.error( s"Maximum number of retries reached for SQS buffer ${sqs.sqsBufferName} for ${failedRecords.size} records. Retrying in Kinesis" @@ -353,6 +360,7 @@ class KinesisSink private ( private def checkKinesisHealth(): Unit = { val healthRunnable = new Runnable { override def run() { + log.info(s"Starting background check for Kinesis stream $streamName") while (!kinesisHealthy) { Try { val describeRequest = new DescribeStreamSummaryRequest() @@ -379,6 +387,7 @@ class KinesisSink private ( private def checkSqsHealth(): Unit = maybeSqs.foreach { sqs => val healthRunnable = new Runnable { override def run() { + log.info(s"Starting background check for SQS buffer ${sqs.sqsBufferName}") while (!sqsHealthy) { Try { sqs.sqsClient.getQueueUrl(sqs.sqsBufferName)