feat: support kafka connect built-in types (#261)
ali-ince authored Jan 9, 2025
1 parent 7ecee27 commit dca9d21
Showing 9 changed files with 678 additions and 155 deletions.
434 changes: 292 additions & 142 deletions common/src/main/kotlin/org/neo4j/connectors/kafka/data/DynamicTypes.kt

Large diffs are not rendered by default.
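Because the DynamicTypes.kt diff is collapsed here, the following is a minimal, hypothetical sketch of how a `fromConnectValue`-style conversion for the Kafka Connect logical types could look, inferred only from the new tests further down (Date → LocalDate, Time → LocalTime, Timestamp → LocalDateTime, Decimal → string). It is not the committed implementation; the function name, shape, and UTC interpretation are assumptions.

// Hypothetical sketch only -- not the rendered diff from DynamicTypes.kt.
// Assumes Connect's logical-type encodings and a UTC interpretation.
import java.math.BigDecimal
import java.time.LocalDateTime
import java.time.ZoneOffset
import org.apache.kafka.connect.data.Date
import org.apache.kafka.connect.data.Decimal
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Time
import org.apache.kafka.connect.data.Timestamp

fun fromConnectLogicalValueSketch(schema: Schema, value: Any): Any =
    when (schema.name()) {
      // Connect Date: java.util.Date carrying whole days since the epoch -> LocalDate
      Date.LOGICAL_NAME ->
          (value as java.util.Date).toInstant().atOffset(ZoneOffset.UTC).toLocalDate()
      // Connect Time: java.util.Date carrying milliseconds since midnight -> LocalTime
      Time.LOGICAL_NAME ->
          (value as java.util.Date).toInstant().atOffset(ZoneOffset.UTC).toLocalTime()
      // Connect Timestamp: java.util.Date carrying epoch milliseconds -> LocalDateTime
      Timestamp.LOGICAL_NAME ->
          LocalDateTime.ofInstant((value as java.util.Date).toInstant(), ZoneOffset.UTC)
      // Connect Decimal: BigDecimal has no direct Neo4j property type -> plain string
      Decimal.LOGICAL_NAME -> (value as BigDecimal).toPlainString()
      else -> value
    }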

@@ -0,0 +1,76 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.connectors.kafka.data

import io.kotest.matchers.shouldBe
import java.math.BigDecimal
import java.math.BigInteger
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.ZoneOffset
import org.apache.kafka.connect.data.Date
import org.apache.kafka.connect.data.Decimal
import org.apache.kafka.connect.data.Time
import org.apache.kafka.connect.data.Timestamp
import org.junit.jupiter.api.Test
import org.neo4j.connectors.kafka.data.DynamicTypes.MILLIS_PER_DAY

class DynamicTypesTest {
@Test
fun `kafka connect date values should be returned as local date`() {
DynamicTypes.fromConnectValue(
Date.SCHEMA,
java.util.Date(LocalDate.of(2000, 1, 1).toEpochDay() * MILLIS_PER_DAY)) shouldBe
LocalDate.of(2000, 1, 1)
}

@Test
fun `kafka connect time values should be returned as local time`() {
DynamicTypes.fromConnectValue(
Time.SCHEMA,
java.util.Date(
OffsetDateTime.of(
LocalDate.EPOCH, LocalTime.of(23, 59, 59, 999_000_000), ZoneOffset.UTC)
.toInstant()
.toEpochMilli())) shouldBe LocalTime.of(23, 59, 59, 999_000_000)
}

@Test
  fun `kafka connect timestamp values should be returned as local date time`() {
DynamicTypes.fromConnectValue(
Timestamp.SCHEMA,
java.util.Date(
OffsetDateTime.of(
LocalDate.of(2000, 1, 1), LocalTime.of(23, 59, 59, 999_000_000), ZoneOffset.UTC)
.toInstant()
.toEpochMilli())) shouldBe LocalDateTime.of(2000, 1, 1, 23, 59, 59, 999_000_000)
}

@Test
fun `kafka connect decimal values should be returned as string`() {
DynamicTypes.fromConnectValue(
Decimal.schema(4), BigDecimal(BigInteger("12345678901234567890"), 4)) shouldBe
"1234567890123456.7890"
DynamicTypes.fromConnectValue(
Decimal.schema(6), BigDecimal(BigInteger("12345678901234567890"), 6)) shouldBe
"12345678901234.567890"
DynamicTypes.fromConnectValue(
Decimal.schema(6), BigDecimal(BigInteger("123456000000"), 6)) shouldBe "123456.000000"
}
}
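The integration tests added below (Neo4jCudIT, Neo4jCypherIT, Neo4jNodePatternIT) build Connect Date/Time/Timestamp payloads through a DateSupport helper from the connector's testing module, which is not shown in this commit. The following is a minimal sketch of what such helpers could look like, assuming UTC and the standard Connect logical-type encodings; the object name and exact signatures are placeholders, not the project's real helper.

// Hypothetical stand-in for the DateSupport test helper (not part of this diff).
// Assumes Connect's encodings: Date = whole days, Time = millis since midnight,
// Timestamp = epoch millis, all interpreted at UTC.
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.ZoneOffset
import java.util.Date

object DateSupportSketch {
  // java.util.Date at UTC midnight of the given calendar day
  fun date(year: Int, month: Int, day: Int): Date =
      Date.from(LocalDate.of(year, month, day).atStartOfDay(ZoneOffset.UTC).toInstant())

  // java.util.Date carrying milliseconds since midnight on the epoch day
  fun time(hour: Int, minute: Int, second: Int, millis: Int): Date =
      Date.from(
          LocalTime.of(hour, minute, second, millis * 1_000_000)
              .atDate(LocalDate.EPOCH)
              .toInstant(ZoneOffset.UTC))

  // java.util.Date carrying the epoch milliseconds of the given date-time at UTC
  fun timestamp(
      year: Int, month: Int, day: Int, hour: Int, minute: Int, second: Int, millis: Int
  ): Date =
      Date.from(
          LocalDateTime.of(year, month, day, hour, minute, second, millis * 1_000_000)
              .toInstant(ZoneOffset.UTC))
}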
2 changes: 1 addition & 1 deletion pom.xml
@@ -86,7 +86,7 @@
<netty.version>4.1.116.Final</netty.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- to match what is shipped with the Kafka Connect version used in our integration test environment -->
-    <protobuf.version>3.24.4</protobuf.version>
+    <protobuf.version>3.25.5</protobuf.version>
<reactor.version>2024.0.1</reactor.version>
<slf4j.version>1.7.36</slf4j.version>
<sortpom-maven-plugin.version>4.0.0</sortpom-maven-plugin.version>
@@ -20,15 +20,23 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import java.math.BigDecimal
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import kotlin.time.Duration.Companion.seconds
import org.apache.kafka.connect.data.Date
import org.apache.kafka.connect.data.Decimal
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.data.Time
import org.apache.kafka.connect.data.Timestamp
import org.junit.jupiter.api.Test
import org.neo4j.connectors.kafka.data.DynamicTypes
import org.neo4j.connectors.kafka.data.PropertyType
import org.neo4j.connectors.kafka.data.PropertyType.schema
import org.neo4j.connectors.kafka.testing.DateSupport
import org.neo4j.connectors.kafka.testing.TestSupport.runTest
import org.neo4j.connectors.kafka.testing.format.KafkaConverter
import org.neo4j.connectors.kafka.testing.format.KeyValueConverter
@@ -630,6 +638,62 @@ abstract class Neo4jCudIT {
}
}

@Neo4jSink(cud = [CudStrategy(TOPIC)])
@Test
fun `should create node from struct with connect types`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
session: Session
) = runTest {
session.run("CREATE CONSTRAINT FOR (n:Foo) REQUIRE n.id IS KEY").consume()
session.run("CREATE CONSTRAINT FOR (n:Bar) REQUIRE n.id IS KEY").consume()

val propertiesSchema =
SchemaBuilder.struct()
.field("id", Schema.INT64_SCHEMA)
.field("height", Decimal.schema(2))
.field("dob", Date.SCHEMA)
.field("tob", Time.SCHEMA)
.field("tsob", Timestamp.SCHEMA)
.build()
val createNodeSchema =
SchemaBuilder.struct()
.field("type", Schema.STRING_SCHEMA)
.field("op", Schema.STRING_SCHEMA)
.field("labels", SchemaBuilder.array(Schema.STRING_SCHEMA))
.field("properties", propertiesSchema)
.build()

producer.publish(
valueSchema = createNodeSchema,
value =
Struct(createNodeSchema)
.put("type", "node")
.put("op", "create")
.put("labels", listOf("Foo", "Bar"))
.put(
"properties",
Struct(propertiesSchema)
.put("id", 1L)
.put("height", BigDecimal.valueOf(185, 2))
.put("dob", DateSupport.date(1978, 1, 15))
.put("tob", DateSupport.time(7, 45, 12, 999))
.put("tsob", DateSupport.timestamp(1978, 1, 15, 7, 45, 12, 999))))

eventually(30.seconds) { session.run("MATCH (n) RETURN n", emptyMap()).single() }
.get("n")
.asNode() should
{
it.labels() shouldBe listOf("Foo", "Bar")
it.asMap() shouldBe
mapOf(
"id" to 1L,
"height" to "1.85",
"dob" to LocalDate.of(1978, 1, 15),
"tob" to LocalTime.of(7, 45, 12, 999000000),
"tsob" to LocalDateTime.of(1978, 1, 15, 7, 45, 12, 999000000))
}
}

@Neo4jSink(cud = [CudStrategy(TOPIC)])
@Test
fun `should create relationship`(
@@ -22,6 +22,8 @@ import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.collections.shouldContainExactly
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import java.math.BigDecimal
import java.math.BigInteger
import java.time.Instant
import java.time.LocalDate
import java.time.LocalDateTime
@@ -33,9 +35,13 @@ import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.stream.Stream
import kotlin.time.Duration.Companion.seconds
import org.apache.kafka.connect.data.Date
import org.apache.kafka.connect.data.Decimal
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.data.Time
import org.apache.kafka.connect.data.Timestamp
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest
@@ -45,6 +51,7 @@ import org.junit.jupiter.params.provider.ArgumentsSource
import org.neo4j.connectors.kafka.configuration.PayloadMode
import org.neo4j.connectors.kafka.data.DynamicTypes
import org.neo4j.connectors.kafka.data.PropertyType
import org.neo4j.connectors.kafka.testing.DateSupport
import org.neo4j.connectors.kafka.testing.TestSupport.runTest
import org.neo4j.connectors.kafka.testing.format.KafkaConverter
import org.neo4j.connectors.kafka.testing.format.KeyValueConverter
@@ -217,7 +224,7 @@ abstract class Neo4jCypherIT {
cypher = [CypherStrategy(TOPIC, "CREATE (p:Data) SET p.value = event")],
schemaControlValueCompatibility = SchemaCompatibilityMode.NONE)
@ParameterizedTest
-  @ArgumentsSource(AvroSimpleTypes::class)
+  @ArgumentsSource(SimpleTypes::class)
fun `should support connect simple types`(
schema: Schema,
value: Any?,
@@ -236,7 +243,7 @@
}
}

-  object AvroSimpleTypes : ArgumentsProvider {
+  object SimpleTypes : ArgumentsProvider {
override fun provideArguments(context: ExtensionContext?): Stream<out Arguments> {
return Stream.of(
Arguments.of(Schema.INT8_SCHEMA, Byte.MAX_VALUE, Byte.MAX_VALUE),
@@ -260,7 +267,47 @@
Arguments.of(
Schema.OPTIONAL_BYTES_SCHEMA,
"a string".encodeToByteArray(),
"a string".encodeToByteArray()))
"a string".encodeToByteArray()),
)
}
}

@Neo4jSink(
cypher = [CypherStrategy(TOPIC, "CREATE (p:Data) SET p.value = event")],
schemaControlValueCompatibility = SchemaCompatibilityMode.NONE)
@ParameterizedTest
@ArgumentsSource(ConnectTypes::class)
fun `should support connect types`(
schema: Schema,
value: Any?,
expected: Any?,
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
session: Session
) = runTest {
producer.publish(valueSchema = schema, value = value)

eventually(30.seconds) { session.run("MATCH (n:Data) RETURN n", emptyMap()).single() }
.get("n")
.asNode() should
{
it.labels() shouldBe listOf("Data")
it.asMap() shouldBe mapOf("value" to expected)
}
}

object ConnectTypes : ArgumentsProvider {
override fun provideArguments(context: ExtensionContext?): Stream<out Arguments> {
return Stream.of(
Arguments.of(Date.SCHEMA, DateSupport.date(2000, 1, 1), LocalDate.of(2000, 1, 1)),
Arguments.of(
Time.SCHEMA, DateSupport.time(23, 59, 59, 999), LocalTime.of(23, 59, 59, 999000000)),
Arguments.of(
Timestamp.SCHEMA,
DateSupport.timestamp(2000, 1, 1, 23, 59, 59, 999),
LocalDateTime.of(2000, 1, 1, 23, 59, 59, 999000000)),
Arguments.of(Decimal.schema(4), BigDecimal(BigInteger("1234567890"), 4), "123456.7890"),
Arguments.of(
Decimal.schema(6), BigDecimal(BigInteger("1234567890000"), 6), "1234567.890000"))
}
}

@@ -20,18 +20,26 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import java.math.BigDecimal
import java.time.Instant
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.ZoneOffset
import java.time.temporal.ChronoUnit
import kotlin.time.Duration.Companion.seconds
import org.apache.kafka.connect.data.Date
import org.apache.kafka.connect.data.Decimal
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.data.Time
import org.apache.kafka.connect.data.Timestamp
import org.junit.jupiter.api.Test
import org.neo4j.connectors.kafka.data.DynamicTypes
import org.neo4j.connectors.kafka.data.PropertyType
import org.neo4j.connectors.kafka.data.PropertyType.schema
import org.neo4j.connectors.kafka.testing.DateSupport
import org.neo4j.connectors.kafka.testing.TestSupport.runTest
import org.neo4j.connectors.kafka.testing.format.KafkaConverter
import org.neo4j.connectors.kafka.testing.format.KeyValueConverter
@@ -125,6 +133,52 @@
}
}

@Neo4jSink(
nodePattern =
[
NodePatternStrategy(
TOPIC, "(:User{!id,height,dob,tob,tsob})", mergeNodeProperties = false)])
@Test
fun `should create node from struct containing connect types`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
session: Session
) = runTest {
session.run("CREATE CONSTRAINT FOR (n:User) REQUIRE n.id IS KEY").consume()

SchemaBuilder.struct()
.field("id", Schema.INT64_SCHEMA)
.field("height", Decimal.schema(2))
.field("dob", Date.SCHEMA)
.field("tob", Time.SCHEMA)
.field("tsob", Timestamp.SCHEMA)
.build()
.let { schema ->
producer.publish(
valueSchema = schema,
value =
Struct(schema)
.put("id", 1L)
.put("height", BigDecimal.valueOf(185, 2))
.put("dob", DateSupport.date(1978, 1, 15))
.put("tob", DateSupport.time(7, 45, 12, 999))
.put("tsob", DateSupport.timestamp(1978, 1, 15, 7, 45, 12, 999)))
}

eventually(30.seconds) { session.run("MATCH (n:User) RETURN n", emptyMap()).single() }
.get("n")
.asNode() should
{
it.labels() shouldBe listOf("User")
it.asMap() shouldBe
mapOf(
"id" to 1L,
"height" to "1.85",
"dob" to LocalDate.of(1978, 1, 15),
"tob" to LocalTime.of(7, 45, 12, 999000000),
"tsob" to LocalDateTime.of(1978, 1, 15, 7, 45, 12, 999000000))
}
}

@Neo4jSink(
nodePattern =
[NodePatternStrategy(TOPIC, "(:User{!id,name,surname})", mergeNodeProperties = false)])