Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1111 from zalando/revert-1108-hysterix-fix
Browse files Browse the repository at this point in the history
Revert "added circuit breaker changes and test"
  • Loading branch information
Kunal-Jha authored Oct 24, 2019
2 parents aa4b1fe + 6c46322 commit 9f1483f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public HystrixKafkaCircuitBreaker(final String brokerId) {
concurrentExecutionCount = new AtomicInteger();
}

public boolean attemptExecution() {
return circuitBreaker.attemptExecution();
public boolean allowRequest() {
return circuitBreaker.allowRequest();
}

public void markStart() {
Expand All @@ -66,7 +66,6 @@ public void markFailure() {
concurrentExecutionCount.decrementAndGet();
HystrixThreadEventStream.getInstance()
.executionDone(ExecutionResult.from(HystrixEventType.FAILURE), commandKey, threadPoolKey);
circuitBreaker.markNonSuccess();
}

public String getMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void syncPostBatch(final String topicId, final List<BatchItem> batch, fin
item.setStep(EventPublishingStep.PUBLISHING);
final HystrixKafkaCircuitBreaker circuitBreaker = circuitBreakers.computeIfAbsent(
item.getBrokerId(), brokerId -> new HystrixKafkaCircuitBreaker(brokerId));
if (circuitBreaker.attemptExecution()) {
if (circuitBreaker.allowRequest()) {
sendFutures.put(item, publishItem(producer, topicId, item, circuitBreaker));
} else {
shortCircuited++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.google.common.collect.Sets.newHashSet;
Expand Down Expand Up @@ -108,8 +107,6 @@ private enum ConsumerOffsetMode {
@SuppressWarnings("unchecked")
public KafkaTopicRepositoryTest() {
System.setProperty("hystrix.command.1.metrics.healthSnapshot.intervalInMilliseconds", "10");
System.setProperty("hystrix.command.1.metrics.rollingStats.timeInMilliseconds", "500");
System.setProperty("hystrix.command.1.circuitBreaker.sleepWindowInMilliseconds", "500");
kafkaProducer = mock(KafkaProducer.class);
when(kafkaProducer.partitionsFor(anyString())).then(
invocation -> partitionsOfTopic((String) invocation.getArguments()[0])
Expand Down Expand Up @@ -319,42 +316,19 @@ public void whenKafkaPublishCallbackWithExceptionThenEventPublishingException()
}

@Test
public void checkCircuitBreakerStateBasedOnKafkaResponse() {
public void whenKafkaPublishTimeoutThenCircuitIsOpened() {

when(nakadiSettings.getKafkaSendTimeoutMs()).thenReturn(1000L);

when(kafkaProducer.partitionsFor(EXPECTED_PRODUCER_RECORD.topic())).thenReturn(ImmutableList.of(
new PartitionInfo(EXPECTED_PRODUCER_RECORD.topic(), 1, new Node(1, "host", 9091), null, null)));

//Timeout Exception should cause circuit breaker to open
List<BatchItem> batches = setResponseForSendingBatches(new TimeoutException());
Assert.assertTrue(batches.stream()
.filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED &&
item.getResponse().getDetail().equals("short circuited"))
.count() >= 1);

//No exception should close the circuit
batches = setResponseForSendingBatches(null);
Assert.assertTrue(batches.stream()
.filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.SUBMITTED &&
item.getResponse().getDetail().equals(""))
.count() >= 1);

//Timeout Exception should cause circuit breaker to open again
batches = setResponseForSendingBatches(new TimeoutException());
Assert.assertTrue(batches.stream()
.filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED &&
item.getResponse().getDetail().equals("short circuited"))
.count() >= 1);

}

private List<BatchItem> setResponseForSendingBatches(final Exception e) {
when(kafkaProducer.send(any(), any())).thenAnswer(invocation -> {
final Callback callback = (Callback) invocation.getArguments()[1];
if (callback != null) {
callback.onCompletion(null, e);
}
callback.onCompletion(null, new TimeoutException());
return null;
});

final List<BatchItem> batches = new LinkedList<>();
for (int i = 0; i < 100; i++) {
try {
Expand All @@ -364,13 +338,17 @@ private List<BatchItem> setResponseForSendingBatches(final Exception e) {
Collections.emptyList());
batchItem.setPartition("1");
batches.add(batchItem);
TimeUnit.MILLISECONDS.sleep(5);
kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(),
ImmutableList.of(batchItem), "random");
} catch (final EventPublishingException | InterruptedException ex) {
fail();
} catch (final EventPublishingException e) {
}
}
return batches;

Assert.assertTrue(batches.stream()
.filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED &&
item.getResponse().getDetail().equals("short circuited"))
.count() >= 1);
}

@Test
Expand Down

0 comments on commit 9f1483f

Please sign in to comment.