Skip to content

Commit

Permalink
Add failOnError and return status options to apoc.cypher.timeboxed (#711
Browse files Browse the repository at this point in the history
)
  • Loading branch information
gem-neo4j authored Jan 21, 2025
1 parent fb799e4 commit 36fdd57
Show file tree
Hide file tree
Showing 5 changed files with 1,406 additions and 973 deletions.
78 changes: 75 additions & 3 deletions core/src/main/java/apoc/cypher/Timeboxed.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package apoc.cypher;

import static apoc.util.Util.toBoolean;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import apoc.Pools;
Expand All @@ -30,9 +31,12 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.kernel.api.QueryLanguage;
import org.neo4j.kernel.api.procedure.QueryLanguageScope;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
Expand Down Expand Up @@ -63,16 +67,39 @@ public class Timeboxed {

@NotThreadSafe
@Procedure("apoc.cypher.runTimeboxed")
@QueryLanguageScope(scope = {QueryLanguage.CYPHER_5})
@Description("Terminates a Cypher statement if it has not finished before the set timeout (ms).")
public Stream<CypherStatementMapResult> runTimeboxedCypher5(
@Name(value = "statement", description = "The Cypher statement to run.") String cypher,
@Name(value = "params", description = "The parameters for the given Cypher statement.")
Map<String, Object> params,
@Name(value = "timeout", description = "The maximum time, in milliseconds, the statement can run for.")
long timeout) {
return runTimeboxed(cypher, params, timeout, new HashMap<>());
}

@NotThreadSafe
@Procedure("apoc.cypher.runTimeboxed")
@QueryLanguageScope(scope = {QueryLanguage.CYPHER_25})
@Description("Terminates a Cypher statement if it has not finished before the set timeout (ms).")
public Stream<CypherStatementMapResult> runTimeboxed(
@Name(value = "statement", description = "The Cypher statement to run.") String cypher,
@Name(value = "params", description = "The parameters for the given Cypher statement.")
Map<String, Object> params,
@Name(value = "timeout", description = "The maximum time the statement can run for.") long timeout) {
@Name(value = "timeout", description = "The maximum time, in milliseconds, the statement can run for.")
long timeout,
@Name(
value = "config",
defaultValue = "{}",
description = "{ failOnError = false :: BOOLEAN, appendStatusRow = false :: BOOLEAN }")
Map<String, Object> config) {

final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<>(100);
final AtomicReference<Transaction> txAtomic = new AtomicReference<>();

boolean failOnError = toBoolean(config.get("failOnError"));
boolean appendStatusRow = toBoolean(config.get("appendStatusRow"));

// run query to be timeboxed in a separate thread to enable proper tx termination
// if we'd run this in current thread, a tx.terminate would kill the transaction the procedure call uses itself.
pools.getDefaultExecutorService().submit(() -> {
Expand All @@ -89,9 +116,22 @@ public Stream<CypherStatementMapResult> runTimeboxed(
final Map<String, Object> map = result.next();
offerToQueue(queue, map, timeout);
}
if (appendStatusRow) {
Map<String, Object> map = statusMap(true, false, null);
offerToQueue(queue, map, timeout);
}
innerTx.commit();
} catch (TransactionTerminatedException e) {
log.warn("query " + cypher + " has been terminated");
if (appendStatusRow || failOnError) {
Map<String, Object> map = statusMap(false, true, null);
offerToQueue(queue, map, timeout);
}
} catch (QueryExecutionException e) {
if (appendStatusRow || failOnError) {
Map<String, Object> map = statusMap(false, false, e.getMessage());
offerToQueue(queue, map, timeout);
}
} finally {
offerToQueue(queue, POISON, timeout);
txAtomic.set(null);
Expand All @@ -107,6 +147,10 @@ public Stream<CypherStatementMapResult> runTimeboxed(
log.debug(
"tx is null, either the other transaction finished gracefully or has not yet been start.");
} else {
if (appendStatusRow || failOnError) {
Map<String, Object> map = statusMap(false, true, null);
offerToQueue(queue, map, timeout);
}
tx.terminate();
offerToQueue(queue, POISON, timeout);
log.warn("terminating transaction, putting POISON onto queue");
Expand All @@ -128,8 +172,26 @@ public boolean hasNext() {
try {
nextElement = queue.poll(timeout, MILLISECONDS);
if (nextElement == null) {
log.warn("couldn't grab queue element, aborting - this should never happen");
hasFinished = true;
// Wait a little bit longer and try again, waiting exactly the timeout means that
// there might be a slight timing issue with termination vs. setting the queue as terminated
nextElement = queue.poll(100, MILLISECONDS);
// If it is still null, then accept and move on
if (nextElement == null) {
log.warn("Empty queue, aborting.");
if (failOnError) {
throw new RuntimeException("The query has been terminated.");
}
hasFinished = true;
}
}

if (failOnError && nextElement.get("wasSuccessful").equals(Boolean.FALSE)) {
if (nextElement.get("failedWithError").equals(Boolean.TRUE)) {
throw new RuntimeException("The inner query errored with: " + nextElement.get("error"));
}
if (nextElement.get("wasTerminated").equals(Boolean.TRUE)) {
throw new RuntimeException("The query has been terminated.");
}
} else {
hasFinished = POISON.equals(nextElement);
}
Expand All @@ -145,10 +207,20 @@ public Map<String, Object> next() {
return nextElement;
}
};

return StreamSupport.stream(Spliterators.spliteratorUnknownSize(queueConsumer, Spliterator.ORDERED), false)
.map(CypherStatementMapResult::new);
}

private Map<String, Object> statusMap(boolean successful, boolean terminated, String errorMessage) {
Map<String, Object> map = new HashMap<>();
map.put("wasSuccessful", successful ? Boolean.TRUE : Boolean.FALSE);
map.put("wasTerminated", terminated ? Boolean.TRUE : Boolean.FALSE);
map.put("failedWithError", errorMessage == null ? Boolean.FALSE : Boolean.TRUE);
map.put("error", errorMessage);
return map;
}

private void offerToQueue(BlockingQueue<Map<String, Object>> queue, Map<String, Object> map, long timeout) {
try {
boolean hasBeenAdded = queue.offer(map, timeout, MILLISECONDS);
Expand Down
114 changes: 108 additions & 6 deletions core/src/test/java/apoc/cypher/CypherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static apoc.util.TestUtil.testCallEmpty;
import static apoc.util.TestUtil.testFail;
import static apoc.util.TestUtil.testResult;
import static apoc.util.TransactionTestUtil.checkTerminationGuard;
import static apoc.util.TransactionTestUtil.checkTransactionTimeReasonable;
import static apoc.util.TransactionTestUtil.lastTransactionChecks;
import static apoc.util.TransactionTestUtil.terminateTransactionAsync;
Expand All @@ -38,6 +37,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import apoc.text.Strings;
Expand All @@ -54,11 +54,14 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import junit.framework.TestCase;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.neo4j.configuration.GraphDatabaseInternalSettings;
import org.neo4j.configuration.GraphDatabaseSettings;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.QueryExecutionException;
Expand All @@ -78,6 +81,7 @@ public class CypherTest {
@ClassRule
public static DbmsRule db = new ImpermanentDbmsRule()
.withSetting(GraphDatabaseSettings.allow_file_urls, true)
.withSetting(GraphDatabaseInternalSettings.enable_experimental_cypher_versions, true)
.withSetting(
GraphDatabaseSettings.load_csv_file_url_root,
new File("src/test/resources").toPath().toAbsolutePath());
Expand Down Expand Up @@ -217,16 +221,114 @@ public void testWithTimeout() {
}

@Test
public void testRunTimeboxedWithTermination() {
public void testRunTimeboxedWithInvalidQuerySyntax() {
final String query = "CYPHER 25 CALL apoc.cypher.runTimeboxed('RETUN 0', null, 20000, {failOnError: true})";
QueryExecutionException e = assertThrows(QueryExecutionException.class, () -> testCall(db, query, (r) -> {}));
Throwable except = ExceptionUtils.getRootCause(e);
TestCase.assertTrue(except instanceof RuntimeException);
TestCase.assertTrue(except.getMessage().contains("Invalid input 'RETUN'"));
}

@Test
public void testRunTimeboxedWithInvalidQueries() {
final String query = "CYPHER 25 CALL apoc.cypher.runTimeboxed('RETURN 1/0', null, 20000, {failOnError: true})";
QueryExecutionException e = assertThrows(QueryExecutionException.class, () -> testCall(db, query, (r) -> {}));
Throwable except = ExceptionUtils.getRootCause(e);
TestCase.assertTrue(except instanceof RuntimeException);
TestCase.assertTrue(except.getMessage().contains("The inner query errored with: / by zero"));
}

@Test
public void testRunTimeboxedWithSuccessStatus() {
// this query throws an error because 1/0
final String innerQuery = "RETURN 1 AS a";
final String query =
"CYPHER 25 CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns nothing and terminate before `timeout`
long timeout = 50000L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(1L, value.get("a"));

assertTrue(r.hasNext());
innerResult = r.next();
value = (Map<String, Object>) innerResult.get("value");
assertEquals(true, value.get("wasSuccessful"));
});
}

@Test
public void testRunTimeboxedWithErrorReported() {
// this query throws an error because 1/0
final String innerQuery = "RETURN 1/0 AS a";
final String query =
"CALL apoc.cypher.runTimeboxed('UNWIND range(0, 10) AS id CALL apoc.util.sleep(2000) RETURN 0', null, 20000)";
checkTerminationGuard(db, query);
"CYPHER 25 CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns nothing and terminate before `timeout`
long timeout = 50000L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(true, value.get("failedWithError"));
assertEquals("/ by zero", value.get("error"));
});
}

@Test
public void testRunTimeboxedWithErrorReportedAfterSomeSuccesses() {
// this query throws an error because 1/0
final String innerQuery = "UNWIND [1, 1, 0] AS i RETURN 1/i AS a";
final String query =
"CYPHER 25 CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns nothing and terminate before `timeout`
long timeout = 50000L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(1L, value.get("a"));

assertTrue(r.hasNext());
innerResult = r.next();
value = (Map<String, Object>) innerResult.get("value");
assertEquals(1L, value.get("a"));

assertTrue(r.hasNext());
innerResult = r.next();
value = (Map<String, Object>) innerResult.get("value");
assertEquals(true, value.get("failedWithError"));
assertEquals("/ by zero", value.get("error"));
});
}

@Test
public void testRunTimeboxedWithTerminationReported() {
final String innerQuery = "CALL apoc.util.sleep(10999) RETURN 0";
final String query =
"CYPHER 25 CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout, {appendStatusRow: true})";

// check that the query returns the status row that it was terminated
long timeout = 500L;
testResult(db, query, Map.of("innerQuery", innerQuery, "timeout", timeout), r -> {
assertTrue(r.hasNext());
Map<String, Object> innerResult = r.next();
Map<String, Object> value = (Map<String, Object>) innerResult.get("value");
assertEquals(false, value.get("wasSuccessful"));
assertEquals(true, value.get("wasTerminated"));
assertEquals(false, value.get("failedWithError"));
assertNull(value.get("error"));
});
}

@Test
public void testRunTimeboxedWithTerminationInnerTransaction1() {
// this query throws an error because of ` AS 'a'`
final String innerQuery = "CALL apoc.util.sleep(1000) RETURN 1 AS 'a'";
// this query throws an error because 1/0
final String innerQuery = "RETURN 1/0";
final String query = "CALL apoc.cypher.runTimeboxed($innerQuery, null, $timeout)";

long timeBefore = System.currentTimeMillis();
Expand Down
28 changes: 0 additions & 28 deletions core/src/test/resources/procedures/common/procedures.json
Original file line number Diff line number Diff line change
Expand Up @@ -1770,34 +1770,6 @@
"isDeprecated" : false,
"type" : "MAP"
} ]
}, {
"isDeprecated" : false,
"signature" : "apoc.cypher.runTimeboxed(statement :: STRING, params :: MAP, timeout :: INTEGER) :: (value :: MAP)",
"name" : "apoc.cypher.runTimeboxed",
"description" : "Terminates a Cypher statement if it has not finished before the set timeout (ms).",
"returnDescription" : [ {
"name" : "value",
"description" : "The result returned from the Cypher statement.",
"isDeprecated" : false,
"type" : "MAP"
} ],
"deprecatedBy" : null,
"argumentDescription" : [ {
"name" : "statement",
"description" : "The Cypher statement to run.",
"isDeprecated" : false,
"type" : "STRING"
}, {
"name" : "params",
"description" : "The parameters for the given Cypher statement.",
"isDeprecated" : false,
"type" : "MAP"
}, {
"name" : "timeout",
"description" : "The maximum time the statement can run for.",
"isDeprecated" : false,
"type" : "INTEGER"
} ]
}, {
"isDeprecated" : false,
"signature" : "apoc.cypher.runWrite(statement :: STRING, params :: MAP) :: (value :: MAP)",
Expand Down
42 changes: 42 additions & 0 deletions core/src/test/resources/procedures/cypher25/procedures.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,46 @@
[
{
"isDeprecated": false,
"signature": "apoc.cypher.runTimeboxed(statement :: STRING, params :: MAP, timeout :: INTEGER, config = {} :: MAP) :: (value :: MAP)",
"name": "apoc.cypher.runTimeboxed",
"description": "Terminates a Cypher statement if it has not finished before the set timeout (ms).",
"returnDescription": [
{
"name": "value",
"description": "The result returned from the Cypher statement.",
"isDeprecated": false,
"type": "MAP"
}
],
"deprecatedBy": null,
"argumentDescription": [
{
"name": "statement",
"description": "The Cypher statement to run.",
"isDeprecated": false,
"type": "STRING"
},
{
"name": "params",
"description": "The parameters for the given Cypher statement.",
"isDeprecated": false,
"type": "MAP"
},
{
"name": "timeout",
"description": "The maximum time, in milliseconds, the statement can run for.",
"isDeprecated": false,
"type": "INTEGER"
},
{
"name": "config",
"description": "{ failOnError = false :: BOOLEAN, appendStatusRow = false :: BOOLEAN }",
"isDeprecated": false,
"default": "DefaultParameterValue{value={}, type=MAP}",
"type": "MAP"
}
]
},
{
"isDeprecated": true,
"signature": "apoc.refactor.deleteAndReconnect(path :: PATH, nodes :: LIST<NODE>, config = {} :: MAP) :: (nodes :: LIST<NODE>, relationships :: LIST<RELATIONSHIP>)",
Expand Down
Loading

0 comments on commit 36fdd57

Please sign in to comment.