diff --git a/lib/applicationinsights.json b/lib/applicationinsights.json index 816032191..d4a68927a 100644 --- a/lib/applicationinsights.json +++ b/lib/applicationinsights.json @@ -2,5 +2,26 @@ "connectionString": "${file:/mnt/secrets/wa/app-insights-connection-string}", "role": { "name": "WA Case Event Handler" + }, + "sampling": { + "percentage": 100 + }, + "preview": { + "sampling": { + "overrides": [ + { + "telemetryKind": "request", + "attributes": [ + { + "key": "http.url", + "value": "https?://[^/]+/health.*", + "matchType": "regexp" + } + ], + "percentage": 1 + } + ] + } } } + diff --git a/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventsConsumer.java b/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventsConsumer.java index c3c833644..8edd03478 100644 --- a/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventsConsumer.java +++ b/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventsConsumer.java @@ -1,5 +1,6 @@ package uk.gov.hmcts.reform.wacaseeventhandler.clients; +import com.azure.messaging.servicebus.ServiceBusException; import com.azure.messaging.servicebus.ServiceBusReceiverClient; import com.azure.messaging.servicebus.ServiceBusSessionReceiverClient; import lombok.extern.slf4j.Slf4j; @@ -32,7 +33,7 @@ public CcdCaseEventsConsumer(ServiceBusConfiguration serviceBusConfiguration, @SuppressWarnings("squid:S2189") public void run() { try (ServiceBusSessionReceiverClient sessionReceiver = - serviceBusConfiguration.createCcdCaseEventsSessionReceiver()) { + serviceBusConfiguration.createCcdCaseEventsSessionReceiver()) { while (keepRun) { consumeMessage(sessionReceiver); } @@ -42,28 +43,38 @@ public void run() { @SuppressWarnings({"PMD.DataflowAnomalyAnalysis"}) protected void consumeMessage(ServiceBusSessionReceiverClient sessionReceiver) { try (ServiceBusReceiverClient receiver = sessionReceiver.acceptNextSession()) { + + if (receiver == null) { + log.warn("ServiceBusReceiverClient receiver was null."); + return; + + } + receiver.receiveMessages(1).forEach( message -> { try { String messageId = message.getMessageId(); String sessionId = message.getSessionId(); - log.info("Received CCD Case Event message with id '{}' and case id '{}'", messageId, sessionId); + log.info("Received CCD Case Event message with id '{}' and case id '{}'", + messageId, sessionId); eventMessageReceiverService.handleCcdCaseEventAsbMessage(messageId, sessionId, - new String(message.getBody().toBytes())); + new String(message.getBody().toBytes())); receiver.complete(message); log.info("CCD Case Event message with id '{}' handled successfully", messageId); } catch (Exception ex) { log.error("Error processing CCD Case Event message with id '{}' - " - + "abandon the processing and ASB will re-deliver it", message.getMessageId()); + + "abandon the processing and ASB will re-deliver it", message.getMessageId()); receiver.abandon(message); } }); } catch (IllegalStateException ex) { - log.info("Timeout: No CCD Case Event messages received waiting for next session."); + log.info("Timeout: No CCD Case Event messages received waiting for next session {}", ex.getMessage()); + } catch (ServiceBusException ex) { + log.error("Error occurred while receiving messages {}", ex.getMessage()); } catch (Exception ex) { - log.error("Error occurred while closing the session", ex); + log.error("Error occurred while closing the session {}", ex.getMessage()); } } diff --git a/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventsDeadLetterQueueConsumer.java b/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventsDeadLetterQueueConsumer.java index 0eb15a00f..3e6e4f4f4 100644 --- a/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventsDeadLetterQueueConsumer.java +++ b/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventsDeadLetterQueueConsumer.java @@ -66,7 +66,7 @@ protected void consumeMessage(ServiceBusReceiverClient receiver) { } }); } catch (Exception ex) { - log.error("Error occurred while completing the message processing", ex); + log.error("Error occurred while completing the message processing {}", ex.getMessage()); } } diff --git a/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/MessageReadinessConsumer.java b/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/MessageReadinessConsumer.java index b2420adc2..648478b61 100644 --- a/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/MessageReadinessConsumer.java +++ b/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/MessageReadinessConsumer.java @@ -45,12 +45,12 @@ public MessageReadinessConsumer(DeadLetterQueuePeekService deadLetterQueuePeekSe @Override @Transactional public void run() { - log.info("Running message readiness check"); + log.debug("Running message readiness check"); try { final List allMessageInNewState = caseEventMessageRepository.getAllMessagesInNewState(); - log.info("Number of messages to check the readiness {}", allMessageInNewState.size()); + log.debug("Number of messages to check the readiness {}", allMessageInNewState.size()); allMessageInNewState.forEach(this::checkMessageToMoveToReadyState); diff --git a/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/services/EventMessageReceiverService.java b/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/services/EventMessageReceiverService.java index fd5739614..16906f6c7 100644 --- a/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/services/EventMessageReceiverService.java +++ b/src/main/java/uk/gov/hmcts/reform/wacaseeventhandler/services/EventMessageReceiverService.java @@ -57,7 +57,7 @@ public CaseEventMessage handleAsbMessage(String messageId, String sessionId, Str } public CaseEventMessage handleCcdCaseEventAsbMessage(String messageId, String sessionId, String message) { - log.info("Received CCD Case Events ASB message with id '{}'", messageId); + log.debug("Received CCD Case Events ASB message with id '{}'", messageId); return handleMessage(messageId, sessionId, message, false); } diff --git a/src/test/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventConsumerTest.java b/src/test/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventConsumerTest.java index 0dda2ff03..b86a4cce4 100644 --- a/src/test/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventConsumerTest.java +++ b/src/test/java/uk/gov/hmcts/reform/wacaseeventhandler/clients/CcdCaseEventConsumerTest.java @@ -70,7 +70,7 @@ void given_session_is_accepted_when_receiver_complete_throws_error() { publishMessageToReceiver(); doThrow(new ServiceBusException(new Exception(), ServiceBusErrorSource.UNKNOWN)).doNothing() - .when(receiverClient).complete(receivedMessage); + .when(receiverClient).complete(receivedMessage); underTest.consumeMessage(sessionReceiverClient); @@ -86,7 +86,7 @@ void given_session_is_accepted_when_receiver_complete_throws_error_on_both_calls publishMessageToReceiver(); doThrow(new ServiceBusException(new Exception(), ServiceBusErrorSource.UNKNOWN)) - .when(receiverClient).complete(receivedMessage); + .when(receiverClient).complete(receivedMessage); doThrow(new ServiceBusException(new Exception(), ServiceBusErrorSource.UNKNOWN)) .when(receiverClient).abandon(receivedMessage); @@ -167,4 +167,17 @@ private void publishMessageToReceiver() { when(receiverClient.receiveMessages(1)).thenReturn(new IterableStream<>(iterableStreamFlux)); } + @Test + void should_not_throw_npe_if_receiver_is_null() { + + when(sessionReceiverClient.acceptNextSession()).thenReturn(null); + + underTest.consumeMessage(sessionReceiverClient); + + verify(receiverClient, Mockito.times(0)).complete(receivedMessage); + verify(receiverClient, Mockito.times(0)).abandon(any()); + verify(receiverClient, Mockito.times(0)).deadLetter(any(), any()); + + } + }