downgrade to common 3.44
Oleg Smelov committed Feb 17, 2023
1 parent 436e4a4 commit 491d92e
Showing 8 changed files with 30 additions and 150 deletions.
4 changes: 0 additions & 4 deletions README.md
@@ -100,10 +100,6 @@ spec:

## Release notes

### 0.1.0

+ Migrated to Books & Pages concept

### 0.0.3

+ Publishing to Kafka support
3 changes: 1 addition & 2 deletions build.gradle
@@ -46,8 +46,7 @@ jar {

dependencies {
api platform('com.exactpro.th2:bom:4.1.0')
implementation 'com.exactpro.th2:common:5.1.0-dev-version-5-4085018593-8adee33-SNAPSHOT'
implementation 'com.exactpro.th2:common-utils:0.0.1-book-and-page-3836172629-SNAPSHOT'
implementation 'com.exactpro.th2:common:3.44.0'

implementation 'org.apache.kafka:kafka-clients:3.4.0'

19 changes: 8 additions & 11 deletions src/main/kotlin/com/exactpro/th2/kafka/client/KafkaConnection.kt
@@ -19,12 +19,11 @@ package com.exactpro.th2.kafka.client
import com.exactpro.th2.common.event.Event
import com.exactpro.th2.common.grpc.ConnectionID
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.MessageID
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.common.grpc.RawMessageMetadata
import com.exactpro.th2.common.utils.message.id
import com.exactpro.th2.common.utils.message.logId
import com.exactpro.th2.common.utils.message.sessionAlias
import com.exactpro.th2.common.schema.factory.CommonFactory
import com.exactpro.th2.common.message.logId
import com.exactpro.th2.common.message.sessionAlias
import com.google.protobuf.UnsafeByteOperations
import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient
@@ -46,7 +45,6 @@ import java.util.concurrent.CompletableFuture

class KafkaConnection(
private val config: Config,
private val factory: CommonFactory,
private val messageProcessor: RawMessageProcessor,
private val eventSender: EventSender,
kafkaClientsFactory: KafkaClientsFactory
@@ -55,12 +53,11 @@ class KafkaConnection(
private val producer: Producer<String, ByteArray> = kafkaClientsFactory.getKafkaProducer()

fun publish(message: RawMessage) {
val alias = message.sessionAlias ?: error("Message '${message.id.logId}' does not contain session alias.")
val alias = message.sessionAlias
val kafkaStream = config.aliasToTopicAndKey[alias] ?: KafkaStream(config.aliasToTopic[alias]?.topic ?: error("Session alias '$alias' not found."), null)
val value = message.body.toByteArray()
val messageIdBuilder = message.id.toBuilder().apply {
val messageIdBuilder = message.metadata.id.toBuilder().apply {
direction = Direction.SECOND
bookName = factory.boxConfiguration.bookName
setConnectionId(connectionIdBuilder.setSessionGroup(config.aliasToSessionGroup.getValue(alias)))
}

@@ -76,11 +73,11 @@
producer.send(kafkaRecord) { _, exception: Throwable? ->
val outMessage = messageFuture.get()
if (exception == null) {
val msgText = "Message '${outMessage.id.logId}' sent to Kafka"
val msgText = "Message '${outMessage.logId}' sent to Kafka"
LOGGER.info(msgText)
eventSender.onEvent(msgText, "Send message", outMessage)
} else {
throw RuntimeException("Failed to send message '${outMessage.id.logId}' to Kafka", exception)
throw RuntimeException("Failed to send message '${outMessage.logId}' to Kafka", exception)
}
}
}
@@ -138,7 +135,7 @@
?: config.topicAndKeyToAlias[KafkaStream(record.topic(), record.key(), true)]
?: continue

val messageID = factory.newMessageIDBuilder()
val messageID = MessageID.newBuilder()
.setConnectionId(
ConnectionID.newBuilder()
.setSessionAlias(alias)
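
Note on the hunks above: under common 3.44 message IDs are built with a plain `MessageID.newBuilder()` and carry no book name, and the timestamp lives on `RawMessageMetadata` rather than on the ID (see the RawMessageProcessor hunk further down). A minimal sketch of mapping an inbound Kafka record to a `RawMessage` in that style follows; the helper name `toRawMessage` and its parameters are illustrative, not part of this commit.

```kotlin
// Sketch only: common 3.44-style RawMessage construction for an inbound record.
// toRawMessage and its parameters are hypothetical; the builder calls mirror the diff above.
import com.exactpro.th2.common.grpc.ConnectionID
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.MessageID
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.common.grpc.RawMessageMetadata
import com.exactpro.th2.common.message.toTimestamp
import com.google.protobuf.UnsafeByteOperations
import java.time.Instant

fun toRawMessage(alias: String, payload: ByteArray, sequence: Long): RawMessage {
    // No factory.newMessageIDBuilder() and no book name in common 3.44.
    val id = MessageID.newBuilder()
        .setConnectionId(ConnectionID.newBuilder().setSessionAlias(alias))
        .setDirection(Direction.FIRST) // incoming (Kafka -> th2)
        .setSequence(sequence)

    return RawMessage.newBuilder()
        .setMetadata(
            RawMessageMetadata.newBuilder()
                .setId(id)
                .setTimestamp(Instant.now().toTimestamp()) // timestamp sits on metadata here
        )
        .setBody(UnsafeByteOperations.unsafeWrap(payload))
        .build()
}
```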
25 changes: 7 additions & 18 deletions src/main/kotlin/com/exactpro/th2/kafka/client/Main.kt
@@ -23,13 +23,10 @@ import com.exactpro.th2.common.grpc.EventBatch
import com.exactpro.th2.common.grpc.EventID
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.common.grpc.RawMessageBatch
import com.exactpro.th2.common.message.bookName
import com.exactpro.th2.common.message.logId
import com.exactpro.th2.common.message.toJson
import com.exactpro.th2.common.schema.factory.CommonFactory
import com.exactpro.th2.common.schema.message.DeliveryMetadata
import com.exactpro.th2.common.schema.message.MessageRouter
import com.exactpro.th2.common.utils.message.id
import com.exactpro.th2.common.utils.message.logId
import com.exactpro.th2.kafka.client.utility.storeEvent
import mu.KotlinLogging
import java.util.Deque
@@ -73,34 +70,26 @@ fun main(args: Array<String>) {
}
}.apply { resources += "message processor" to ::close }

val eventSender = EventSender(factory.eventBatchRouter, factory.rootEventId)
val eventSender = EventSender(factory.eventBatchRouter, EventID.newBuilder().setId(factory.rootEventId).build())

if (config.createTopics) KafkaConnection.createTopics(config)
val connection = KafkaConnection(config, factory, messageProcessor, eventSender, KafkaClientsFactory(config))
val connection = KafkaConnection(config, messageProcessor, eventSender, KafkaClientsFactory(config))
.apply { resources += "kafka connection" to ::close }

Executors.newSingleThreadExecutor().apply {
resources += "executor service" to { this.shutdownNow() }
execute(connection)
}

val mqListener: (DeliveryMetadata, RawMessageBatch) -> Unit = { metadata, batch ->
val mqListener: (String, RawMessageBatch) -> Unit = { consumerTag, batch ->
LOGGER.trace { "Batch with ${batch.messagesCount} messages received from MQ"}
for (message in batch.messagesList) {
LOGGER.trace { "Message ${message.id.logId} extracted from batch." }

val bookName = message.bookName
if (bookName.isNotEmpty() && bookName != factory.boxConfiguration.bookName) {
val errorText = "Expected bookName: '${factory.boxConfiguration.bookName}', actual '$bookName' in message ${message.id.logId}"
LOGGER.error { errorText }
eventSender.onEvent(errorText, "Error", status = Event.Status.FAILED)
continue
}
LOGGER.trace { "Message ${message.logId} extracted from batch." }

runCatching {
connection.publish(message)
}.onFailure {
val errorText = "Could not publish message ${message.id.logId}. Consumer tag ${metadata.consumerTag}"
val errorText = "Could not publish message ${message.logId}. Consumer tag $consumerTag"
LOGGER.error(it) { errorText }
eventSender.onEvent(errorText, "SendError", message, it)
}
@@ -140,7 +129,7 @@ class EventSender(private val eventRouter: MessageRouter<EventBatch>, private va
.type(type)

if (message != null) {
event.messageID(message.id)
event.messageID(message.metadata.id)
}

if (exception != null) {
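
For context on the Main.kt changes: with common 3.x the batch listener receives the consumer tag as a plain `String` (there is no `DeliveryMetadata`), `factory.rootEventId` is a `String` that has to be wrapped into an `EventID` by hand, and the book-name check is dropped. A rough sketch of that wiring; `reportError` is a stand-in for `eventSender.onEvent(...)` and is not part of the commit.

```kotlin
// Sketch of the 3.x-style pieces used above; names not shown in the diff are illustrative.
import com.exactpro.th2.common.grpc.EventID
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.common.grpc.RawMessageBatch
import com.exactpro.th2.common.schema.factory.CommonFactory

// In common 3.x, factory.rootEventId is a plain String, so it is wrapped manually.
fun rootEventId(factory: CommonFactory): EventID =
    EventID.newBuilder().setId(factory.rootEventId).build()

// The listener type reverts to (consumerTag, batch); no book-name validation is performed.
fun mqListener(
    publish: (RawMessage) -> Unit,
    reportError: (String, Throwable) -> Unit
): (String, RawMessageBatch) -> Unit = { consumerTag, batch ->
    for (message in batch.messagesList) {
        runCatching { publish(message) }
            .onFailure { reportError("Could not publish message. Consumer tag $consumerTag", it) }
    }
}
```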
@@ -3,11 +3,10 @@ package com.exactpro.th2.kafka.client
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.common.grpc.RawMessageBatch
import com.exactpro.th2.common.utils.message.id
import com.exactpro.th2.common.utils.message.logId
import com.exactpro.th2.common.utils.message.toTimestamp
import com.exactpro.th2.common.utils.message.direction
import com.exactpro.th2.common.utils.message.sessionGroup
import com.exactpro.th2.common.message.direction
import com.exactpro.th2.common.message.logId
import com.exactpro.th2.common.message.sessionGroup
import com.exactpro.th2.common.message.toTimestamp
import mu.KotlinLogging
import java.time.Instant
import java.util.concurrent.Executors
@@ -42,9 +41,9 @@ class RawMessageProcessor(
if (messageAndCallback === TERMINAL_MESSAGE) break
val (messageBuilder, onMessageBuilt) = messageAndCallback

messageBuilder.metadataBuilder.idBuilder.apply {
messageBuilder.metadataBuilder.apply {
timestamp = Instant.now().toTimestamp()
sequence = when (messageBuilder.direction) {
idBuilder.sequence = when (messageBuilder.direction) {
Direction.FIRST -> firstSequence()
Direction.SECOND -> secondSequence()
else -> error("Unrecognized direction")
@@ -72,7 +71,7 @@

fun addMessage(message: RawMessage) = lock.withLock {
batchBuilder.addMessages(message)
LOGGER.trace { "Message ${message.id.logId} added to batch." }
LOGGER.trace { "Message ${message.logId} added to batch." }
when (batchBuilder.messagesCount) {
1 -> flusherFuture = batchFlusherExecutor.schedule(::enqueueBatch, maxFlushTime, maxFlushTimeUnit)
maxBatchSize -> enqueueBatch()
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,23 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

@file:JvmName("EventStoreExtensions")
package com.exactpro.th2.kafka.client.utility

import com.exactpro.th2.common.event.Event
import com.exactpro.th2.common.event.EventUtils
import com.exactpro.th2.common.event.IBodyData
import com.exactpro.th2.common.grpc.EventBatch
import com.exactpro.th2.common.grpc.EventID
import com.exactpro.th2.common.schema.message.MessageRouter
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.ser.std.StdSerializer
import com.google.protobuf.MessageOrBuilder
import com.google.protobuf.util.JsonFormat
import mu.KotlinLogging

private val LOGGER = KotlinLogging.logger { }
@@ -40,12 +32,6 @@ fun MessageRouter<EventBatch>.storeEvent(
parentEventId: EventID,
) = storeEvent(event.toProto(parentEventId))

@Throws(JsonProcessingException::class)
fun MessageRouter<EventBatch>.storeEvent(
event: Event,
bookName: String,
) = storeEvent(event.toProto(bookName))

@Throws(JsonProcessingException::class)
private fun MessageRouter<EventBatch>.storeEvent(
protoEvent: com.exactpro.th2.common.grpc.Event,
@@ -57,78 +43,4 @@ private fun MessageRouter<EventBatch>.storeEvent(
}
LOGGER.debug("Event {} sent", protoEvent.id.id)
return protoEvent.id
}

/**
* @param parentEventId the ID of the root parent that all events should be attached.
* @param events events to store
*/
@Throws(JsonProcessingException::class)
fun MessageRouter<EventBatch>.storeEvents(
parentEventId: EventID,
vararg events: Event,
) = storeEvents(
EventBatch.newBuilder().setParentEventId(parentEventId),
{ event -> event.toProto(parentEventId) },
*events
)

/**
* @param bookName the book name of the root parent that all events should be attached.
* Events will be stored as a root events (without attaching to any parent).
* @param events events to store
*/
@Throws(JsonProcessingException::class)
fun MessageRouter<EventBatch>.storeEvents(
bookName: String,
vararg events: Event,
) = storeEvents(
EventBatch.newBuilder(),
{ event -> event.toProto(bookName) },
*events
)

@Throws(JsonProcessingException::class)
private fun MessageRouter<EventBatch>.storeEvents(
batchBuilder: EventBatch.Builder,
toProto: (Event) -> com.exactpro.th2.common.grpc.Event,
vararg events: Event,
) {
try {
batchBuilder.apply {
for (event in events) {
addEvents(toProto(event))
}
}
send(batchBuilder.build())
} catch (e: Exception) {
throw RuntimeException("Events '${events.map { it.id }}' store failure", e)
}
LOGGER.debug("Events {} sent", events.map { it.id })
}

// TODO: maybe we should move it to common library
fun Event.addException(t: Throwable) {
var error: Throwable? = t
do {
bodyData(EventUtils.createMessageBean(error?.toString()))
error = error?.cause
} while (error != null)
}

// TODO: probably we should move it to common library
fun createProtoMessageBean(msg: MessageOrBuilder): IBodyData {
return ProtoMessageData(msg)
}

@JsonSerialize(using = ProtoMessageSerializer::class)
class ProtoMessageData(
@JsonIgnore
val messageOrBuilder: MessageOrBuilder
) : IBodyData

class ProtoMessageSerializer : StdSerializer<ProtoMessageData>(ProtoMessageData::class.java) {
override fun serialize(value: ProtoMessageData, gen: JsonGenerator, provider: SerializerProvider) {
gen.writeRawValue(JsonFormat.printer().print(value.messageOrBuilder))
}
}
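
After this cleanup only the parent-`EventID` overload of `storeEvent` remains (the book-name overloads and the protobuf body helpers are removed). A usage sketch follows; the event name, type, and `rootEventId` value are illustrative, not taken from the commit.

```kotlin
// Usage sketch for the surviving storeEvent overload; name/type values are illustrative.
import com.exactpro.th2.common.event.Event
import com.exactpro.th2.common.grpc.EventBatch
import com.exactpro.th2.common.grpc.EventID
import com.exactpro.th2.common.schema.message.MessageRouter
import com.exactpro.th2.kafka.client.utility.storeEvent

fun reportStartup(eventRouter: MessageRouter<EventBatch>, rootEventId: EventID): EventID =
    eventRouter.storeEvent(
        Event.start()
            .name("Kafka connection started")
            .type("Info"),
        rootEventId
    )
```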
@@ -17,14 +17,10 @@
package com.exactpro.th2.kafka.client

import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.MessageID
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.common.message.bookName
import com.exactpro.th2.common.utils.message.direction
import com.exactpro.th2.common.utils.message.sessionAlias
import com.exactpro.th2.common.utils.message.sessionGroup
import com.exactpro.th2.common.schema.box.configuration.BoxConfiguration
import com.exactpro.th2.common.schema.factory.CommonFactory
import com.exactpro.th2.common.message.direction
import com.exactpro.th2.common.message.sessionAlias
import com.exactpro.th2.common.message.sessionGroup
import java.time.Duration
import com.google.protobuf.UnsafeByteOperations
import org.apache.kafka.clients.consumer.Consumer
@@ -43,11 +39,6 @@ import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

class KafkaConnectionTest {
private val commonFactory: CommonFactory = mock {
on { boxConfiguration } doReturn BoxConfiguration().apply { bookName = "book_01" }
on { newMessageIDBuilder() } doReturn MessageID.newBuilder().setBookName("book_01")
}

private val testMessageText = "QWERTY"
private val messageProcessor: RawMessageProcessor = mock()
private val eventSender: EventSender = mock()
@@ -87,7 +78,6 @@
aliasToTopicAndKey = mapOf("alias_03" to KafkaStream("topic_03", "key_03", true)),
sessionGroups = mapOf("group_01" to listOf("alias_01"))
),
commonFactory,
messageProcessor,
eventSender,
kafkaClientsFactory
@@ -108,7 +98,6 @@
verify(messageProcessor, only()).onMessage(messageBuilderCaptor.capture(), processorCallbackCaptor.capture())

val outMessage = messageBuilderCaptor.firstValue.build()
assertThat(outMessage.bookName).isEqualTo("book_01")
assertThat(outMessage.sessionAlias).isEqualTo("alias_01")
assertThat(outMessage.sessionGroup).isEqualTo("group_01")
assertThat(outMessage.direction).isEqualTo(Direction.SECOND)
@@ -153,7 +142,6 @@
verify(messageProcessor, only()).onMessage(messageBuilderCaptor.capture(), any())

val messageBuilder = messageBuilderCaptor.firstValue
assertThat(messageBuilder.bookName).isEqualTo("book_01")
assertThat(messageBuilder.sessionAlias).isEqualTo("alias_03")
assertThat(messageBuilder.sessionGroup).isEqualTo("alias_03") // if no group name provided sessionAlias is used as group name
assertThat(messageBuilder.direction).isEqualTo(Direction.FIRST)
@@ -135,7 +135,7 @@ class RawMessageProcessorTest {
currentSequences[message.direction] = message.sequence

// verify timestamp ordering
val timestamp = with(message.metadata.id.timestamp) {
val timestamp = with(message.metadata.timestamp) {
TimeUnit.SECONDS.toNanos(seconds) + nanos
}
assertThat(timestamp)
@@ -164,7 +164,7 @@
.forEach { (_, batchesWithTimestamp) ->
batchesWithTimestamp.asSequence()
.forEachIndexed { index, (batch, endTimestamp) ->
val startTimestamp = batch.getMessages(0).metadata.id.timestamp.toInstant().toEpochMilli()
val startTimestamp = batch.getMessages(0).metadata.timestamp.toInstant().toEpochMilli()
val flushTime = endTimestamp - startTimestamp
if (index < batchesWithTimestamp.lastIndex) {
assertThat(flushTime)