apply formatting
mslabek committed Sep 26, 2024
1 parent abea7b0 · commit a70233e
Showing 14 changed files with 375 additions and 260 deletions.
build.sbt: 203 changes (96 additions, 107 deletions)

Large diffs are not rendered by default.

@@ -1,6 +1,5 @@
 package pl.touk.nussknacker.compatibility.common
 
-
 import com.typesafe.config.ConfigValueFactory.fromAnyRef
 import com.typesafe.config.{Config, ConfigFactory}
 import com.typesafe.scalalogging.LazyLogging
@@ -21,17 +20,28 @@ import pl.touk.nussknacker.engine.deployment.DeploymentData
 import pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolder
 import pl.touk.nussknacker.engine.flink.util.transformer.{FlinkBaseComponentProvider, FlinkKafkaComponentProvider}
 import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaSpec, KafkaTestUtils, UnspecializedTopicName}
-import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer.{ProcessSettingsPreparer, UnoptimizedSerializationPreparer}
+import pl.touk.nussknacker.engine.process.ExecutionConfigPreparer.{
+  ProcessSettingsPreparer,
+  UnoptimizedSerializationPreparer
+}
 import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerDataFactory
 import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
 import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkJobConfig}
 import pl.touk.nussknacker.engine.schemedkafka._
 import pl.touk.nussknacker.engine.schemedkafka.encode.ToAvroSchemaBasedEncoder
 import pl.touk.nussknacker.engine.schemedkafka.kryo.AvroSerializersRegistrar
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{
+  MockConfluentSchemaRegistryClientFactory,
+  MockSchemaRegistryClient
+}
 import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.MockSchemaRegistryClientFactory
-import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ExistingSchemaVersion, LatestSchemaVersion, SchemaRegistryClientFactory, SchemaVersionOption}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
+  ExistingSchemaVersion,
+  LatestSchemaVersion,
+  SchemaRegistryClientFactory,
+  SchemaVersionOption
+}
 import pl.touk.nussknacker.engine.testing.LocalModelData
 
 import java.nio.charset.StandardCharsets
@@ -57,16 +67,20 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit
   import pl.touk.nussknacker.engine.spel.SpelExtension._
   import KafkaUniversalComponentTransformer._
 
-  override lazy val config: Config = ConfigFactory.load()
+  override lazy val config: Config = ConfigFactory
+    .load()
     .withValue("components.mockKafka.config.kafkaAddress", fromAnyRef(kafkaServer.kafkaAddress))
     .withValue("components.mockKafka.config.lowLevelComponentsEnabled", fromAnyRef(true))
     .withValue("components.kafka.disabled", fromAnyRef(true))
    .withValue("components.mockKafka.disabled", fromAnyRef(false))
     .withValue("components.mockKafka.config.kafkaProperties.\"schema.registry.url\"", fromAnyRef("not_used"))
     // we turn off auto registration to do it on our own passing mocked schema registry client
-    //For tests we want to read from the beginning...
+    // For tests we want to read from the beginning...
     .withValue("components.mockKafka.config.kafkaProperties.\"auto.offset.reset\"", fromAnyRef("earliest"))
-    .withValue(s"components.mockKafka.config.kafkaEspProperties.${AvroSerializersRegistrar.autoRegisterRecordSchemaIdSerializationProperty}", fromAnyRef(false))
+    .withValue(
+      s"components.mockKafka.config.kafkaEspProperties.${AvroSerializersRegistrar.autoRegisterRecordSchemaIdSerializationProperty}",
+      fromAnyRef(false)
+    )
     .withValue("rocksDB.checkpointDataUri", fromAnyRef("file:///tmp/rocksDBCheckpointDataUri"))
 
   private val secondsToWaitForAvro = 30
@@ -76,44 +90,53 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit
   protected val avroEncoder: ToAvroSchemaBasedEncoder = ToAvroSchemaBasedEncoder(ValidationMode.strict)
 
   private val givenNotMatchingAvroObj = avroEncoder.encodeRecordOrError(
-    Map("first" -> "Zenon", "last" -> "Nowak"), RecordSchemaV1
+    Map("first" -> "Zenon", "last" -> "Nowak"),
+    RecordSchemaV1
   )
 
   private val givenMatchingAvroObj = avroEncoder.encodeRecordOrError(
-    Map("first" -> "Jan", "last" -> "Kowalski"), RecordSchemaV1
+    Map("first" -> "Jan", "last" -> "Kowalski"),
+    RecordSchemaV1
   )
 
   private val givenMatchingAvroObjConvertedToV2 = avroEncoder.encodeRecordOrError(
-    Map("first" -> "Jan", "middle" -> null, "last" -> "Kowalski"), RecordSchemaV2
+    Map("first" -> "Jan", "middle" -> null, "last" -> "Kowalski"),
+    RecordSchemaV2
   )
 
   private val givenMatchingAvroObjV2 = avroEncoder.encodeRecordOrError(
-    Map("first" -> "Jan", "middle" -> "Tomek", "last" -> "Kowalski"), RecordSchemaV2
+    Map("first" -> "Jan", "middle" -> "Tomek", "last" -> "Kowalski"),
+    RecordSchemaV2
   )
 
   private val givenSecondMatchingAvroObj = avroEncoder.encodeRecordOrError(
-    Map("firstname" -> "Jan"), SecondRecordSchemaV1
+    Map("firstname" -> "Jan"),
+    SecondRecordSchemaV1
   )
 
-  private def avroProcess(topicConfig: TopicConfig, versionOption: SchemaVersionOption, validationMode: ValidationMode = ValidationMode.strict) =
+  private def avroProcess(
+      topicConfig: TopicConfig,
+      versionOption: SchemaVersionOption,
+      validationMode: ValidationMode = ValidationMode.strict
+  ) =
     ScenarioBuilder
       .streaming("avro-test")
       .parallelism(1)
       .source(
         "start",
         "kafka",
-        topicParamName.value -> s"'${topicConfig.input}'".spel,
+        topicParamName.value         -> s"'${topicConfig.input}'".spel,
         schemaVersionParamName.value -> versionOptionParam(versionOption).spel
       )
       .filter("name-filter", "#input.first == 'Jan'".spel)
       .emptySink(
         "end",
         "kafka",
-        sinkKeyParamName.value -> "".spel,
-        sinkRawEditorParamName.value -> "true".spel,
-        sinkValueParamName.value -> "#input".spel,
-        topicParamName.value -> s"'${topicConfig.output}'".spel,
-        schemaVersionParamName.value -> s"'${SchemaVersionOption.LatestOptionName}'".spel,
+        sinkKeyParamName.value            -> "".spel,
+        sinkRawEditorParamName.value      -> "true".spel,
+        sinkValueParamName.value          -> "#input".spel,
+        topicParamName.value              -> s"'${topicConfig.output}'".spel,
+        schemaVersionParamName.value      -> s"'${SchemaVersionOption.LatestOptionName}'".spel,
         sinkValidationModeParamName.value -> s"'${validationMode.name}'".spel
       )
 
@@ -124,23 +147,23 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit
       .source(
         "start",
         "kafka",
-        topicParamName.value -> s"'${topicConfig.input}'".spel,
+        topicParamName.value         -> s"'${topicConfig.input}'".spel,
         schemaVersionParamName.value -> versionOptionParam(versionOption).spel
       )
       .emptySink(
         "end",
         "kafka",
-        sinkKeyParamName.value -> "".spel,
-        sinkRawEditorParamName.value -> "true".spel,
-        sinkValueParamName.value -> s"{first: #input.first, last: #input.last}".spel,
-        topicParamName.value -> s"'${topicConfig.output}'".spel,
+        sinkKeyParamName.value            -> "".spel,
+        sinkRawEditorParamName.value      -> "true".spel,
+        sinkValueParamName.value          -> s"{first: #input.first, last: #input.last}".spel,
+        topicParamName.value              -> s"'${topicConfig.output}'".spel,
         sinkValidationModeParamName.value -> s"'${ValidationMode.strict.name}'".spel,
-        schemaVersionParamName.value -> "'1'".spel
+        schemaVersionParamName.value      -> "'1'".spel
       )
 
   private def versionOptionParam(versionOption: SchemaVersionOption) =
     versionOption match {
-      case LatestSchemaVersion => s"'${SchemaVersionOption.LatestOptionName}'"
+      case LatestSchemaVersion            => s"'${SchemaVersionOption.LatestOptionName}'"
       case ExistingSchemaVersion(version) => s"'$version'"
     }
 
@@ -198,7 +221,7 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit
   }
 
   test("should throw exception when record doesn't match to schema") {
-    val topicConfig = createAndRegisterTopicConfig("error-record-matching", RecordSchemas)
+    val topicConfig       = createAndRegisterTopicConfig("error-record-matching", RecordSchemas)
     val secondTopicConfig = createAndRegisterTopicConfig("error-second-matching", SecondRecordSchemaV1)
 
     sendAvro(givenSecondMatchingAvroObj, secondTopicConfig.input)
@@ -218,13 +241,14 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit
     consumer.consumeWithConsumerRecord(topic, secondsToWaitForAvro).head
   }
 
-  private def consumeOneAvroMessage(topic: String) = valueDeserializer.deserialize(topic, consumeOneRawAvroMessage(topic).value())
+  private def consumeOneAvroMessage(topic: String) =
+    valueDeserializer.deserialize(topic, consumeOneRawAvroMessage(topic).value())
 
   protected def creator: DefaultConfigCreator = new DefaultConfigCreator
 
   private var registrar: FlinkProcessRegistrar = _
-  private lazy val valueSerializer = new KafkaAvroSerializer(schemaRegistryMockClient)
-  private lazy val valueDeserializer = new KafkaAvroDeserializer(schemaRegistryMockClient)
+  private lazy val valueSerializer   = new KafkaAvroSerializer(schemaRegistryMockClient)
+  private lazy val valueDeserializer = new KafkaAvroDeserializer(schemaRegistryMockClient)
 
   class MockFlinkKafkaComponentProvider extends FlinkKafkaComponentProvider {
     override protected def schemaRegistryClientFactory: SchemaRegistryClientFactory =
@@ -257,8 +281,14 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit
       ProcessSettingsPreparer(modelData),
       new UnoptimizedSerializationPreparer(modelData),
       new ExecutionConfigPreparer {
-        override def prepareExecutionConfig(config: ExecutionConfig)(jobData: JobData, deploymentData: DeploymentData): Unit = {
-          AvroSerializersRegistrar.registerGenericRecordSchemaIdSerializationIfNeed(config, new MockConfluentSchemaRegistryClientFactory(schemaRegistryMockClient), kafkaConfig)
+        override def prepareExecutionConfig(
+            config: ExecutionConfig
+        )(jobData: JobData, deploymentData: DeploymentData): Unit = {
+          AvroSerializersRegistrar.registerGenericRecordSchemaIdSerializationIfNeed(
+            config,
+            new MockConfluentSchemaRegistryClientFactory(schemaRegistryMockClient),
+            kafkaConfig
+          )
         }
       }
     )
@@ -288,9 +318,9 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit
     val topicConfig = TopicConfig(name, schemas)
 
     schemas.foreach(schema => {
-      val inputSubject = ConfluentUtils.topicSubject(UnspecializedTopicName(topicConfig.input), topicConfig.isKey)
+      val inputSubject  = ConfluentUtils.topicSubject(UnspecializedTopicName(topicConfig.input), topicConfig.isKey)
       val outputSubject = ConfluentUtils.topicSubject(UnspecializedTopicName(topicConfig.output), topicConfig.isKey)
-      val parsedSchema = ConfluentUtils.convertToAvroSchema(schema)
+      val parsedSchema  = ConfluentUtils.convertToAvroSchema(schema)
       schemaRegistryMockClient.register(inputSubject, parsedSchema)
       schemaRegistryMockClient.register(outputSubject, parsedSchema)
     })
@@ -305,7 +335,7 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit
   case class TopicConfig(input: String, output: String, schemas: List[Schema], isKey: Boolean)
 
   object TopicConfig {
-    private final val inputPrefix = "test.generic.avro.input."
+    private final val inputPrefix  = "test.generic.avro.input."
     private final val outputPrefix = "test.generic.avro.output."
 
    def apply(testName: String, schemas: List[Schema]): TopicConfig =
@@ -11,12 +11,16 @@ import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetec
 trait BaseSchemaCompatibilityTest extends AnyFunSuite with Matchers {
 
   test("Cheks schema compatibility") {
-    val detection = TypeInformationDetection.instance
-    val typingResult = Typed.record(Map("int" -> Typed[Int]))
+    val detection    = TypeInformationDetection.instance
+    val typingResult = Typed.record(Map("int" -> Typed[Int]))
     val executionConfigWithoutKryo: ExecutionConfig = new ExecutionConfig
-    val serializerSnapshot = detection.forType(typingResult)
+    val serializerSnapshot = detection
+      .forType(typingResult)
       .createSerializer(executionConfigWithoutKryo)
-    serializerSnapshot.snapshotConfiguration().resolveSchemaCompatibility(serializerSnapshot)
+    serializerSnapshot
+      .snapshotConfiguration()
+      .resolveSchemaCompatibility(serializerSnapshot)
       .isCompatibleAsIs shouldBe true
   }
 
 }
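
The rewrapping above (imports split at a 120-character limit, `=` and `->` tokens padded into columns) is the kind of output scalafmt produces. As a point of reference, a minimal .scalafmt.conf sketch consistent with this diff follows; the key names are real scalafmt settings, but the concrete values are assumptions for illustration, since the repository's actual configuration is not part of the rendered diff:

# Hypothetical .scalafmt.conf; values are illustrative assumptions.
version = "3.7.17"          # assumed scalafmt release
runner.dialect = scala212   # assumed Scala dialect
maxColumn = 120             # wraps the long import and argument lines seen above
align.preset = more         # aligns `=` and `->`, yielding the padded assignments and named arguments

With a config along these lines, a formatting task such as sbt's scalafmtAll rewrites sources in place without changing behavior, which matches the purely mechanical character of this commit.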
