diff --git a/README.md b/README.md index 7097296..dc74b3e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# KafkaConnect (0.0.3) +# KafkaConnect (0.1.0) The "KafkaConnect" component is responsible for the communication with Kafka; ## Configuration @@ -39,6 +39,7 @@ This configuration should be specified in the custom configuration block in sche timeSpanUnit : "MILLISECONDS" reconnectBackoffMs: 50 reconnectBackoffMaxMs: 1000 + kafkaConnectionEvents: true ``` Parameters: @@ -62,6 +63,7 @@ Parameters: + timeSpanUnit time unit for `timeSpan` + reconnectBackoffMs - The amount of time in milliseconds to wait before attempting to reconnect to a given host. Should be positive. + reconnectBackoffMaxMs - The maximum amount of time in milliseconds to backoff/wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. Once the maximum is reached, reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Should be positive. ++ kafkaConnectionEvents - Generate TH2 events on lost connection and restore connection to Kafka. `false` by default. ## Reconnect behaviour @@ -98,6 +100,10 @@ spec: ## Release notes +### 0.1.0 + ++ Migrated to Books & Pages concept + ### 0.0.3 + Publishing to Kafka support diff --git a/build.gradle b/build.gradle index d40665e..6884cab 100644 --- a/build.gradle +++ b/build.gradle @@ -46,7 +46,8 @@ jar { dependencies { api platform('com.exactpro.th2:bom:4.1.0') - implementation 'com.exactpro.th2:common:3.44.0' + implementation 'com.exactpro.th2:common:5.1.0-dev-version-5-4085018593-8adee33-SNAPSHOT' + implementation 'com.exactpro.th2:common-utils:0.0.1-book-and-page-3836172629-SNAPSHOT' implementation 'org.apache.kafka:kafka-clients:3.4.0' diff --git a/gradle.properties b/gradle.properties index 5bd2496..9ab1cd5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ kotlin.code.style=official kotlin_version=1.6.21 -release_version = 0.0.3 +release_version = 0.1.0 description='Kafka Client' vcs_url=https://github.com/th2-net/th2-conn-kafka diff --git a/src/main/kotlin/com/exactpro/th2/kafka/client/Config.kt b/src/main/kotlin/com/exactpro/th2/kafka/client/Config.kt index ebb62a7..4b91a3c 100644 --- a/src/main/kotlin/com/exactpro/th2/kafka/client/Config.kt +++ b/src/main/kotlin/com/exactpro/th2/kafka/client/Config.kt @@ -96,6 +96,11 @@ class Config( */ val reconnectBackoffMaxMs: Int = 1000, + /** + * Generate TH2 event on connect|disconnect Kafka + */ + val kafkaConnectionEvents: Boolean = false, + val createTopics: Boolean = false, val topicsToCreate: List = emptyList(), val newTopicsPartitions: Int = 1, diff --git a/src/main/kotlin/com/exactpro/th2/kafka/client/KafkaConnection.kt b/src/main/kotlin/com/exactpro/th2/kafka/client/KafkaConnection.kt index fb529c5..d34c57b 100644 --- a/src/main/kotlin/com/exactpro/th2/kafka/client/KafkaConnection.kt +++ b/src/main/kotlin/com/exactpro/th2/kafka/client/KafkaConnection.kt @@ -16,12 +16,15 @@ package com.exactpro.th2.kafka.client +import com.exactpro.th2.common.event.Event import com.exactpro.th2.common.grpc.ConnectionID import com.exactpro.th2.common.grpc.Direction -import com.exactpro.th2.common.grpc.MessageID import com.exactpro.th2.common.grpc.RawMessage import com.exactpro.th2.common.grpc.RawMessageMetadata -import com.exactpro.th2.common.message.logId +import com.exactpro.th2.common.utils.message.id +import com.exactpro.th2.common.utils.message.logId +import com.exactpro.th2.common.utils.message.sessionAlias +import com.exactpro.th2.common.schema.factory.CommonFactory import com.google.protobuf.UnsafeByteOperations import mu.KotlinLogging import org.apache.kafka.clients.admin.AdminClient @@ -33,14 +36,17 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.TimeoutException import java.io.Closeable import java.time.Duration import java.time.Instant import java.util.HashSet import java.util.Collections +import java.util.concurrent.CompletableFuture class KafkaConnection( private val config: Config, + private val factory: CommonFactory, private val messageProcessor: RawMessageProcessor, private val eventSender: EventSender, kafkaClientsFactory: KafkaClientsFactory @@ -49,39 +55,67 @@ class KafkaConnection( private val producer: Producer = kafkaClientsFactory.getKafkaProducer() fun publish(message: RawMessage) { - val alias = message.metadata.id.connectionId.sessionAlias + val alias = message.sessionAlias ?: error("Message '${message.id.logId}' does not contain session alias.") val kafkaStream = config.aliasToTopicAndKey[alias] ?: KafkaStream(config.aliasToTopic[alias]?.topic ?: error("Session alias '$alias' not found."), null) val value = message.body.toByteArray() - val messageIdBuilder = message.metadata.id.toBuilder().apply { + val messageIdBuilder = message.id.toBuilder().apply { direction = Direction.SECOND + bookName = factory.boxConfiguration.bookName setConnectionId(connectionIdBuilder.setSessionGroup(config.aliasToSessionGroup.getValue(alias))) } + val messageFuture = CompletableFuture() messageProcessor.onMessage( RawMessage.newBuilder() .setMetadata(message.metadata.toBuilder().setId(messageIdBuilder)) - .setBody(message.body) + .setBody(message.body), + messageFuture::complete ) val kafkaRecord = ProducerRecord(kafkaStream.topic, kafkaStream.key, value) producer.send(kafkaRecord) { _, exception: Throwable? -> + val outMessage = messageFuture.get() if (exception == null) { - val msgText = "Message '${message.logId}' sent to Kafka" + val msgText = "Message '${outMessage.id.logId}' sent to Kafka" LOGGER.info(msgText) - eventSender.onEvent(msgText, "Send message", message) + eventSender.onEvent(msgText, "Send message", outMessage) } else { - throw RuntimeException("Failed to send message '${message.logId}' to Kafka", exception) + throw RuntimeException("Failed to send message '${outMessage.id.logId}' to Kafka", exception) } } } + private fun isKafkaAvailable(): Boolean = try { + consumer.listTopics(POLL_TIMEOUT) + true + } catch (e: TimeoutException) { + false + } + override fun run() = try { val startTimestamp = Instant.now().toEpochMilli() consumer.subscribe(config.topicToAlias.keys + config.topicAndKeyToAlias.map { it.key.topic }) while (!Thread.currentThread().isInterrupted) { val records: ConsumerRecords = consumer.poll(POLL_TIMEOUT) - if (records.isEmpty) continue + + if (records.isEmpty) { + if (config.kafkaConnectionEvents && !isKafkaAvailable()) { + val failedToConnectMessage = "Failed to connect Kafka" + LOGGER.error(failedToConnectMessage) + eventSender.onEvent(failedToConnectMessage, CONNECTIVITY_EVENT_TYPE, status = Event.Status.FAILED) + + while (!Thread.currentThread().isInterrupted && !isKafkaAvailable()) { + /* wait for connection */ + } + + val connectionRestoredMessage = "Kafka connection restored" + LOGGER.info(connectionRestoredMessage) + eventSender.onEvent(connectionRestoredMessage, CONNECTIVITY_EVENT_TYPE) + } + continue + } + LOGGER.trace { "Batch with ${records.count()} records polled from Kafka" } val topicsToSkip: MutableSet = HashSet() @@ -96,7 +130,7 @@ class KafkaConnection( val msgText = "Inactivity period exceeded ($inactivityPeriod ms). Skipping unread messages in '$topicToSkip' topic." LOGGER.info { msgText } - eventSender.onEvent(msgText, "ConnectivityServiceEvent") + eventSender.onEvent(msgText, CONNECTIVITY_EVENT_TYPE) } else { if (record.topic() in topicsToSkip) continue @@ -104,7 +138,7 @@ class KafkaConnection( ?: config.topicAndKeyToAlias[KafkaStream(record.topic(), record.key(), true)] ?: continue - val messageID = MessageID.newBuilder() + val messageID = factory.newMessageIDBuilder() .setConnectionId( ConnectionID.newBuilder() .setSessionAlias(alias) @@ -127,6 +161,8 @@ class KafkaConnection( } } } + } catch (e: InterruptedException) { + LOGGER.info("Polling thread interrupted") } catch (e: Exception) { val errorMessage = "Failed to read messages from Kafka" LOGGER.error(errorMessage, e) @@ -143,6 +179,7 @@ class KafkaConnection( companion object { private val LOGGER = KotlinLogging.logger {} private val POLL_TIMEOUT = Duration.ofMillis(100L) + private const val CONNECTIVITY_EVENT_TYPE = "ConnectivityServiceEvent" fun createTopics(config: Config) { if (config.topicsToCreate.isEmpty()) return diff --git a/src/main/kotlin/com/exactpro/th2/kafka/client/Main.kt b/src/main/kotlin/com/exactpro/th2/kafka/client/Main.kt index e8a6c60..e5356ef 100644 --- a/src/main/kotlin/com/exactpro/th2/kafka/client/Main.kt +++ b/src/main/kotlin/com/exactpro/th2/kafka/client/Main.kt @@ -19,15 +19,17 @@ package com.exactpro.th2.kafka.client import com.exactpro.th2.common.event.Event -import com.exactpro.th2.common.event.EventUtils.toEventID import com.exactpro.th2.common.grpc.EventBatch import com.exactpro.th2.common.grpc.EventID import com.exactpro.th2.common.grpc.RawMessage import com.exactpro.th2.common.grpc.RawMessageBatch -import com.exactpro.th2.common.message.logId +import com.exactpro.th2.common.message.bookName import com.exactpro.th2.common.message.toJson import com.exactpro.th2.common.schema.factory.CommonFactory +import com.exactpro.th2.common.schema.message.DeliveryMetadata import com.exactpro.th2.common.schema.message.MessageRouter +import com.exactpro.th2.common.utils.message.id +import com.exactpro.th2.common.utils.message.logId import com.exactpro.th2.kafka.client.utility.storeEvent import mu.KotlinLogging import java.util.Deque @@ -65,14 +67,16 @@ fun main(args: Array) { val messageProcessor = RawMessageProcessor(config.batchSize, config.timeSpan, config.timeSpanUnit) { LOGGER.trace { "Sending batch with ${it.messagesCount} messages to MQ." } it.runCatching(messageRouterRawBatch::send) - .onFailure { e -> LOGGER.error(e) { "Could not send message batch to MQ: ${it.toJson()}" } } + .onFailure { e -> LOGGER.error(e) { + it.messagesOrBuilderList + "Could not send message batch to MQ: ${it.toJson()}" } + } }.apply { resources += "message processor" to ::close } - val rootEventId = toEventID(factory.rootEventId) ?: error("Failed to get root event id") - val eventSender = EventSender(factory.eventBatchRouter, rootEventId) + val eventSender = EventSender(factory.eventBatchRouter, factory.rootEventId) if (config.createTopics) KafkaConnection.createTopics(config) - val connection = KafkaConnection(config, messageProcessor, eventSender, KafkaClientsFactory(config)) + val connection = KafkaConnection(config, factory, messageProcessor, eventSender, KafkaClientsFactory(config)) .apply { resources += "kafka connection" to ::close } Executors.newSingleThreadExecutor().apply { @@ -80,15 +84,23 @@ fun main(args: Array) { execute(connection) } - val mqListener: (String, RawMessageBatch) -> Unit = { consumerTag, batch -> + val mqListener: (DeliveryMetadata, RawMessageBatch) -> Unit = { metadata, batch -> LOGGER.trace { "Batch with ${batch.messagesCount} messages received from MQ"} for (message in batch.messagesList) { - LOGGER.trace { "Message ${message.logId} extracted from batch." } + LOGGER.trace { "Message ${message.id.logId} extracted from batch." } + + val bookName = message.bookName + if (bookName.isNotEmpty() && bookName != factory.boxConfiguration.bookName) { + val errorText = "Expected bookName: '${factory.boxConfiguration.bookName}', actual '$bookName' in message ${message.id.logId}" + LOGGER.error { errorText } + eventSender.onEvent(errorText, "Error", status = Event.Status.FAILED) + continue + } runCatching { connection.publish(message) }.onFailure { - val errorText = "Could not publish message ${message.logId}. Consumer tag $consumerTag" + val errorText = "Could not publish message ${message.id.logId}. Consumer tag ${metadata.consumerTag}" LOGGER.error(it) { errorText } eventSender.onEvent(errorText, "SendError", message, it) } @@ -102,7 +114,6 @@ fun main(args: Array) { }.onFailure { throw IllegalStateException("Failed to subscribe to input queue", it) } - }.onFailure { LOGGER.error(it) { "Error during working with Kafka connection. Exiting the program" } exitProcess(2) @@ -119,7 +130,8 @@ class EventSender(private val eventRouter: MessageRouter, private va type: String, message: RawMessage? = null, exception: Throwable? = null, - status: Event.Status? = null + status: Event.Status? = null, + parentEventId: EventID? = null ) { val event = Event .start() @@ -128,7 +140,7 @@ class EventSender(private val eventRouter: MessageRouter, private va .type(type) if (message != null) { - event.messageID(message.metadata.id) + event.messageID(message.id) } if (exception != null) { @@ -139,6 +151,6 @@ class EventSender(private val eventRouter: MessageRouter, private va event.status(status) } - eventRouter.storeEvent(event, message?.parentEventId ?: rootEventId) + eventRouter.storeEvent(event, parentEventId ?: rootEventId) } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/kafka/client/RawMessageProcessor.kt b/src/main/kotlin/com/exactpro/th2/kafka/client/RawMessageProcessor.kt index ad565a6..0082987 100644 --- a/src/main/kotlin/com/exactpro/th2/kafka/client/RawMessageProcessor.kt +++ b/src/main/kotlin/com/exactpro/th2/kafka/client/RawMessageProcessor.kt @@ -3,9 +3,11 @@ package com.exactpro.th2.kafka.client import com.exactpro.th2.common.grpc.Direction import com.exactpro.th2.common.grpc.RawMessage import com.exactpro.th2.common.grpc.RawMessageBatch -import com.exactpro.th2.common.message.direction -import com.exactpro.th2.common.message.logId -import com.exactpro.th2.common.message.toTimestamp +import com.exactpro.th2.common.utils.message.id +import com.exactpro.th2.common.utils.message.logId +import com.exactpro.th2.common.utils.message.toTimestamp +import com.exactpro.th2.common.utils.message.direction +import com.exactpro.th2.common.utils.message.sessionGroup import mu.KotlinLogging import java.time.Instant import java.util.concurrent.Executors @@ -26,7 +28,7 @@ class RawMessageProcessor( private val maxFlushTimeUnit: TimeUnit, private val onBatch: (RawMessageBatch) -> Unit ) : AutoCloseable { - private val messageQueue: BlockingQueue = LinkedBlockingQueue() + private val messageQueue: BlockingQueue Unit>> = LinkedBlockingQueue() private val batchQueue: BlockingQueue = LinkedBlockingQueue() private val batchFlusherExecutor = Executors.newSingleThreadScheduledExecutor() @@ -36,20 +38,23 @@ class RawMessageProcessor( val builders: MutableMap = HashMap() while (true) { - val messageBuilder: RawMessage.Builder = messageQueue.take() - if (messageBuilder === TERMINAL_MESSAGE) break + val messageAndCallback = messageQueue.take() + if (messageAndCallback === TERMINAL_MESSAGE) break + val (messageBuilder, onMessageBuilt) = messageAndCallback - messageBuilder.metadataBuilder.apply { + messageBuilder.metadataBuilder.idBuilder.apply { timestamp = Instant.now().toTimestamp() - idBuilder.sequence = when (messageBuilder.direction) { + sequence = when (messageBuilder.direction) { Direction.FIRST -> firstSequence() Direction.SECOND -> secondSequence() else -> error("Unrecognized direction") } } - val sessionGroup: String = messageBuilder.metadata.id.connectionId.sessionGroup - builders.getOrPut(sessionGroup, ::BatchHolder).addMessage(messageBuilder) + val sessionGroup: String = checkNotNull(messageBuilder.sessionGroup) { "sessionGroup should be assigned to all messages" } + val message = messageBuilder.build() + onMessageBuilt(message) + builders.getOrPut(sessionGroup, ::BatchHolder).addMessage(message) } builders.values.forEach(BatchHolder::enqueueBatch) @@ -65,10 +70,9 @@ class RawMessageProcessor( private var flusherFuture: Future<*> = CompletableFuture.completedFuture(null) private val lock: Lock = ReentrantLock() - fun addMessage(messageBuilder: RawMessage.Builder) = lock.withLock { - val message = messageBuilder.build() + fun addMessage(message: RawMessage) = lock.withLock { batchBuilder.addMessages(message) - LOGGER.trace { "Message ${message.logId} added to batch." } + LOGGER.trace { "Message ${message.id.logId} added to batch." } when (batchBuilder.messagesCount) { 1 -> flusherFuture = batchFlusherExecutor.schedule(::enqueueBatch, maxFlushTime, maxFlushTimeUnit) maxBatchSize -> enqueueBatch() @@ -92,8 +96,8 @@ class RawMessageProcessor( } } - fun onMessage(messageBuilder: RawMessage.Builder) { - messageQueue.add(messageBuilder) + fun onMessage(messageBuilder: RawMessage.Builder, onMessageBuilt: (RawMessage) -> Unit = EMPTY_CALLBACK) { + messageQueue.add(messageBuilder to onMessageBuilt) } override fun close() { @@ -111,7 +115,8 @@ class RawMessageProcessor( companion object { private val LOGGER = KotlinLogging.logger {} - private val TERMINAL_MESSAGE = RawMessage.newBuilder() + private val EMPTY_CALLBACK: (RawMessage) -> Unit = {} + private val TERMINAL_MESSAGE: Pair Unit> = RawMessage.newBuilder() to EMPTY_CALLBACK private val TERMINAL_BATCH = RawMessageBatch.newBuilder().build() private const val TERMINATION_WAIT_TIMEOUT_MS = 5_000L diff --git a/src/main/kotlin/com/exactpro/th2/kafka/client/utility/EventStoreExtensions.kt b/src/main/kotlin/com/exactpro/th2/kafka/client/utility/EventStoreExtensions.kt index 8ebbd94..9f88576 100644 --- a/src/main/kotlin/com/exactpro/th2/kafka/client/utility/EventStoreExtensions.kt +++ b/src/main/kotlin/com/exactpro/th2/kafka/client/utility/EventStoreExtensions.kt @@ -17,6 +17,7 @@ package com.exactpro.th2.kafka.client.utility import com.exactpro.th2.common.event.Event +import com.exactpro.th2.common.event.EventUtils import com.exactpro.th2.common.event.IBodyData import com.exactpro.th2.common.grpc.EventBatch import com.exactpro.th2.common.grpc.EventID @@ -39,6 +40,12 @@ fun MessageRouter.storeEvent( parentEventId: EventID, ) = storeEvent(event.toProto(parentEventId)) +@Throws(JsonProcessingException::class) +fun MessageRouter.storeEvent( + event: Event, + bookName: String, +) = storeEvent(event.toProto(bookName)) + @Throws(JsonProcessingException::class) private fun MessageRouter.storeEvent( protoEvent: com.exactpro.th2.common.grpc.Event, @@ -52,6 +59,68 @@ private fun MessageRouter.storeEvent( return protoEvent.id } +/** + * @param parentEventId the ID of the root parent that all events should be attached. + * @param events events to store + */ +@Throws(JsonProcessingException::class) +fun MessageRouter.storeEvents( + parentEventId: EventID, + vararg events: Event, +) = storeEvents( + EventBatch.newBuilder().setParentEventId(parentEventId), + { event -> event.toProto(parentEventId) }, + *events +) + +/** + * @param bookName the book name of the root parent that all events should be attached. + * Events will be stored as a root events (without attaching to any parent). + * @param events events to store + */ +@Throws(JsonProcessingException::class) +fun MessageRouter.storeEvents( + bookName: String, + vararg events: Event, +) = storeEvents( + EventBatch.newBuilder(), + { event -> event.toProto(bookName) }, + *events +) + +@Throws(JsonProcessingException::class) +private fun MessageRouter.storeEvents( + batchBuilder: EventBatch.Builder, + toProto: (Event) -> com.exactpro.th2.common.grpc.Event, + vararg events: Event, +) { + try { + batchBuilder.apply { + for (event in events) { + addEvents(toProto(event)) + } + } + send(batchBuilder.build()) + } catch (e: Exception) { + throw RuntimeException("Events '${events.map { it.id }}' store failure", e) + } + LOGGER.debug("Events {} sent", events.map { it.id }) +} + +// TODO: maybe we should move it to common library +fun Event.addException(t: Throwable) { + var error: Throwable? = t + do { + bodyData(EventUtils.createMessageBean(error?.toString())) + error = error?.cause + } while (error != null) +} + +// TODO: probably we should move it to common library +fun createProtoMessageBean(msg: MessageOrBuilder): IBodyData { + return ProtoMessageData(msg) +} + @JsonSerialize(using = ProtoMessageSerializer::class) class ProtoMessageData( @JsonIgnore diff --git a/src/test/kotlin/com/exactpro/th2/kafka/client/ConfigTest.kt b/src/test/kotlin/com/exactpro/th2/kafka/client/ConfigTest.kt index 5dce072..a14a510 100644 --- a/src/test/kotlin/com/exactpro/th2/kafka/client/ConfigTest.kt +++ b/src/test/kotlin/com/exactpro/th2/kafka/client/ConfigTest.kt @@ -16,10 +16,8 @@ package com.exactpro.th2.kafka.client -import org.junit.jupiter.api.Assertions.* +import org.assertj.core.api.Assertions.assertThatThrownBy import kotlin.test.Test -import kotlin.test.assertContains -import kotlin.test.assertFailsWith class ConfigTest { @Test @@ -42,57 +40,50 @@ class ConfigTest { @Test fun `empty alias mappings`() { - val e = assertFailsWith { - Config() - } - assertNotNull(e.message) - assertContains(e.message!!, "aliasToTopic") - assertContains(e.message!!, "aliasToTopicAndKey") + assertThatThrownBy { Config() } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContainingAll("aliasToTopic", "aliasToTopicAndKey") } @Test fun `duplicated aliases`() { - val e = assertFailsWith { - Config( - aliasToTopic = mapOf("alias_01" to KafkaTopic("topic_01"), "alias_02" to KafkaTopic("topic_02")), - aliasToTopicAndKey = mapOf("alias_03" to KafkaStream("topic_03", null), "alias_01" to KafkaStream("topic_04", null)) - ) - } - assertNotNull(e.message) - assertContains(e.message!!, "aliasToTopic") - assertContains(e.message!!, "aliasToTopicAndKey") + assertThatThrownBy { Config( + aliasToTopic = mapOf("alias_01" to KafkaTopic("topic_01"), "alias_02" to KafkaTopic("topic_02")), + aliasToTopicAndKey = mapOf("alias_03" to KafkaStream("topic_03", null), "alias_01" to KafkaStream("topic_04", null)) + ) } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContainingAll("aliasToTopic", "aliasToTopicAndKey") } @Test fun `duplicated topic in aliasToTopic`() { - val e = assertFailsWith { + assertThatThrownBy { Config(aliasToTopic = mapOf( "alias_01" to KafkaTopic("topic_01"), "alias_02" to KafkaTopic("topic_02"), "alias_03" to KafkaTopic("topic_01") )) } - assertNotNull(e.message) - assertContains(e.message!!, "aliasToTopic") + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContainingAll("aliasToTopic") } @Test fun `duplicated stream in aliasToTopicAndKey`() { - val e = assertFailsWith { + assertThatThrownBy { Config(aliasToTopicAndKey = mapOf( "alias_01" to KafkaStream("topic_01", null), "alias_02" to KafkaStream("topic_02", null), "alias_03" to KafkaStream("topic_01", null) )) } - assertNotNull(e.message) - assertContains(e.message!!, "aliasToTopic") - assertContains(e.message!!, "aliasToTopicAndKey") + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContainingAll ("aliasToTopic", "aliasToTopicAndKey") } @Test fun `same topics in aliasToTopicAndKey and aliasToTopic`() { - val e = assertFailsWith { + assertThatThrownBy { Config( aliasToTopic = mapOf( "alias_01" to KafkaTopic("topic_01"), @@ -105,14 +96,13 @@ class ConfigTest { ) ) } - assertNotNull(e.message) - assertContains(e.message!!, "aliasToTopic") - assertContains(e.message!!, "aliasToTopicAndKey") + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContainingAll ("aliasToTopic", "aliasToTopicAndKey") } @Test fun `duplicated alias in sessionGroups`() { - val e = assertFailsWith { + assertThatThrownBy { Config( aliasToTopic = mapOf("alias_01" to KafkaTopic("topic_01")), sessionGroups = mapOf( @@ -122,7 +112,7 @@ class ConfigTest { ) ) } - assertNotNull(e.message) - assertContains(e.message!!, "sessionGroups") + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContainingAll ("sessionGroups") } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/kafka/client/KafkaConnectionTest.kt b/src/test/kotlin/com/exactpro/th2/kafka/client/KafkaConnectionTest.kt index c068873..32f7c35 100644 --- a/src/test/kotlin/com/exactpro/th2/kafka/client/KafkaConnectionTest.kt +++ b/src/test/kotlin/com/exactpro/th2/kafka/client/KafkaConnectionTest.kt @@ -17,10 +17,14 @@ package com.exactpro.th2.kafka.client import com.exactpro.th2.common.grpc.Direction +import com.exactpro.th2.common.grpc.MessageID import com.exactpro.th2.common.grpc.RawMessage -import com.exactpro.th2.common.message.direction -import com.exactpro.th2.common.message.sessionAlias -import com.exactpro.th2.common.message.sessionGroup +import com.exactpro.th2.common.message.bookName +import com.exactpro.th2.common.utils.message.direction +import com.exactpro.th2.common.utils.message.sessionAlias +import com.exactpro.th2.common.utils.message.sessionGroup +import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration +import com.exactpro.th2.common.schema.factory.CommonFactory import java.time.Duration import com.google.protobuf.UnsafeByteOperations import org.apache.kafka.clients.consumer.Consumer @@ -29,17 +33,21 @@ import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerRecord -import org.junit.jupiter.api.Assertions.* +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import kotlin.test.Test import org.mockito.kotlin.* import java.lang.IllegalStateException import java.time.Instant import java.util.concurrent.Executors import java.util.concurrent.TimeUnit -import kotlin.test.Test -import kotlin.test.assertFailsWith - class KafkaConnectionTest { + private val commonFactory: CommonFactory = mock { + on { boxConfiguration } doReturn BoxConfiguration().apply { bookName = "book_01" } + on { newMessageIDBuilder() } doReturn MessageID.newBuilder().setBookName("book_01") + } + private val testMessageText = "QWERTY" private val messageProcessor: RawMessageProcessor = mock() private val eventSender: EventSender = mock() @@ -79,6 +87,7 @@ class KafkaConnectionTest { aliasToTopicAndKey = mapOf("alias_03" to KafkaStream("topic_03", "key_03", true)), sessionGroups = mapOf("group_01" to listOf("alias_01")) ), + commonFactory, messageProcessor, eventSender, kafkaClientsFactory @@ -94,25 +103,29 @@ class KafkaConnectionTest { connection.publish(testMessage) - val recordCaptor = argumentCaptor>() - val callbackCaptor = argumentCaptor() - verify(kafkaProducer, only()).send(recordCaptor.capture(), callbackCaptor.capture()) + val messageBuilderCaptor = argumentCaptor() + val processorCallbackCaptor = argumentCaptor<(RawMessage) -> Unit>() + verify(messageProcessor, only()).onMessage(messageBuilderCaptor.capture(), processorCallbackCaptor.capture()) - callbackCaptor.firstValue.onCompletion(null, null) - val kafkaRecord = recordCaptor.firstValue - assertEquals(kafkaRecord.topic(), "topic_01") - assertEquals(null, kafkaRecord.key()) - assertEquals(testMessageText, String(kafkaRecord.value())) - verify(eventSender, only()).onEvent(any(), eq("Send message"), eq(testMessage), eq(null), eq(null)) + val outMessage = messageBuilderCaptor.firstValue.build() + assertThat(outMessage.bookName).isEqualTo("book_01") + assertThat(outMessage.sessionAlias).isEqualTo("alias_01") + assertThat(outMessage.sessionGroup).isEqualTo("group_01") + assertThat(outMessage.direction).isEqualTo(Direction.SECOND) + assertThat(outMessage.body.toStringUtf8()).isEqualTo(testMessageText) - val messageBuilderCaptor = argumentCaptor() - verify(messageProcessor, only()).onMessage(messageBuilderCaptor.capture()) + processorCallbackCaptor.firstValue.invoke(outMessage) // complete CompletableFuture - val messageBuilder = messageBuilderCaptor.firstValue - assertEquals("alias_01", messageBuilder.sessionAlias) - assertEquals("group_01", messageBuilder.sessionGroup) - assertEquals(Direction.SECOND, messageBuilder.direction) - assertEquals(testMessageText, messageBuilder.body.toStringUtf8()) + val recordCaptor = argumentCaptor>() + val producerCallbackCaptor = argumentCaptor() + verify(kafkaProducer, only()).send(recordCaptor.capture(), producerCallbackCaptor.capture()) + + producerCallbackCaptor.firstValue.onCompletion(null, null) + val kafkaRecord = recordCaptor.firstValue + assertThat(kafkaRecord.topic()).isEqualTo("topic_01") + assertThat(kafkaRecord.key()).isNull() + assertThat(String(kafkaRecord.value())).isEqualTo(testMessageText) + verify(eventSender, only()).onEvent(any(), eq("Send message"), eq(outMessage), eq(null), eq(null), eq(null)) } @Test @@ -123,9 +136,9 @@ class KafkaConnectionTest { } .build() - assertFailsWith { - connection.publish(testMessage) - } + assertThatThrownBy { connection.publish(testMessage) } + .isInstanceOf(IllegalStateException::class.java) + .hasMessageContaining("alias") } @Test @@ -137,12 +150,13 @@ class KafkaConnectionTest { executor.awaitTermination(1000, TimeUnit.SECONDS) val messageBuilderCaptor = argumentCaptor() - verify(messageProcessor, only()).onMessage(messageBuilderCaptor.capture()) + verify(messageProcessor, only()).onMessage(messageBuilderCaptor.capture(), any()) val messageBuilder = messageBuilderCaptor.firstValue - assertEquals("alias_03", messageBuilder.sessionAlias) - assertEquals("alias_03", messageBuilder.sessionGroup) - assertEquals(Direction.FIRST, messageBuilder.direction) - assertEquals(testMessageText, messageBuilder.body.toStringUtf8()) + assertThat(messageBuilder.bookName).isEqualTo("book_01") + assertThat(messageBuilder.sessionAlias).isEqualTo("alias_03") + assertThat(messageBuilder.sessionGroup).isEqualTo("alias_03") // if no group name provided sessionAlias is used as group name + assertThat(messageBuilder.direction).isEqualTo(Direction.FIRST) + assertThat(messageBuilder.body.toStringUtf8()).isEqualTo(testMessageText) } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/kafka/client/RawMessageProcessorTest.kt b/src/test/kotlin/com/exactpro/th2/kafka/client/RawMessageProcessorTest.kt index c967b51..f74f621 100644 --- a/src/test/kotlin/com/exactpro/th2/kafka/client/RawMessageProcessorTest.kt +++ b/src/test/kotlin/com/exactpro/th2/kafka/client/RawMessageProcessorTest.kt @@ -27,7 +27,7 @@ import com.exactpro.th2.common.message.sequence import com.exactpro.th2.common.util.toInstant import org.assertj.core.api.Assertions.assertThat import org.assertj.core.data.Percentage -import org.junit.jupiter.api.Test +import kotlin.test.Test import java.time.Instant import java.util.EnumMap import java.util.Random @@ -135,7 +135,7 @@ class RawMessageProcessorTest { currentSequences[message.direction] = message.sequence // verify timestamp ordering - val timestamp = with(message.metadata.timestamp) { + val timestamp = with(message.metadata.id.timestamp) { TimeUnit.SECONDS.toNanos(seconds) + nanos } assertThat(timestamp) @@ -164,7 +164,7 @@ class RawMessageProcessorTest { .forEach { (_, batchesWithTimestamp) -> batchesWithTimestamp.asSequence() .forEachIndexed { index, (batch, endTimestamp) -> - val startTimestamp = batch.getMessages(0).metadata.timestamp.toInstant().toEpochMilli() + val startTimestamp = batch.getMessages(0).metadata.id.timestamp.toInstant().toEpochMilli() val flushTime = endTimestamp - startTimestamp if (index < batchesWithTimestamp.lastIndex) { assertThat(flushTime)