Skip to content

Commit

Permalink
fix: include a final return clause within constructed query (#263)
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince authored Jan 9, 2025
1 parent dca9d21 commit f2f502a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,29 @@ abstract class Neo4jCypherIT {
}
}

@Neo4jSink(
cypher =
[
CypherStrategy(
TOPIC,
"CREATE (p:Person) SET p.name = event.firstName, p.surname = event.lastName RETURN p")])
@Test
fun `should create node with a statement that returns a value`(
@TopicProducer(TOPIC) producer: ConvertingKafkaProducer,
session: Session
) = runTest {
producer.publish(
valueSchema = Schema.STRING_SCHEMA, value = """{"firstName": "john", "lastName": "doe"}""")

eventually(30.seconds) { session.run("MATCH (n:Person) RETURN n", emptyMap()).single() }
.get("n")
.asNode() should
{
it.labels() shouldBe listOf("Person")
it.asMap() shouldBe mapOf("name" to "john", "surname" to "doe")
}
}

@Neo4jSink(
cypher =
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class CypherHandler(
}
})
.callRawCypher("WITH * $query")
.returning(Cypher.literalNull())
.build())

logger.debug("using cypher query '{}' for topic '{}'", rewrittenQuery, topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class CypherHandlerTest : HandlerTest() {
null,
listOf(sinkMessage),
Query(
"UNWIND ${'$'}events AS message WITH message.value AS event CALL {WITH * CREATE (n:Node) SET n = event}",
"UNWIND ${'$'}events AS message WITH message.value AS event CALL {WITH * CREATE (n:Node) SET n = event} RETURN NULL",
mapOf(
"events" to
listOf(
Expand Down Expand Up @@ -90,7 +90,7 @@ class CypherHandlerTest : HandlerTest() {
null,
listOf(sinkMessage),
Query(
"UNWIND ${'$'}events AS message WITH message.value AS event, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __value}",
"UNWIND ${'$'}events AS message WITH message.value AS event, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __value} RETURN NULL",
mapOf(
"events" to
listOf(
Expand Down Expand Up @@ -126,7 +126,7 @@ class CypherHandlerTest : HandlerTest() {
null,
listOf(sinkMessage),
Query(
"UNWIND ${'$'}events AS message WITH message.value AS event, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __key}",
"UNWIND ${'$'}events AS message WITH message.value AS event, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __key} RETURN NULL",
mapOf(
"events" to
listOf(
Expand Down Expand Up @@ -168,7 +168,7 @@ class CypherHandlerTest : HandlerTest() {
null,
listOf(sinkMessage),
Query(
"UNWIND ${'$'}events AS message WITH message.value AS event, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __header}",
"UNWIND ${'$'}events AS message WITH message.value AS event, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __header} RETURN NULL",
mapOf(
"events" to
listOf(
Expand Down Expand Up @@ -211,7 +211,7 @@ class CypherHandlerTest : HandlerTest() {
null,
listOf(sinkMessage),
Query(
"UNWIND ${'$'}events AS message WITH message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __header}",
"UNWIND ${'$'}events AS message WITH message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __header} RETURN NULL",
mapOf(
"events" to
listOf(
Expand Down Expand Up @@ -253,7 +253,7 @@ class CypherHandlerTest : HandlerTest() {
null,
listOf(sinkMessage),
Query(
"UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __header}",
"UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = __header} RETURN NULL",
mapOf(
"events" to
listOf(
Expand Down Expand Up @@ -308,7 +308,7 @@ class CypherHandlerTest : HandlerTest() {
null,
messages.slice(0..4),
Query(
"UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n.id = __value}",
"UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n.id = __value} RETURN NULL",
mapOf(
"events" to
(1..5).map { seq ->
Expand All @@ -325,7 +325,7 @@ class CypherHandlerTest : HandlerTest() {
null,
messages.slice(5..9),
Query(
"UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n.id = __value}",
"UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n.id = __value} RETURN NULL",
mapOf(
"events" to
(6..10).map { seq ->
Expand All @@ -342,7 +342,7 @@ class CypherHandlerTest : HandlerTest() {
null,
messages.slice(10..12),
Query(
"UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n.id = __value}",
"UNWIND ${'$'}events AS message WITH message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n.id = __value} RETURN NULL",
mapOf(
"events" to
(11..13).map { seq ->
Expand All @@ -369,7 +369,7 @@ class CypherHandlerTest : HandlerTest() {
null,
listOf(sinkMessage),
Query(
"UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event}",
"UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event} RETURN NULL",
mapOf(
"events" to
listOf(
Expand Down Expand Up @@ -401,7 +401,7 @@ class CypherHandlerTest : HandlerTest() {
null,
listOf(sinkMessage),
Query(
"UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event}",
"UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event} RETURN NULL",
mapOf(
"events" to
listOf(
Expand Down Expand Up @@ -440,7 +440,7 @@ class CypherHandlerTest : HandlerTest() {
null,
listOf(sinkMessage),
Query(
"UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event}",
"UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event} RETURN NULL",
mapOf(
"events" to
listOf(
Expand Down Expand Up @@ -480,7 +480,7 @@ class CypherHandlerTest : HandlerTest() {
null,
listOf(sinkMessage),
Query(
"UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event}",
"UNWIND ${'$'}events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * CREATE (n:Node) SET n = event} RETURN NULL",
mapOf(
"events" to
listOf(
Expand Down

0 comments on commit f2f502a

Please sign in to comment.