Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Aug 2, 2023
1 parent 8c650cd commit e0580d2
Showing 1 changed file with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,14 @@ class KinesisSink private (
)
// If Kinesis was already unhealthy, the background check is already running.
// It can happen when the collector switches back and forth between Kinesis and SQS.
if (kinesisHealthy) checkKinesisHealth()
kinesisHealthy = false
if (kinesisHealthy) {
this.synchronized {
if (kinesisHealthy) {
kinesisHealthy = false
checkKinesisHealth()
}
}
}
writeBatchToSqsWithRetries(failedRecords, sqs, minBackoff, maxRetries)
case None =>
log.error(s"No SQS buffer defined for stream $streamName. Retrying to send the events to Kinesis")
Expand All @@ -240,8 +246,14 @@ class KinesisSink private (
} else {
// If SQS was already unhealthy, the background check is already running.
// It can happen when the collector switches back and forth between Kinesis and SQS.
if (sqsHealthy) checkSqsHealth()
sqsHealthy = false
if (sqsHealthy) {
this.synchronized {
if (sqsHealthy) {
sqsHealthy = false
checkSqsHealth()
}
}
}
log.error(
s"Maximum number of retries reached for SQS buffer ${sqs.sqsBufferName} for ${failedRecords.size} records. Retrying in Kinesis"
)
Expand Down

0 comments on commit e0580d2

Please sign in to comment.