Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Revert "Revert "added circuit breaker changes and test""
Browse files Browse the repository at this point in the history
  • Loading branch information
adyach authored Oct 28, 2019
1 parent 9f1483f commit 904bee2
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public HystrixKafkaCircuitBreaker(final String brokerId) {
concurrentExecutionCount = new AtomicInteger();
}

public boolean allowRequest() {
return circuitBreaker.allowRequest();
public boolean attemptExecution() {
return circuitBreaker.attemptExecution();
}

public void markStart() {
Expand All @@ -66,6 +66,7 @@ public void markFailure() {
concurrentExecutionCount.decrementAndGet();
HystrixThreadEventStream.getInstance()
.executionDone(ExecutionResult.from(HystrixEventType.FAILURE), commandKey, threadPoolKey);
circuitBreaker.markNonSuccess();
}

public String getMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void syncPostBatch(final String topicId, final List<BatchItem> batch, fin
item.setStep(EventPublishingStep.PUBLISHING);
final HystrixKafkaCircuitBreaker circuitBreaker = circuitBreakers.computeIfAbsent(
item.getBrokerId(), brokerId -> new HystrixKafkaCircuitBreaker(brokerId));
if (circuitBreaker.allowRequest()) {
if (circuitBreaker.attemptExecution()) {
sendFutures.put(item, publishItem(producer, topicId, item, circuitBreaker));
} else {
shortCircuited++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.google.common.collect.Sets.newHashSet;
Expand Down Expand Up @@ -107,6 +108,8 @@ private enum ConsumerOffsetMode {
@SuppressWarnings("unchecked")
public KafkaTopicRepositoryTest() {
System.setProperty("hystrix.command.1.metrics.healthSnapshot.intervalInMilliseconds", "10");
System.setProperty("hystrix.command.1.metrics.rollingStats.timeInMilliseconds", "500");
System.setProperty("hystrix.command.1.circuitBreaker.sleepWindowInMilliseconds", "500");
kafkaProducer = mock(KafkaProducer.class);
when(kafkaProducer.partitionsFor(anyString())).then(
invocation -> partitionsOfTopic((String) invocation.getArguments()[0])
Expand Down Expand Up @@ -316,19 +319,42 @@ public void whenKafkaPublishCallbackWithExceptionThenEventPublishingException()
}

@Test
public void whenKafkaPublishTimeoutThenCircuitIsOpened() {

public void checkCircuitBreakerStateBasedOnKafkaResponse() {
when(nakadiSettings.getKafkaSendTimeoutMs()).thenReturn(1000L);

when(kafkaProducer.partitionsFor(EXPECTED_PRODUCER_RECORD.topic())).thenReturn(ImmutableList.of(
new PartitionInfo(EXPECTED_PRODUCER_RECORD.topic(), 1, new Node(1, "host", 9091), null, null)));

//Timeout Exception should cause circuit breaker to open
List<BatchItem> batches = setResponseForSendingBatches(new TimeoutException());
Assert.assertTrue(batches.stream()
.filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED &&
item.getResponse().getDetail().equals("short circuited"))
.count() >= 1);

//No exception should close the circuit
batches = setResponseForSendingBatches(null);
Assert.assertTrue(batches.stream()
.filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.SUBMITTED &&
item.getResponse().getDetail().equals(""))
.count() >= 1);

//Timeout Exception should cause circuit breaker to open again
batches = setResponseForSendingBatches(new TimeoutException());
Assert.assertTrue(batches.stream()
.filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED &&
item.getResponse().getDetail().equals("short circuited"))
.count() >= 1);

}

private List<BatchItem> setResponseForSendingBatches(final Exception e) {
when(kafkaProducer.send(any(), any())).thenAnswer(invocation -> {
final Callback callback = (Callback) invocation.getArguments()[1];
callback.onCompletion(null, new TimeoutException());
if (callback != null) {
callback.onCompletion(null, e);
}
return null;
});

final List<BatchItem> batches = new LinkedList<>();
for (int i = 0; i < 100; i++) {
try {
Expand All @@ -338,17 +364,13 @@ public void whenKafkaPublishTimeoutThenCircuitIsOpened() {
Collections.emptyList());
batchItem.setPartition("1");
batches.add(batchItem);
TimeUnit.MILLISECONDS.sleep(5);
kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(),
ImmutableList.of(batchItem), "random");
fail();
} catch (final EventPublishingException e) {
} catch (final EventPublishingException | InterruptedException ex) {
}
}

Assert.assertTrue(batches.stream()
.filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED &&
item.getResponse().getDetail().equals("short circuited"))
.count() >= 1);
return batches;
}

@Test
Expand Down

0 comments on commit 904bee2

Please sign in to comment.