diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java index d442972715..5af2339de2 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerProcess.java @@ -68,15 +68,7 @@ public void run() { } finally { logger.info("Releasing consumer process thread of subscription {}", getSubscriptionName()); refreshHealthcheck(); - try { - stop(); - } catch (Exception exceptionWhileStopping) { - logger.error("An error occurred while stopping consumer process of subscription {}", - getSubscriptionName(), exceptionWhileStopping); - } finally { - onConsumerStopped.accept(getSubscriptionName()); - Thread.currentThread().setName("consumer-released-thread"); - } + stop(); } } @@ -115,7 +107,6 @@ private void process(Signal signal) { break; case STOP: logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType()); - this.running = false; stop(); break; case RETRANSMIT: @@ -154,12 +145,24 @@ private void start(Signal signal) { } private void stop() { - long startTime = clock.millis(); - logger.info("Stopping consumer for subscription {}", getSubscriptionName()); + if (!running) { + return; + } + this.running = false; + try { + long startTime = clock.millis(); + logger.info("Stopping consumer for subscription {}", getSubscriptionName()); - consumer.tearDown(); + consumer.tearDown(); - logger.info("Stopped consumer for subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime); + logger.info("Stopped consumer for subscription {} in {}ms", getSubscriptionName(), clock.millis() - startTime); + } catch (Exception exceptionWhileStopping) { + logger.error("An error occurred while stopping consumer process of subscription {}", + getSubscriptionName(), exceptionWhileStopping); + } finally { + onConsumerStopped.accept(getSubscriptionName()); + Thread.currentThread().setName("consumer-released-thread"); + } } private void retransmit(Signal signal) {