diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/HystrixKafkaCircuitBreaker.java b/src/main/java/org/zalando/nakadi/repository/kafka/HystrixKafkaCircuitBreaker.java index 8145052ff7..3c68e14d08 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/HystrixKafkaCircuitBreaker.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/HystrixKafkaCircuitBreaker.java @@ -45,8 +45,8 @@ public HystrixKafkaCircuitBreaker(final String brokerId) { concurrentExecutionCount = new AtomicInteger(); } - public boolean allowRequest() { - return circuitBreaker.allowRequest(); + public boolean attemptExecution() { + return circuitBreaker.attemptExecution(); } public void markStart() { @@ -66,6 +66,7 @@ public void markFailure() { concurrentExecutionCount.decrementAndGet(); HystrixThreadEventStream.getInstance() .executionDone(ExecutionResult.from(HystrixEventType.FAILURE), commandKey, threadPoolKey); + circuitBreaker.markNonSuccess(); } public String getMetrics() { diff --git a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index 40fbd3b222..28087d7d42 100644 --- a/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -273,7 +273,7 @@ public void syncPostBatch(final String topicId, final List batch, fin item.setStep(EventPublishingStep.PUBLISHING); final HystrixKafkaCircuitBreaker circuitBreaker = circuitBreakers.computeIfAbsent( item.getBrokerId(), brokerId -> new HystrixKafkaCircuitBreaker(brokerId)); - if (circuitBreaker.allowRequest()) { + if (circuitBreaker.attemptExecution()) { sendFutures.put(item, publishItem(producer, topicId, item, circuitBreaker)); } else { shortCircuited++; diff --git a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java index a15649caea..bdd000abb4 100644 --- a/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java +++ b/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.google.common.collect.Sets.newHashSet; @@ -107,6 +108,8 @@ private enum ConsumerOffsetMode { @SuppressWarnings("unchecked") public KafkaTopicRepositoryTest() { System.setProperty("hystrix.command.1.metrics.healthSnapshot.intervalInMilliseconds", "10"); + System.setProperty("hystrix.command.1.metrics.rollingStats.timeInMilliseconds", "500"); + System.setProperty("hystrix.command.1.circuitBreaker.sleepWindowInMilliseconds", "500"); kafkaProducer = mock(KafkaProducer.class); when(kafkaProducer.partitionsFor(anyString())).then( invocation -> partitionsOfTopic((String) invocation.getArguments()[0]) @@ -316,19 +319,42 @@ public void whenKafkaPublishCallbackWithExceptionThenEventPublishingException() } @Test - public void whenKafkaPublishTimeoutThenCircuitIsOpened() { - + public void checkCircuitBreakerStateBasedOnKafkaResponse() { when(nakadiSettings.getKafkaSendTimeoutMs()).thenReturn(1000L); - when(kafkaProducer.partitionsFor(EXPECTED_PRODUCER_RECORD.topic())).thenReturn(ImmutableList.of( new PartitionInfo(EXPECTED_PRODUCER_RECORD.topic(), 1, new Node(1, "host", 9091), null, null))); + //Timeout Exception should cause circuit breaker to open + List batches = setResponseForSendingBatches(new TimeoutException()); + Assert.assertTrue(batches.stream() + .filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED && + item.getResponse().getDetail().equals("short circuited")) + .count() >= 1); + + //No exception should close the circuit + batches = setResponseForSendingBatches(null); + Assert.assertTrue(batches.stream() + .filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.SUBMITTED && + item.getResponse().getDetail().equals("")) + .count() >= 1); + + //Timeout Exception should cause circuit breaker to open again + batches = setResponseForSendingBatches(new TimeoutException()); + Assert.assertTrue(batches.stream() + .filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED && + item.getResponse().getDetail().equals("short circuited")) + .count() >= 1); + + } + + private List setResponseForSendingBatches(final Exception e) { when(kafkaProducer.send(any(), any())).thenAnswer(invocation -> { final Callback callback = (Callback) invocation.getArguments()[1]; - callback.onCompletion(null, new TimeoutException()); + if (callback != null) { + callback.onCompletion(null, e); + } return null; }); - final List batches = new LinkedList<>(); for (int i = 0; i < 100; i++) { try { @@ -338,17 +364,13 @@ public void whenKafkaPublishTimeoutThenCircuitIsOpened() { Collections.emptyList()); batchItem.setPartition("1"); batches.add(batchItem); + TimeUnit.MILLISECONDS.sleep(5); kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), ImmutableList.of(batchItem), "random"); - fail(); - } catch (final EventPublishingException e) { + } catch (final EventPublishingException | InterruptedException ex) { } } - - Assert.assertTrue(batches.stream() - .filter(item -> item.getResponse().getPublishingStatus() == EventPublishingStatus.FAILED && - item.getResponse().getDetail().equals("short circuited")) - .count() >= 1); + return batches; } @Test