diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java index dfea2fe29d..2cc9868170 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java @@ -231,6 +231,7 @@ private StressMetrics run(OpDistributionFactory operations, final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis, settings); + final CountDownLatch anyFailed = new CountDownLatch(1); final CountDownLatch releaseConsumers = new CountDownLatch(1); final CountDownLatch done = new CountDownLatch(threadCount); final CountDownLatch start = new CountDownLatch(threadCount); @@ -238,7 +239,7 @@ private StressMetrics run(OpDistributionFactory operations, for (int i = 0; i < threadCount; i++) { consumers[i] = new Consumer(operations, isWarmup, - done, start, releaseConsumers, workManager, metrics, rateLimiter); + done, start, releaseConsumers, anyFailed, workManager, metrics, rateLimiter); } // starting worker threadCount @@ -266,7 +267,16 @@ private StressMetrics run(OpDistributionFactory operations, if (durationUnits != null) { - Uninterruptibles.sleepUninterruptibly(duration, durationUnits); + try { + if(settings.errors.failFast) { + // I'm assuming Consumers don't finish successfully ahead of set duration + anyFailed.await(duration, durationUnits); + } else { + done.await(duration, durationUnits); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } workManager.stop(); } else if (opCount <= 0) @@ -402,6 +412,7 @@ public class Consumer extends Thread implements MeasurementSink private final CountDownLatch done; private final CountDownLatch start; private final CountDownLatch releaseConsumers; + private final CountDownLatch anyFailed; public final Queue measurementsRecycling; public final Queue measurementsReporting; public Consumer(OpDistributionFactory operations, @@ -409,6 +420,7 @@ public Consumer(OpDistributionFactory operations, CountDownLatch done, CountDownLatch start, CountDownLatch releaseConsumers, + CountDownLatch anyFailed, WorkManager workManager, StressMetrics metrics, UniformRateLimiter rateLimiter) @@ -417,6 +429,7 @@ public Consumer(OpDistributionFactory operations, this.done = done; this.start = start; this.releaseConsumers = releaseConsumers; + this.anyFailed = anyFailed; this.metrics = metrics; this.opStream = new StreamOfOperations(opDistribution, rateLimiter, workManager); this.measurementsRecycling = new SpscArrayQueue(8*1024); @@ -462,6 +475,10 @@ public void run() while (true) { + if (settings.errors.failFast && anyFailed.getCount() == 0) { + success = false; + break; + } // Assumption: All ops are thread local, operations are never shared across threads. Operation op = opStream.nextOp(); if (op == null) @@ -493,6 +510,7 @@ public void run() output.printException(e); success = false; + anyFailed.countDown(); opStream.abort(); metrics.cancel(); return; @@ -503,6 +521,7 @@ public void run() { System.err.println(e.getMessage()); success = false; + anyFailed.countDown(); } finally { diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsErrors.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsErrors.java index 2ddabbe3e5..826757738e 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsErrors.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsErrors.java @@ -35,6 +35,7 @@ public class SettingsErrors implements Serializable { public final boolean ignore; + public final boolean failFast; public final int tries; public final boolean skipReadValidation; public final boolean skipUnsupportedColumns; @@ -51,6 +52,7 @@ private enum DelayPolicy { public SettingsErrors(Options options) { ignore = options.ignore.setByUser(); + failFast = options.failFast.setByUser(); this.tries = Math.max(1, Integer.parseInt(options.retries.value()) + 1); skipReadValidation = options.skipReadValidation.setByUser(); skipUnsupportedColumns = options.skipUnsupportedColumns.setByUser(); @@ -93,6 +95,7 @@ public static final class Options extends GroupedOptions { final OptionSimple retries = new OptionSimple("retries=", "[0-9]+", "9", "Number of tries to perform for each operation before failing", false); final OptionSimple ignore = new OptionSimple("ignore", "", null, "Do not fail on errors", false); + final OptionSimple failFast = new OptionSimple("fail-fast", "", null, "Fail on first thread failure when running for set ", false); final OptionSimple skipReadValidation = new OptionSimple("skip-read-validation", "", null, "Skip read validation and message output", false); final OptionSimple skipUnsupportedColumns = new OptionSimple("skip-unsupported-columns", "", null, "Skip unsupported columns, such as maps and embedded collections, when generating data for a user profile.", false); @@ -103,7 +106,7 @@ public static final class Options extends GroupedOptions @Override public List options() { - return Arrays.asList(retries, ignore, skipReadValidation, skipUnsupportedColumns, + return Arrays.asList(retries, ignore, failFast, skipReadValidation, skipUnsupportedColumns, delayPolicy, minDelayMs, maxDelayMs); } @@ -128,6 +131,7 @@ public boolean happy() { public void printSettings(ResultLogger out) { out.printf(" Ignore: %b%n", ignore); + out.printf(" Fail fast setting: %b%n", failFast); out.printf(" Tries: %d%n", tries); if (delayPolicy == DelayPolicy.CONSTANT && minDelayMs == 0) { return;