Skip to content

Commit

Permalink
Add failFast error option
Browse files Browse the repository at this point in the history
Adds an option that will stop other consumer threads in case one of them fails
  • Loading branch information
Bouncheck committed Oct 20, 2024
1 parent d271e92 commit cf3140d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
19 changes: 17 additions & 2 deletions tools/stress/src/org/apache/cassandra/stress/StressAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,15 @@ private StressMetrics run(OpDistributionFactory operations,

final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis, settings);

final CountDownLatch anyFailed = new CountDownLatch(1);
final CountDownLatch releaseConsumers = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(threadCount);
final CountDownLatch start = new CountDownLatch(threadCount);
final Consumer[] consumers = new Consumer[threadCount];
for (int i = 0; i < threadCount; i++)
{
consumers[i] = new Consumer(operations, isWarmup,
done, start, releaseConsumers, workManager, metrics, rateLimiter);
done, start, releaseConsumers, anyFailed, workManager, metrics, rateLimiter);
}

// starting worker threadCount
Expand Down Expand Up @@ -267,7 +268,12 @@ private StressMetrics run(OpDistributionFactory operations,
if (durationUnits != null)
{
try {
done.await(duration, durationUnits);
if(settings.errors.failFast) {
// I'm assuming Consumers don't finish successfully ahead of set duration
anyFailed.await(duration, durationUnits);
} else {
done.await(duration, durationUnits);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -406,13 +412,15 @@ public class Consumer extends Thread implements MeasurementSink
private final CountDownLatch done;
private final CountDownLatch start;
private final CountDownLatch releaseConsumers;
private final CountDownLatch anyFailed;
public final Queue<OpMeasurement> measurementsRecycling;
public final Queue<OpMeasurement> measurementsReporting;
public Consumer(OpDistributionFactory operations,
boolean isWarmup,
CountDownLatch done,
CountDownLatch start,
CountDownLatch releaseConsumers,
CountDownLatch anyFailed,
WorkManager workManager,
StressMetrics metrics,
UniformRateLimiter rateLimiter)
Expand All @@ -421,6 +429,7 @@ public Consumer(OpDistributionFactory operations,
this.done = done;
this.start = start;
this.releaseConsumers = releaseConsumers;
this.anyFailed = anyFailed;
this.metrics = metrics;
this.opStream = new StreamOfOperations(opDistribution, rateLimiter, workManager);
this.measurementsRecycling = new SpscArrayQueue<OpMeasurement>(8*1024);
Expand Down Expand Up @@ -466,6 +475,10 @@ public void run()

while (true)
{
if (settings.errors.failFast && anyFailed.getCount() == 0) {
success = false;
break;
}
// Assumption: All ops are thread local, operations are never shared across threads.
Operation op = opStream.nextOp();
if (op == null)
Expand Down Expand Up @@ -497,6 +510,7 @@ public void run()
output.printException(e);

success = false;
anyFailed.countDown();
opStream.abort();
metrics.cancel();
return;
Expand All @@ -507,6 +521,7 @@ public void run()
{
System.err.println(e.getMessage());
success = false;
anyFailed.countDown();
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class SettingsErrors implements Serializable
{

public final boolean ignore;
public final boolean failFast;
public final int tries;
public final boolean skipReadValidation;
public final boolean skipUnsupportedColumns;
Expand All @@ -51,6 +52,7 @@ private enum DelayPolicy {
public SettingsErrors(Options options)
{
ignore = options.ignore.setByUser();
failFast = options.failFast.setByUser();
this.tries = Math.max(1, Integer.parseInt(options.retries.value()) + 1);
skipReadValidation = options.skipReadValidation.setByUser();
skipUnsupportedColumns = options.skipUnsupportedColumns.setByUser();
Expand Down Expand Up @@ -93,6 +95,7 @@ public static final class Options extends GroupedOptions
{
final OptionSimple retries = new OptionSimple("retries=", "[0-9]+", "9", "Number of tries to perform for each operation before failing", false);
final OptionSimple ignore = new OptionSimple("ignore", "", null, "Do not fail on errors", false);
final OptionSimple failFast = new OptionSimple("fail-fast", "", null, "Fail on first thread failure when running for set <duration>", false);
final OptionSimple skipReadValidation = new OptionSimple("skip-read-validation", "", null, "Skip read validation and message output", false);
final OptionSimple skipUnsupportedColumns = new OptionSimple("skip-unsupported-columns", "", null, "Skip unsupported columns, such as maps and embedded collections, when generating data for a user profile.", false);

Expand All @@ -103,7 +106,7 @@ public static final class Options extends GroupedOptions
@Override
public List<? extends Option> options()
{
return Arrays.asList(retries, ignore, skipReadValidation, skipUnsupportedColumns,
return Arrays.asList(retries, ignore, failFast, skipReadValidation, skipUnsupportedColumns,
delayPolicy, minDelayMs, maxDelayMs);
}

Expand All @@ -128,6 +131,7 @@ public boolean happy() {
public void printSettings(ResultLogger out)
{
out.printf(" Ignore: %b%n", ignore);
out.printf(" Fail fast setting: %b%n", failFast);
out.printf(" Tries: %d%n", tries);
if (delayPolicy == DelayPolicy.CONSTANT && minDelayMs == 0) {
return;
Expand Down

0 comments on commit cf3140d

Please sign in to comment.