From 74d2871ef4b55e31a7a542d8450db3ec1aa6dd01 Mon Sep 17 00:00:00 2001 From: MetallFoX Date: Thu, 3 Jun 2021 20:45:02 +0300 Subject: [PATCH] Add embedded kafka system (#1) --- README.md | 1 + build.gradle | 2 +- env-mq-kafka-embedded/build.gradle | 11 ++++ .../mq/kafka/embedded/EmbeddedKafkaSystem.kt | 57 +++++++++++++++++++ .../kafka/embedded/EmbeddedKafkaSystemTest.kt | 35 ++++++++++++ settings.gradle | 1 + 6 files changed, 106 insertions(+), 1 deletion(-) create mode 100644 env-mq-kafka-embedded/build.gradle create mode 100644 env-mq-kafka-embedded/src/main/java/io/github/adven27/env/mq/kafka/embedded/EmbeddedKafkaSystem.kt create mode 100644 env-mq-kafka-embedded/src/test/kotlin/io/github/adven27/env/mq/kafka/embedded/EmbeddedKafkaSystemTest.kt diff --git a/README.md b/README.md index 8a05bf8..9ab2ac3 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ class SomeEnvironment : Environment( mapOf( "RABBIT" to RabbitContainerSystem(), "IBMMQ" to IbmMQContainerSystem(), + "KAFKA" to KafkaContainerSystem(), "REDIS" to RedisContainerSystem(), "POSTGRES" to PostgreSqlContainerSystem(), "ORACLE" to OracleContainerSystem(), diff --git a/build.gradle b/build.gradle index 3a085da..993731d 100644 --- a/build.gradle +++ b/build.gradle @@ -3,7 +3,7 @@ buildscript { ext.testContainers_version = '1.15.2' ext.wiremock_version = '2.27.2' ext.klogging_version = '2.0.6' - ext.libVersion = "2.1.0" + ext.libVersion = "2.2.0" ext.libGroup = 'io.github.adven27' repositories { mavenCentral() diff --git a/env-mq-kafka-embedded/build.gradle b/env-mq-kafka-embedded/build.gradle new file mode 100644 index 0000000..739d828 --- /dev/null +++ b/env-mq-kafka-embedded/build.gradle @@ -0,0 +1,11 @@ +plugins { + id 'java-library' +} + +apply from: "$rootDir/gradle/publish.gradle" + +dependencies { + api project(':env-core') + api("org.springframework.kafka:spring-kafka-test:2.7.1") + testImplementation("org.jetbrains.kotlin:kotlin-test-junit") +} diff --git a/env-mq-kafka-embedded/src/main/java/io/github/adven27/env/mq/kafka/embedded/EmbeddedKafkaSystem.kt b/env-mq-kafka-embedded/src/main/java/io/github/adven27/env/mq/kafka/embedded/EmbeddedKafkaSystem.kt new file mode 100644 index 0000000..98958b2 --- /dev/null +++ b/env-mq-kafka-embedded/src/main/java/io/github/adven27/env/mq/kafka/embedded/EmbeddedKafkaSystem.kt @@ -0,0 +1,57 @@ +package io.github.adven27.env.mq.kafka.embedded + +import io.github.adven27.env.core.Environment.Companion.setProperties +import io.github.adven27.env.core.Environment.Prop +import io.github.adven27.env.core.Environment.Prop.Companion.set +import io.github.adven27.env.core.ExternalSystem +import org.springframework.kafka.test.EmbeddedKafkaBroker + +open class EmbeddedKafkaSystem( + topics: Array, + @Suppress("SpreadOperator") + private val embeddedKafka: EmbeddedKafkaBroker = EmbeddedKafkaBroker( + NUMBER_OF_BROKERS, + CONTROLLED_SHUTDOWN, + NUMBER_OF_PARTITIONS, + *topics + ) +) : ExternalSystem { + private var config: Config = Config() + private var isRunning = false + + override fun running() = isRunning + + @Suppress("unused") + fun config(): Config = config + + override fun start() { + embeddedKafka.afterPropertiesSet() + config = Config(PROP_BOOTSTRAPSERVERS set embeddedKafka.brokersAsString) + isRunning = true + } + + override fun stop() { + embeddedKafka.destroy() + isRunning = false + } + + override fun describe() = super.describe() + "\n\t" + config.asMap().entries.joinToString("\n\t") { it.toString() } + + data class Config(val bootstrapServers: Prop = PROP_BOOTSTRAPSERVERS set "PLAINTEXT://localhost:$DEFAULT_KAFKA_PORT") { + init { + asMap().setProperties() + } + + fun asMap() = mapOf(bootstrapServers.pair()) + } + + companion object { + private const val DEFAULT_KAFKA_PORT = 9093 + + private const val NUMBER_OF_BROKERS = 1 + private const val NUMBER_OF_PARTITIONS = 1 + private const val CONTROLLED_SHUTDOWN = true + + const val PROP_BOOTSTRAPSERVERS = "env.mq.kafka.bootstrapServers" + } +} diff --git a/env-mq-kafka-embedded/src/test/kotlin/io/github/adven27/env/mq/kafka/embedded/EmbeddedKafkaSystemTest.kt b/env-mq-kafka-embedded/src/test/kotlin/io/github/adven27/env/mq/kafka/embedded/EmbeddedKafkaSystemTest.kt new file mode 100644 index 0000000..9c4fb9c --- /dev/null +++ b/env-mq-kafka-embedded/src/test/kotlin/io/github/adven27/env/mq/kafka/embedded/EmbeddedKafkaSystemTest.kt @@ -0,0 +1,35 @@ +package io.github.adven27.env.mq.kafka.embedded + +import io.github.adven27.env.core.Environment +import io.github.adven27.env.mq.kafka.embedded.EmbeddedKafkaSystem.Companion.PROP_BOOTSTRAPSERVERS +import org.junit.After +import org.junit.Assert.assertEquals +import org.junit.Assert.assertNotNull +import org.junit.Assert.assertTrue +import org.junit.Test + +class EmbeddedKafkaSystemTest { + private lateinit var sut: SomeEnvironment + + @Test + fun embeddedKafkaSystemStartsInEnvironment() { + sut = SomeEnvironment().apply { up() } + + assertTrue(sut.kafka().running()) + + val bootstrapServers = sut.kafka().config().bootstrapServers.value + assertNotNull(bootstrapServers) + assertEquals(bootstrapServers, System.getProperty(PROP_BOOTSTRAPSERVERS)) + } + + @After + fun tearDown() { + sut.down() + } +} + +class SomeEnvironment : Environment( + "EMBEDDED_KAFKA" to EmbeddedKafkaSystem(topics = arrayOf("some-topic")) +) { + fun kafka() = systems["EMBEDDED_KAFKA"] as EmbeddedKafkaSystem +} diff --git a/settings.gradle b/settings.gradle index d2ba1f9..960d67a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,6 +8,7 @@ include 'env-db-postgresql' include 'env-db-db2' include 'env-db-oracle' include 'env-mq-kafka' +include 'env-mq-kafka-embedded' include 'env-mq-rabbit' include 'env-mq-redis' include 'env-mq-ibmmq'