Skip to content

Commit

Permalink
Merge branch 'master' into without-book-page
Browse files Browse the repository at this point in the history
# Conflicts:
#	README.md
#	gradle.properties
#	src/main/kotlin/com/exactpro/th2/kafka/client/Config.kt
#	src/main/kotlin/com/exactpro/th2/kafka/client/KafkaConnection.kt
#	src/main/kotlin/com/exactpro/th2/kafka/client/Main.kt
#	src/main/kotlin/com/exactpro/th2/kafka/client/RawMessageProcessor.kt
#	src/main/kotlin/com/exactpro/th2/kafka/client/utility/EventStoreExtensions.kt
#	src/test/kotlin/com/exactpro/th2/kafka/client/ConfigTest.kt
#	src/test/kotlin/com/exactpro/th2/kafka/client/KafkaConnectionTest.kt
#	src/test/kotlin/com/exactpro/th2/kafka/client/RawMessageProcessorTest.kt
  • Loading branch information
Oleg Smelov committed Feb 17, 2023
2 parents d0a25db + 9c179de commit 436e4a4
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 109 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# KafkaConnect (0.0.3)
# KafkaConnect (0.1.0)
The "KafkaConnect" component is responsible for the communication with Kafka;

## Configuration
Expand Down Expand Up @@ -39,6 +39,7 @@ This configuration should be specified in the custom configuration block in sche
timeSpanUnit : "MILLISECONDS"
reconnectBackoffMs: 50
reconnectBackoffMaxMs: 1000
kafkaConnectionEvents: true
```
Parameters:
Expand All @@ -62,6 +63,7 @@ Parameters:
+ timeSpanUnit time unit for `timeSpan`
+ reconnectBackoffMs - The amount of time in milliseconds to wait before attempting to reconnect to a given host. Should be positive.
+ reconnectBackoffMaxMs - The maximum amount of time in milliseconds to backoff/wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. Once the maximum is reached, reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Should be positive.
+ kafkaConnectionEvents - Generate TH2 events on lost connection and restore connection to Kafka. `false` by default.

## Reconnect behaviour

Expand Down Expand Up @@ -98,6 +100,10 @@ spec:

## Release notes

### 0.1.0

+ Migrated to Books & Pages concept

### 0.0.3

+ Publishing to Kafka support
Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ jar {

dependencies {
api platform('com.exactpro.th2:bom:4.1.0')
implementation 'com.exactpro.th2:common:3.44.0'
implementation 'com.exactpro.th2:common:5.1.0-dev-version-5-4085018593-8adee33-SNAPSHOT'
implementation 'com.exactpro.th2:common-utils:0.0.1-book-and-page-3836172629-SNAPSHOT'

implementation 'org.apache.kafka:kafka-clients:3.4.0'

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
kotlin.code.style=official
kotlin_version=1.6.21
release_version = 0.0.3
release_version = 0.1.0
description='Kafka Client'
vcs_url=https://github.com/th2-net/th2-conn-kafka
5 changes: 5 additions & 0 deletions src/main/kotlin/com/exactpro/th2/kafka/client/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ class Config(
*/
val reconnectBackoffMaxMs: Int = 1000,

/**
* Generate TH2 event on connect|disconnect Kafka
*/
val kafkaConnectionEvents: Boolean = false,

val createTopics: Boolean = false,
val topicsToCreate: List<String> = emptyList(),
val newTopicsPartitions: Int = 1,
Expand Down
59 changes: 48 additions & 11 deletions src/main/kotlin/com/exactpro/th2/kafka/client/KafkaConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package com.exactpro.th2.kafka.client

import com.exactpro.th2.common.event.Event
import com.exactpro.th2.common.grpc.ConnectionID
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.MessageID
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.common.grpc.RawMessageMetadata
import com.exactpro.th2.common.message.logId
import com.exactpro.th2.common.utils.message.id
import com.exactpro.th2.common.utils.message.logId
import com.exactpro.th2.common.utils.message.sessionAlias
import com.exactpro.th2.common.schema.factory.CommonFactory
import com.google.protobuf.UnsafeByteOperations
import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient
Expand All @@ -33,14 +36,17 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import java.io.Closeable
import java.time.Duration
import java.time.Instant
import java.util.HashSet
import java.util.Collections
import java.util.concurrent.CompletableFuture

class KafkaConnection(
private val config: Config,
private val factory: CommonFactory,
private val messageProcessor: RawMessageProcessor,
private val eventSender: EventSender,
kafkaClientsFactory: KafkaClientsFactory
Expand All @@ -49,39 +55,67 @@ class KafkaConnection(
private val producer: Producer<String, ByteArray> = kafkaClientsFactory.getKafkaProducer()

fun publish(message: RawMessage) {
val alias = message.metadata.id.connectionId.sessionAlias
val alias = message.sessionAlias ?: error("Message '${message.id.logId}' does not contain session alias.")
val kafkaStream = config.aliasToTopicAndKey[alias] ?: KafkaStream(config.aliasToTopic[alias]?.topic ?: error("Session alias '$alias' not found."), null)
val value = message.body.toByteArray()
val messageIdBuilder = message.metadata.id.toBuilder().apply {
val messageIdBuilder = message.id.toBuilder().apply {
direction = Direction.SECOND
bookName = factory.boxConfiguration.bookName
setConnectionId(connectionIdBuilder.setSessionGroup(config.aliasToSessionGroup.getValue(alias)))
}

val messageFuture = CompletableFuture<RawMessage>()
messageProcessor.onMessage(
RawMessage.newBuilder()
.setMetadata(message.metadata.toBuilder().setId(messageIdBuilder))
.setBody(message.body)
.setBody(message.body),
messageFuture::complete
)

val kafkaRecord = ProducerRecord<String, ByteArray>(kafkaStream.topic, kafkaStream.key, value)
producer.send(kafkaRecord) { _, exception: Throwable? ->
val outMessage = messageFuture.get()
if (exception == null) {
val msgText = "Message '${message.logId}' sent to Kafka"
val msgText = "Message '${outMessage.id.logId}' sent to Kafka"
LOGGER.info(msgText)
eventSender.onEvent(msgText, "Send message", message)
eventSender.onEvent(msgText, "Send message", outMessage)
} else {
throw RuntimeException("Failed to send message '${message.logId}' to Kafka", exception)
throw RuntimeException("Failed to send message '${outMessage.id.logId}' to Kafka", exception)
}
}
}

private fun isKafkaAvailable(): Boolean = try {
consumer.listTopics(POLL_TIMEOUT)
true
} catch (e: TimeoutException) {
false
}

override fun run() = try {
val startTimestamp = Instant.now().toEpochMilli()
consumer.subscribe(config.topicToAlias.keys + config.topicAndKeyToAlias.map { it.key.topic })

while (!Thread.currentThread().isInterrupted) {
val records: ConsumerRecords<String?, ByteArray> = consumer.poll(POLL_TIMEOUT)
if (records.isEmpty) continue

if (records.isEmpty) {
if (config.kafkaConnectionEvents && !isKafkaAvailable()) {
val failedToConnectMessage = "Failed to connect Kafka"
LOGGER.error(failedToConnectMessage)
eventSender.onEvent(failedToConnectMessage, CONNECTIVITY_EVENT_TYPE, status = Event.Status.FAILED)

while (!Thread.currentThread().isInterrupted && !isKafkaAvailable()) {
/* wait for connection */
}

val connectionRestoredMessage = "Kafka connection restored"
LOGGER.info(connectionRestoredMessage)
eventSender.onEvent(connectionRestoredMessage, CONNECTIVITY_EVENT_TYPE)
}
continue
}

LOGGER.trace { "Batch with ${records.count()} records polled from Kafka" }

val topicsToSkip: MutableSet<String> = HashSet()
Expand All @@ -96,15 +130,15 @@ class KafkaConnection(
val msgText =
"Inactivity period exceeded ($inactivityPeriod ms). Skipping unread messages in '$topicToSkip' topic."
LOGGER.info { msgText }
eventSender.onEvent(msgText, "ConnectivityServiceEvent")
eventSender.onEvent(msgText, CONNECTIVITY_EVENT_TYPE)
} else {
if (record.topic() in topicsToSkip) continue

val alias = config.topicToAlias[record.topic()]
?: config.topicAndKeyToAlias[KafkaStream(record.topic(), record.key(), true)]
?: continue

val messageID = MessageID.newBuilder()
val messageID = factory.newMessageIDBuilder()
.setConnectionId(
ConnectionID.newBuilder()
.setSessionAlias(alias)
Expand All @@ -127,6 +161,8 @@ class KafkaConnection(
}
}
}
} catch (e: InterruptedException) {
LOGGER.info("Polling thread interrupted")
} catch (e: Exception) {
val errorMessage = "Failed to read messages from Kafka"
LOGGER.error(errorMessage, e)
Expand All @@ -143,6 +179,7 @@ class KafkaConnection(
companion object {
private val LOGGER = KotlinLogging.logger {}
private val POLL_TIMEOUT = Duration.ofMillis(100L)
private const val CONNECTIVITY_EVENT_TYPE = "ConnectivityServiceEvent"

fun createTopics(config: Config) {
if (config.topicsToCreate.isEmpty()) return
Expand Down
38 changes: 25 additions & 13 deletions src/main/kotlin/com/exactpro/th2/kafka/client/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
package com.exactpro.th2.kafka.client

import com.exactpro.th2.common.event.Event
import com.exactpro.th2.common.event.EventUtils.toEventID
import com.exactpro.th2.common.grpc.EventBatch
import com.exactpro.th2.common.grpc.EventID
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.common.grpc.RawMessageBatch
import com.exactpro.th2.common.message.logId
import com.exactpro.th2.common.message.bookName
import com.exactpro.th2.common.message.toJson
import com.exactpro.th2.common.schema.factory.CommonFactory
import com.exactpro.th2.common.schema.message.DeliveryMetadata
import com.exactpro.th2.common.schema.message.MessageRouter
import com.exactpro.th2.common.utils.message.id
import com.exactpro.th2.common.utils.message.logId
import com.exactpro.th2.kafka.client.utility.storeEvent
import mu.KotlinLogging
import java.util.Deque
Expand Down Expand Up @@ -65,30 +67,40 @@ fun main(args: Array<String>) {
val messageProcessor = RawMessageProcessor(config.batchSize, config.timeSpan, config.timeSpanUnit) {
LOGGER.trace { "Sending batch with ${it.messagesCount} messages to MQ." }
it.runCatching(messageRouterRawBatch::send)
.onFailure { e -> LOGGER.error(e) { "Could not send message batch to MQ: ${it.toJson()}" } }
.onFailure { e -> LOGGER.error(e) {
it.messagesOrBuilderList
"Could not send message batch to MQ: ${it.toJson()}" }
}
}.apply { resources += "message processor" to ::close }

val rootEventId = toEventID(factory.rootEventId) ?: error("Failed to get root event id")
val eventSender = EventSender(factory.eventBatchRouter, rootEventId)
val eventSender = EventSender(factory.eventBatchRouter, factory.rootEventId)

if (config.createTopics) KafkaConnection.createTopics(config)
val connection = KafkaConnection(config, messageProcessor, eventSender, KafkaClientsFactory(config))
val connection = KafkaConnection(config, factory, messageProcessor, eventSender, KafkaClientsFactory(config))
.apply { resources += "kafka connection" to ::close }

Executors.newSingleThreadExecutor().apply {
resources += "executor service" to { this.shutdownNow() }
execute(connection)
}

val mqListener: (String, RawMessageBatch) -> Unit = { consumerTag, batch ->
val mqListener: (DeliveryMetadata, RawMessageBatch) -> Unit = { metadata, batch ->
LOGGER.trace { "Batch with ${batch.messagesCount} messages received from MQ"}
for (message in batch.messagesList) {
LOGGER.trace { "Message ${message.logId} extracted from batch." }
LOGGER.trace { "Message ${message.id.logId} extracted from batch." }

val bookName = message.bookName
if (bookName.isNotEmpty() && bookName != factory.boxConfiguration.bookName) {
val errorText = "Expected bookName: '${factory.boxConfiguration.bookName}', actual '$bookName' in message ${message.id.logId}"
LOGGER.error { errorText }
eventSender.onEvent(errorText, "Error", status = Event.Status.FAILED)
continue
}

runCatching {
connection.publish(message)
}.onFailure {
val errorText = "Could not publish message ${message.logId}. Consumer tag $consumerTag"
val errorText = "Could not publish message ${message.id.logId}. Consumer tag ${metadata.consumerTag}"
LOGGER.error(it) { errorText }
eventSender.onEvent(errorText, "SendError", message, it)
}
Expand All @@ -102,7 +114,6 @@ fun main(args: Array<String>) {
}.onFailure {
throw IllegalStateException("Failed to subscribe to input queue", it)
}

}.onFailure {
LOGGER.error(it) { "Error during working with Kafka connection. Exiting the program" }
exitProcess(2)
Expand All @@ -119,7 +130,8 @@ class EventSender(private val eventRouter: MessageRouter<EventBatch>, private va
type: String,
message: RawMessage? = null,
exception: Throwable? = null,
status: Event.Status? = null
status: Event.Status? = null,
parentEventId: EventID? = null
) {
val event = Event
.start()
Expand All @@ -128,7 +140,7 @@ class EventSender(private val eventRouter: MessageRouter<EventBatch>, private va
.type(type)

if (message != null) {
event.messageID(message.metadata.id)
event.messageID(message.id)
}

if (exception != null) {
Expand All @@ -139,6 +151,6 @@ class EventSender(private val eventRouter: MessageRouter<EventBatch>, private va
event.status(status)
}

eventRouter.storeEvent(event, message?.parentEventId ?: rootEventId)
eventRouter.storeEvent(event, parentEventId ?: rootEventId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package com.exactpro.th2.kafka.client
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.common.grpc.RawMessageBatch
import com.exactpro.th2.common.message.direction
import com.exactpro.th2.common.message.logId
import com.exactpro.th2.common.message.toTimestamp
import com.exactpro.th2.common.utils.message.id
import com.exactpro.th2.common.utils.message.logId
import com.exactpro.th2.common.utils.message.toTimestamp
import com.exactpro.th2.common.utils.message.direction
import com.exactpro.th2.common.utils.message.sessionGroup
import mu.KotlinLogging
import java.time.Instant
import java.util.concurrent.Executors
Expand All @@ -26,7 +28,7 @@ class RawMessageProcessor(
private val maxFlushTimeUnit: TimeUnit,
private val onBatch: (RawMessageBatch) -> Unit
) : AutoCloseable {
private val messageQueue: BlockingQueue<RawMessage.Builder> = LinkedBlockingQueue()
private val messageQueue: BlockingQueue<Pair<RawMessage.Builder, (RawMessage) -> Unit>> = LinkedBlockingQueue()
private val batchQueue: BlockingQueue<RawMessageBatch> = LinkedBlockingQueue()
private val batchFlusherExecutor = Executors.newSingleThreadScheduledExecutor()

Expand All @@ -36,20 +38,23 @@ class RawMessageProcessor(
val builders: MutableMap<String, BatchHolder> = HashMap()

while (true) {
val messageBuilder: RawMessage.Builder = messageQueue.take()
if (messageBuilder === TERMINAL_MESSAGE) break
val messageAndCallback = messageQueue.take()
if (messageAndCallback === TERMINAL_MESSAGE) break
val (messageBuilder, onMessageBuilt) = messageAndCallback

messageBuilder.metadataBuilder.apply {
messageBuilder.metadataBuilder.idBuilder.apply {
timestamp = Instant.now().toTimestamp()
idBuilder.sequence = when (messageBuilder.direction) {
sequence = when (messageBuilder.direction) {
Direction.FIRST -> firstSequence()
Direction.SECOND -> secondSequence()
else -> error("Unrecognized direction")
}
}

val sessionGroup: String = messageBuilder.metadata.id.connectionId.sessionGroup
builders.getOrPut(sessionGroup, ::BatchHolder).addMessage(messageBuilder)
val sessionGroup: String = checkNotNull(messageBuilder.sessionGroup) { "sessionGroup should be assigned to all messages" }
val message = messageBuilder.build()
onMessageBuilt(message)
builders.getOrPut(sessionGroup, ::BatchHolder).addMessage(message)
}

builders.values.forEach(BatchHolder::enqueueBatch)
Expand All @@ -65,10 +70,9 @@ class RawMessageProcessor(
private var flusherFuture: Future<*> = CompletableFuture.completedFuture(null)
private val lock: Lock = ReentrantLock()

fun addMessage(messageBuilder: RawMessage.Builder) = lock.withLock {
val message = messageBuilder.build()
fun addMessage(message: RawMessage) = lock.withLock {
batchBuilder.addMessages(message)
LOGGER.trace { "Message ${message.logId} added to batch." }
LOGGER.trace { "Message ${message.id.logId} added to batch." }
when (batchBuilder.messagesCount) {
1 -> flusherFuture = batchFlusherExecutor.schedule(::enqueueBatch, maxFlushTime, maxFlushTimeUnit)
maxBatchSize -> enqueueBatch()
Expand All @@ -92,8 +96,8 @@ class RawMessageProcessor(
}
}

fun onMessage(messageBuilder: RawMessage.Builder) {
messageQueue.add(messageBuilder)
fun onMessage(messageBuilder: RawMessage.Builder, onMessageBuilt: (RawMessage) -> Unit = EMPTY_CALLBACK) {
messageQueue.add(messageBuilder to onMessageBuilt)
}

override fun close() {
Expand All @@ -111,7 +115,8 @@ class RawMessageProcessor(

companion object {
private val LOGGER = KotlinLogging.logger {}
private val TERMINAL_MESSAGE = RawMessage.newBuilder()
private val EMPTY_CALLBACK: (RawMessage) -> Unit = {}
private val TERMINAL_MESSAGE: Pair<RawMessage.Builder, (RawMessage) -> Unit> = RawMessage.newBuilder() to EMPTY_CALLBACK
private val TERMINAL_BATCH = RawMessageBatch.newBuilder().build()
private const val TERMINATION_WAIT_TIMEOUT_MS = 5_000L

Expand Down
Loading

0 comments on commit 436e4a4

Please sign in to comment.