From c4d6d68f4a4cfa15601a3cc717f1d2fb0fe810c1 Mon Sep 17 00:00:00 2001 From: EmanueleBVtech Date: Fri, 17 Jan 2025 10:01:43 +0100 Subject: [PATCH] feat: [CHK-3347] Added logic to warmup lazy beans (#365) * added logic to warmup lazy beans * spotless apply * done * fixed * removed unused refs * spotless apply * chore: add warmup logging * updated * added more warmup stubs * reused dummy checkpointer class * all jsons ready * warmup functions ready * spotless apply * junit test need more work * updated unit tests * updated unit tests * spotless * Update WarmupRequests.kt * Update WarmupRequestsTest.kt * Update ServicesWarmup.kt * Delete src/test/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/ServicesWarmupTest.kt * fix * Update StreamConfig.kt * refactored used object * Delete src/test/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/ServicesRequestsTest.kt * Update WarmupRequests.kt * Create EventUtils * Update WarmupRequests.kt * Create EventsUtil * Delete src/test/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/EventUtils * fix * Update EventsUtil.java * fix * Update src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/ServicesWarmup.kt Co-authored-by: Pietro Tota <115724836+pietro-tota@users.noreply.github.com> * fix * fix: add tests mongo connection string parameter for connection timeout * fix: add programmatic redis stream message receiver startup --------- Co-authored-by: Pietro Tota Co-authored-by: Pietro Tota <115724836+pietro-tota@users.noreply.github.com> --- .../config/QueuesConsumerConfig.kt | 9 + .../config/redis/stream/StreamConfig.kt | 7 +- ...uthorizationOutcomeWaitingQueueConsumer.kt | 8 + ...tionAuthorizationRequestedQueueConsumer.kt | 8 + .../TransactionClosePaymentQueueConsumer.kt | 8 + ...ansactionClosePaymentRetryQueueConsumer.kt | 9 + .../TransactionExpirationQueueConsumer.kt | 9 + .../TransactionNotificationsQueueConsumer.kt | 8 + ...nsactionNotificationsRetryQueueConsumer.kt | 11 +- .../TransactionRefundRetryQueueConsumer.kt | 8 + .../queues/TransactionsRefundQueueConsumer.kt | 8 + .../eventdispatcher/queues/v2/common.kt | 22 +- ...ndChannelAdapterLifecycleHandlerService.kt | 7 + .../eventdispatcher/warmup/ServicesWarmup.kt | 78 ++++++ .../warmup/annotations/WarmupFunction.kt | 10 + .../warmup/utils/EventsUtil.java | 229 ++++++++++++++++++ .../warmup/utils/WarmupRequests.kt | 84 +++++++ .../warmup/utils/WarmupRequestsTest.kt | 176 ++++++++++++++ .../resources/application.test.properties | 2 +- 19 files changed, 687 insertions(+), 14 deletions(-) create mode 100644 src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/ServicesWarmup.kt create mode 100644 src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/annotations/WarmupFunction.kt create mode 100644 src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/EventsUtil.java create mode 100644 src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/WarmupRequests.kt create mode 100644 src/test/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/WarmupRequestsTest.kt diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/config/QueuesConsumerConfig.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/config/QueuesConsumerConfig.kt index c31916daa..38692fcdb 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/config/QueuesConsumerConfig.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/config/QueuesConsumerConfig.kt @@ -21,6 +21,7 @@ class QueuesConsumerConfig { @Bean @InboundChannelAdapter( channel = "transactionclosureschannel", + autoStartup = "false", poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")]) @EndpointId("storageQueueClosuresMessageSourceEndpoint") fun storageQueueClosuresMessageSource( @@ -33,6 +34,7 @@ class QueuesConsumerConfig { @Bean @InboundChannelAdapter( channel = "transactionretryclosureschannel", + autoStartup = "false", poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")]) @EndpointId("storageQueueRetryClosuresMessageSourceEndpoint") fun storageQueueRetryClosuresMessageSource( @@ -46,6 +48,7 @@ class QueuesConsumerConfig { @Bean @InboundChannelAdapter( channel = "transactionexpiredchannel", + autoStartup = "false", poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")]) @EndpointId("storageQueueExpirationsMessageSourceEndpoint") fun storageQueueExpirationsMessageSource( @@ -58,6 +61,7 @@ class QueuesConsumerConfig { @Bean @InboundChannelAdapter( channel = "transactionrefundretrychannel", + autoStartup = "false", poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")]) @EndpointId("storageQueueRefundRetryMessageSourceEndpoint") fun storageQueueRefundRetryMessageSource( @@ -70,6 +74,7 @@ class QueuesConsumerConfig { @Bean @InboundChannelAdapter( channel = "transactionsrefundchannel", + autoStartup = "false", poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")]) @EndpointId("storageQueueRefundMessageSourceEndpoint") fun storageQueueRefundMessageSource( @@ -82,6 +87,7 @@ class QueuesConsumerConfig { @Bean @InboundChannelAdapter( channel = "transactionretrynotificationschannel", + autoStartup = "false", poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")]) @EndpointId("storageQueueRetryNotificationsMessageSourceEndpoint") fun storageQueueRetryNotificationsMessageSource( @@ -96,6 +102,7 @@ class QueuesConsumerConfig { @Bean @InboundChannelAdapter( channel = "transactionnotificationschannel", + autoStartup = "false", poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")]) @EndpointId("storageQueueNotificationsMessageSourceEndpoint") fun storageQueueNotificationsMessageSource( @@ -108,6 +115,7 @@ class QueuesConsumerConfig { @Bean @InboundChannelAdapter( channel = "transactionsauthorizationrequestedchannel", + autoStartup = "false", poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")]) @EndpointId("storageQueueAuthorizationRequestedMessageSourceEndpoint") fun storageQueueAuthorizationRequestedMessageSource( @@ -121,6 +129,7 @@ class QueuesConsumerConfig { @Bean @InboundChannelAdapter( channel = "transactionsauthorizationoutcomewaitingchannel", + autoStartup = "false", poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")]) @EndpointId("storageQueueAuthorizationOutcomeWaitingMessageSourceEndpoint") fun storageQueueAuthorizationOutcomeWaitingMessageSource( diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/config/redis/stream/StreamConfig.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/config/redis/stream/StreamConfig.kt index 5617a5287..b95396553 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/config/redis/stream/StreamConfig.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/config/redis/stream/StreamConfig.kt @@ -14,6 +14,7 @@ import org.springframework.data.redis.hash.Jackson2HashMapper import org.springframework.data.redis.serializer.RedisSerializationContext import org.springframework.data.redis.serializer.StringRedisSerializer import org.springframework.data.redis.stream.StreamReceiver +import org.springframework.integration.annotation.EndpointId import org.springframework.integration.annotation.InboundChannelAdapter import org.springframework.integration.annotation.Poller @@ -34,7 +35,7 @@ class StreamConfig { EventDispatcherGenericCommand::class.java, EventDispatcherCommandMixin::class.java) val streamReceiverOptions = StreamReceiver.StreamReceiverOptions.builder() - .pollTimeout(Duration.ofMillis(100)) + .pollTimeout(Duration.ofMillis(1000)) .batchSize(1) // read one item per poll .keySerializer( RedisSerializationContext.SerializationPair.fromSerializer(StringRedisSerializer())) @@ -51,7 +52,9 @@ class StreamConfig { @Bean @InboundChannelAdapter( channel = "eventDispatcherReceiverCommandChannel", - poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")]) + poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")], + autoStartup = "false") + @EndpointId("eventDispatcherReceiverCommandChannelEndpoint") fun eventDispatcherReceiverCommandMessageSource( redisStreamReceiver: StreamReceiver>>, eventDispatcherCommandsTemplateWrapper: EventDispatcherCommandsTemplateWrapper, diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionAuthorizationOutcomeWaitingQueueConsumer.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionAuthorizationOutcomeWaitingQueueConsumer.kt index 0f3061ada..c631acb26 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionAuthorizationOutcomeWaitingQueueConsumer.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionAuthorizationOutcomeWaitingQueueConsumer.kt @@ -9,6 +9,9 @@ import it.pagopa.ecommerce.commons.queues.QueueEvent import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient +import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction +import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer +import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionAuthorizationOutcomeWaitingEvent import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired @@ -85,4 +88,9 @@ class TransactionAuthorizationOutcomeWaitingQueueConsumer( DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT) } } + + @WarmupFunction + fun warmupService() { + messageReceiver(getTransactionAuthorizationOutcomeWaitingEvent(), DummyCheckpointer).block() + } } diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionAuthorizationRequestedQueueConsumer.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionAuthorizationRequestedQueueConsumer.kt index 9a5fd4b74..792c053f2 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionAuthorizationRequestedQueueConsumer.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionAuthorizationRequestedQueueConsumer.kt @@ -9,6 +9,9 @@ import it.pagopa.ecommerce.commons.queues.QueueEvent import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient +import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction +import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer +import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionAuthorizationRequestedEvent import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired @@ -84,4 +87,9 @@ class TransactionAuthorizationRequestedQueueConsumer( DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT) } } + + @WarmupFunction + fun warmupService() { + messageReceiver(getTransactionAuthorizationRequestedEvent(), DummyCheckpointer).block() + } } diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionClosePaymentQueueConsumer.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionClosePaymentQueueConsumer.kt index ed8f9080a..4d1f7741a 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionClosePaymentQueueConsumer.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionClosePaymentQueueConsumer.kt @@ -14,6 +14,9 @@ import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider import it.pagopa.ecommerce.commons.queues.TracingInfo import it.pagopa.ecommerce.eventdispatcher.exceptions.* import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient +import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction +import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer +import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionClosureRequestedEvent import java.util.* import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -126,4 +129,9 @@ class TransactionClosePaymentQueueConsumer( DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT) } } + + @WarmupFunction + fun warmupService() { + messageReceiver(getTransactionClosureRequestedEvent(), DummyCheckpointer).block() + } } diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionClosePaymentRetryQueueConsumer.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionClosePaymentRetryQueueConsumer.kt index 15d713961..dc3d3aa2e 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionClosePaymentRetryQueueConsumer.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionClosePaymentRetryQueueConsumer.kt @@ -16,6 +16,9 @@ import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider import it.pagopa.ecommerce.commons.queues.TracingInfo import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient +import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction +import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer +import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionClosureErrorEvent import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired @@ -163,4 +166,10 @@ class TransactionClosePaymentRetryQueueConsumer( DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT) } } + + @WarmupFunction + fun warmupService() { + messageReceiver(getTransactionClosureErrorEvent(), DummyCheckpointer, EmptyTransaction()) + .block() + } } diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionExpirationQueueConsumer.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionExpirationQueueConsumer.kt index ca387ca7f..612840622 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionExpirationQueueConsumer.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionExpirationQueueConsumer.kt @@ -17,6 +17,9 @@ import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException import it.pagopa.ecommerce.eventdispatcher.queues.v1.TransactionExpirationQueueConsumer as TransactionExpirationQueueConsumerV1 import it.pagopa.ecommerce.eventdispatcher.queues.v2.TransactionExpirationQueueConsumer as TransactionExpirationQueueConsumerV2 import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient +import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction +import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer +import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionExpiredEvent import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired @@ -171,4 +174,10 @@ class TransactionExpirationQueueConsumer( DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT) } } + + @WarmupFunction + fun warmupService() { + messageReceiver(getTransactionExpiredEvent(), DummyCheckpointer, MessageHeaders(emptyMap())) + .block() + } } diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionNotificationsQueueConsumer.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionNotificationsQueueConsumer.kt index c3efd5152..bd20f4cd7 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionNotificationsQueueConsumer.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionNotificationsQueueConsumer.kt @@ -14,6 +14,9 @@ import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider import it.pagopa.ecommerce.commons.queues.TracingInfo import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient +import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction +import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer +import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionUserReceiptRequestedEvent import it.pagopa.generated.notifications.templates.success.* import java.util.* import org.slf4j.Logger @@ -115,4 +118,9 @@ class TransactionNotificationsQueueConsumer( DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT) } } + + @WarmupFunction + fun warmupService() { + messageReceiver(getTransactionUserReceiptRequestedEvent(), DummyCheckpointer).block() + } } diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionNotificationsRetryQueueConsumer.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionNotificationsRetryQueueConsumer.kt index 4a391eb19..8d0089fac 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionNotificationsRetryQueueConsumer.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionNotificationsRetryQueueConsumer.kt @@ -14,10 +14,10 @@ import it.pagopa.ecommerce.commons.queues.QueueEvent import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider import it.pagopa.ecommerce.commons.queues.TracingInfo import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException -import it.pagopa.ecommerce.eventdispatcher.queues.* import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient -import it.pagopa.generated.notifications.templates.success.* -import java.util.* +import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction +import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer +import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionUserReceiptAddErrorEvent import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired @@ -164,4 +164,9 @@ class TransactionNotificationsRetryQueueConsumer( DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT) } } + + @WarmupFunction + fun warmupService() { + messageReceiver(getTransactionUserReceiptAddErrorEvent(), DummyCheckpointer).block() + } } diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionRefundRetryQueueConsumer.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionRefundRetryQueueConsumer.kt index 42cfa756b..c3b727dc5 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionRefundRetryQueueConsumer.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionRefundRetryQueueConsumer.kt @@ -12,6 +12,9 @@ import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider import it.pagopa.ecommerce.commons.queues.TracingInfo import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient +import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction +import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer +import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionRefundRetriedEvent import java.util.* import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -119,4 +122,9 @@ class TransactionRefundRetryQueueConsumer( DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT) } } + + @WarmupFunction + fun warmupService() { + messageReceiver(getTransactionRefundRetriedEvent(), DummyCheckpointer).block() + } } diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionsRefundQueueConsumer.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionsRefundQueueConsumer.kt index 001a20be5..d6bb99b6c 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionsRefundQueueConsumer.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/TransactionsRefundQueueConsumer.kt @@ -15,6 +15,9 @@ import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider import it.pagopa.ecommerce.commons.queues.TracingInfo import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient +import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction +import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer +import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionRefundRequestedEvent import java.util.* import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -167,4 +170,9 @@ class TransactionsRefundQueueConsumer( DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT) } } + + @WarmupFunction + fun warmupService() { + messageReceiver(getTransactionRefundRequestedEvent(), DummyCheckpointer).block() + } } diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/v2/common.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/v2/common.kt index 11bc6e361..285f43f94 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/v2/common.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/queues/v2/common.kt @@ -976,6 +976,7 @@ fun runTracedPipelineWithDeadLetterQueue( spanName: String, jsonSerializerProviderV2: StrictJsonSerializerProvider ): Mono { + val nullTransactionId = "00000000000000000000000000000000" // null event ID used in warmup phase val eventLogString = "${queueEvent.event.id}, transactionId: ${queueEvent.event.transactionId}" val deadLetterPipeline = @@ -995,14 +996,19 @@ fun runTracedPipelineWithDeadLetterQueue( else -> DeadLetterTracedQueueAsyncClient.ErrorCategory.PROCESSING_ERROR } logger.error("Exception processing event $eventLogString", pipelineException) - deadLetterTracedQueueAsyncClient.sendAndTraceDeadLetterQueueEvent( - binaryData = BinaryData.fromObject(queueEvent, jsonSerializerProviderV2.createInstance()), - errorContext = - DeadLetterTracedQueueAsyncClient.ErrorContext( - transactionId = TransactionId(queueEvent.event.transactionId), - transactionEventCode = queueEvent.event.eventCode, - errorCategory = errorCategory), - ) + if (queueEvent.event.transactionId != nullTransactionId) { + deadLetterTracedQueueAsyncClient.sendAndTraceDeadLetterQueueEvent( + binaryData = + BinaryData.fromObject(queueEvent, jsonSerializerProviderV2.createInstance()), + errorContext = + DeadLetterTracedQueueAsyncClient.ErrorContext( + transactionId = TransactionId(queueEvent.event.transactionId), + transactionEventCode = queueEvent.event.eventCode, + errorCategory = errorCategory)) + } else { + logger.info("Skipping dead letter queue for warmup event with null transaction ID") + Mono.just(Unit) + } } return tracingUtils diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/services/InboundChannelAdapterLifecycleHandlerService.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/services/InboundChannelAdapterLifecycleHandlerService.kt index dfca90319..1d6d7ad31 100644 --- a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/services/InboundChannelAdapterLifecycleHandlerService.kt +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/services/InboundChannelAdapterLifecycleHandlerService.kt @@ -65,4 +65,11 @@ class InboundChannelAdapterLifecycleHandlerService( .getBeansWithAnnotation(InboundChannelAdapter::class.java) .filterNot { it.value is RedisStreamMessageSource } .keys + + fun invokeCommandForRedisStreamMessageSource(command: String) { + val controllerBusMessage = + MessageBuilder.createMessage( + "@eventDispatcherReceiverCommandChannelEndpoint.$command()", MessageHeaders(mapOf())) + controlBusInput.send(controllerBusMessage) + } } diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/ServicesWarmup.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/ServicesWarmup.kt new file mode 100644 index 000000000..34105d2bd --- /dev/null +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/ServicesWarmup.kt @@ -0,0 +1,78 @@ +package it.pagopa.ecommerce.eventdispatcher.warmup + +import it.pagopa.ecommerce.eventdispatcher.services.InboundChannelAdapterLifecycleHandlerService +import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction +import kotlin.reflect.full.declaredMemberFunctions +import kotlin.reflect.full.hasAnnotation +import kotlin.system.measureTimeMillis +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.ApplicationListener +import org.springframework.context.event.ContextRefreshedEvent +import org.springframework.stereotype.Component +import org.springframework.stereotype.Service +import org.springframework.util.ClassUtils + +@Component +class ServicesWarmup( + @Autowired + private val inboundChannelAdapterLifecycleHandlerService: + InboundChannelAdapterLifecycleHandlerService +) : ApplicationListener { + + private val logger = LoggerFactory.getLogger(this.javaClass) + + override fun onApplicationEvent(event: ContextRefreshedEvent) { + val eventReceiverServices = + event.applicationContext + .getBeansWithAnnotation(Service::class.java) + .map { it.value } + .filter { service -> + service.javaClass.kotlin.declaredMemberFunctions.any { + it.hasAnnotation() + } + } + logger.info("Found services: [{}]", eventReceiverServices.size) + + try { + eventReceiverServices.forEach(this::warmUpService) + } catch (e: Exception) { + logger.error("Exception during service warm-up", e) + } finally { + inboundChannelAdapterLifecycleHandlerService.invokeCommandForAllEndpoints("start") + inboundChannelAdapterLifecycleHandlerService.invokeCommandForRedisStreamMessageSource("start") + } + } + + private fun warmUpService(serviceToWarmUpInstance: Any) { + var warmUpMethods = 0 + val serviceToWarmUpKClass = ClassUtils.getUserClass(serviceToWarmUpInstance).kotlin + val elapsedTime = measureTimeMillis { + runCatching { + serviceToWarmUpKClass.declaredMemberFunctions + .filter { it.hasAnnotation() } + .forEach { + warmUpMethods++ + val result: Result<*> + val intertime = measureTimeMillis { + result = runCatching { + logger.info("Invoking function: [{}]", it.toString()) + it.call(serviceToWarmUpInstance) + } + } + logger.info( + "Warmup function: [{}] -> elapsed time: [{}]. Is ok: [{}] ", + it.toString(), + intertime, + result) + } + } + .getOrElse { logger.error("Exception performing service warm up ", it) } + } + logger.info( + "service: [{}] warm-up executed functions: [{}], elapsed time: [{}] ms", + serviceToWarmUpKClass, + warmUpMethods, + elapsedTime) + } +} diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/annotations/WarmupFunction.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/annotations/WarmupFunction.kt new file mode 100644 index 000000000..5ebf0d200 --- /dev/null +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/annotations/WarmupFunction.kt @@ -0,0 +1,10 @@ +package it.pagopa.ecommerce.eventdispatcher.warmup.annotations + +@Target(AnnotationTarget.FUNCTION) +@Retention(AnnotationRetention.RUNTIME) +/** + * Annotation used to annotate a queue service function to be called during module warm-up phase. + * Warm-up function can be used to simulate a message v2 received from the queue in order to + * initialize all it's resource before the module being ready to serve requests + */ +annotation class WarmupFunction {} diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/EventsUtil.java b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/EventsUtil.java new file mode 100644 index 000000000..0fe6a4e37 --- /dev/null +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/EventsUtil.java @@ -0,0 +1,229 @@ +package it.pagopa.ecommerce.eventdispatcher.warmup.utils; +import it.pagopa.ecommerce.commons.documents.v2.*; +import it.pagopa.ecommerce.commons.documents.v2.authorization.NpgTransactionGatewayAuthorizationRequestedData; +import it.pagopa.ecommerce.commons.domain.v2.TransactionEventCode; +import it.pagopa.ecommerce.commons.generated.server.model.TransactionStatusDto; +import it.pagopa.ecommerce.commons.queues.TracingInfo; +import it.pagopa.ecommerce.commons.queues.TracingUtils; +import org.springframework.http.HttpStatus; +import java.net.URI; + +public class EventsUtil { + + public static TransactionAuthorizationRequestedEvent getTransactionAuthorizationRequestedEventObject() { + // Create the event object + TransactionAuthorizationRequestedEvent event = new TransactionAuthorizationRequestedEvent(); + + // Set properties + event.setId("7ee814b9-8bb8-4f61-9204-2aa55cb56773"); + event.setTransactionId("00000000000000000000000000000000"); + event.setCreationDate("2025-01-10T14:28:47.843515440Z[Etc/UTC]"); + + // Initialize the data object for the event + TransactionAuthorizationRequestData data = new TransactionAuthorizationRequestData( + 1000, // amount + 50, // fee + "instrumentId", // paymentInstrumentId + "pspId", // pspId + "CP", // paymentTypeCode + "brokerName", // brokerName + "pspChannelCode", // pspChannelCode + "paymentMethodName", // paymentMethodName + "pspBusinessName", // pspBusinessName + true, // isPspOnUs + "authorizationRequestId", // authorizationRequestId + TransactionAuthorizationRequestData.PaymentGateway.NPG, // paymentGateway + "paymentMethodDescription", // paymentMethodDescription + new NpgTransactionGatewayAuthorizationRequestedData( + URI.create("http://test.com"), // URI + "VISA", // cardType + "NPG_SESSION_ID", // sessionId + "NPG_CONFIRM_PAYMENT_SESSION_ID", // confirmSessionId + null // transactionData (nullable) + ), + "idBundle" // idBundle + ); + + // Set the event data + event.setData(data); + + // Set the event code + event.setEventCode(TransactionEventCode.TRANSACTION_AUTHORIZATION_REQUESTED_EVENT.toString()); + + return event; + } + + public static TransactionAuthorizationOutcomeWaitingEvent getTransactionAuthorizationOutcomeWaitingEventObject() { + // Create the event object + TransactionAuthorizationOutcomeWaitingEvent event = new TransactionAuthorizationOutcomeWaitingEvent(); + + // Set properties + event.setTransactionId("00000000000000000000000000000000"); + event.setCreationDate("2025-01-10T14:28:47.843515440Z[Etc/UTC]"); + + // Initialize the data object for the event + TransactionRetriedData data = new TransactionRetriedData(1); + + // Set the event data + event.setData(data); + + // Set the event code + event.setEventCode(TransactionEventCode.TRANSACTION_AUTHORIZATION_OUTCOME_WAITING_EVENT.toString()); + + return event; + } + + + public static TransactionUserReceiptAddErrorEvent getTransactionUserReceiptAddErrorEventObject() { + // Create the event object + TransactionUserReceiptAddErrorEvent event = new TransactionUserReceiptAddErrorEvent(); + + // Set properties + event.setId("12345678-1234-1234-1234-123456789012"); + event.setTransactionId("00000000000000000000000000000000"); + event.setCreationDate("2025-01-13T10:26:33.000Z[Etc/UTC]"); + + // Initialize the data object for the event + TransactionUserReceiptData data = new TransactionUserReceiptData(); + data.setLanguage("en"); + data.setPaymentDate("2025-01-12T10:00:00.000Z"); + data.setResponseOutcome(TransactionUserReceiptData.Outcome.OK); // Ensure Outcome is set correctly + + // Set the data and event code in the event object + event.setData(data); + event.setEventCode(TransactionEventCode.TRANSACTION_ADD_USER_RECEIPT_ERROR_EVENT.toString()); + + return event; + } + + public static TransactionClosureRequestedEvent getTransactionClosureRequestedEventObject() { + // Create the event object + TransactionClosureRequestedEvent event = new TransactionClosureRequestedEvent(); + + // Set properties + event.setId("7ee814b9-8bb8-4f61-9204-2aa55cb56773"); + event.setTransactionId("00000000000000000000000000000000"); + event.setCreationDate("2025-01-10T14:28:47.843515440Z[Etc/UTC]"); + + // Set the event code + event.setEventCode(TransactionEventCode.TRANSACTION_CLOSURE_REQUESTED_EVENT.toString()); + + return event; + } + + public static TransactionClosureErrorEvent getTransactionClosureErrorEventObject() { + // Create the event object + TransactionClosureErrorEvent event = new TransactionClosureErrorEvent(); + + // Set properties + event.setId("7ee814b9-8bb8-4f61-9204-2aa55cb56773"); + event.setTransactionId("00000000000000000000000000000000"); + event.setCreationDate("2025-01-10T14:28:47.843515440Z[Etc/UTC]"); + + // Create and set the data object + ClosureErrorData data = new ClosureErrorData(); + data.setHttpErrorCode(HttpStatus.INTERNAL_SERVER_ERROR); // Set the HTTP error code + data.setErrorDescription("Sample error message"); // Set the error description + data.setErrorType(ClosureErrorData.ErrorType.KO_RESPONSE_RECEIVED); // Set the error type + + // Set the data object in the event + event.setData(data); + + // Set the event code + event.setEventCode(TransactionEventCode.TRANSACTION_CLOSURE_ERROR_EVENT.toString()); + + return event; + } + + public static TransactionExpiredEvent getTransactionExpiredEventObject() { + // Create the event object + TransactionExpiredEvent event = new TransactionExpiredEvent(); + + // Set properties + event.setId("7ee814b9-8bb8-4f61-9204-2aa55cb56773"); + event.setTransactionId("00000000000000000000000000000000"); + event.setCreationDate("2025-01-10T14:28:47.843515440Z[Etc/UTC]"); + + // Create and set the data object + TransactionExpiredData data = new TransactionExpiredData(); + data.setStatusBeforeExpiration(TransactionStatusDto.REFUND_REQUESTED); // Set the status before expiration + + // Set the data in the event + event.setData(data); + + // Set the event code + event.setEventCode(TransactionEventCode.TRANSACTION_EXPIRED_EVENT.toString()); + + return event; + } + + public static TransactionUserReceiptRequestedEvent getTransactionUserReceiptRequestedEventObject() { + // Create the event object + TransactionUserReceiptRequestedEvent event = new TransactionUserReceiptRequestedEvent(); + + // Set properties + event.setId("7ee814b9-8bb8-4f61-9204-2aa55cb56773"); + event.setTransactionId("00000000000000000000000000000000"); + event.setCreationDate("2025-01-10T14:28:47.843515440Z[Etc/UTC]"); + + // Create and set the data object + TransactionUserReceiptData data = new TransactionUserReceiptData(); + data.setResponseOutcome(TransactionUserReceiptData.Outcome.OK); // Set the response outcome + data.setLanguage("en"); // Set the language + data.setPaymentDate("2025-01-10T14:28:47.843515440Z[Etc/UTC]"); // Set the payment date + + // Set the data in the event + event.setData(data); + + // Set the event code + event.setEventCode(TransactionEventCode.TRANSACTION_USER_RECEIPT_REQUESTED_EVENT.toString()); + + return event; + } + + public static TransactionRefundRetriedEvent getTransactionRefundRetriedEventObject() { + // Create the event object + TransactionRefundRetriedEvent event = new TransactionRefundRetriedEvent(); + + // Set properties + event.setId("7ee814b9-8bb8-4f61-9204-2aa55cb56773"); + event.setTransactionId("00000000000000000000000000000000"); + event.setCreationDate("2025-01-10T14:28:47.843515440Z[Etc/UTC]"); + + // Create and set the data object + TransactionRefundRetriedData data = new TransactionRefundRetriedData(); + data.setTransactionGatewayAuthorizationData(null); // Set transactionGatewayAuthorizationData to null + data.setRetryCount(1); // Set the retry count + + // Set the data in the event + event.setData(data); + + // Set the event code + event.setEventCode(TransactionEventCode.TRANSACTION_REFUND_RETRIED_EVENT.toString()); + + return event; + } + + public static TransactionRefundRequestedEvent getTransactionRefundRequestedEventObject() { + // Create the event object + TransactionRefundRequestedEvent event = new TransactionRefundRequestedEvent(); + + // Set properties + event.setId("abcdef12-3456-7890-abcd-ef1234567890"); + event.setTransactionId("00000000000000000000000000000000"); + event.setCreationDate("2025-01-13T10:30:00.000Z[Etc/UTC]"); + + // Create and set the data object + TransactionRefundRequestedData data = new TransactionRefundRequestedData(); + data.setGatewayAuthData(null); // Set gatewayAuthData to null + data.setStatusBeforeRefunded(TransactionStatusDto.AUTHORIZATION_REQUESTED); // Set status before refunded + + // Set the data in the event + event.setData(data); + + // Set the event code + event.setEventCode(TransactionEventCode.TRANSACTION_REFUND_REQUESTED_EVENT.toString()); + + return event; + } +} diff --git a/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/WarmupRequests.kt b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/WarmupRequests.kt new file mode 100644 index 000000000..f71742646 --- /dev/null +++ b/src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/WarmupRequests.kt @@ -0,0 +1,84 @@ +package it.pagopa.ecommerce.payment.requests.warmup.utils + +import com.azure.spring.messaging.checkpoint.Checkpointer +import it.pagopa.ecommerce.commons.documents.BaseTransactionEvent +import it.pagopa.ecommerce.commons.queues.QueueEvent +import it.pagopa.ecommerce.eventdispatcher.config.QueuesConsumerConfig +import it.pagopa.ecommerce.eventdispatcher.warmup.utils.EventsUtil +import reactor.core.publisher.Mono + +object DummyCheckpointer : Checkpointer { + override fun success(): Mono = Mono.empty() + override fun failure(): Mono = Mono.empty() +} + +object WarmupRequests { + + private val queuesConsumerConfig = QueuesConsumerConfig() + val strictSerializerProviderV2 = queuesConsumerConfig.strictSerializerProviderV2() + + fun getTransactionAuthorizationOutcomeWaitingEvent(): ByteArray { + val event = EventsUtil.getTransactionAuthorizationOutcomeWaitingEventObject() + return traceAndSerializeEvent(event) + } + + fun getTransactionUserReceiptAddErrorEvent(): ByteArray { + val event = EventsUtil.getTransactionUserReceiptAddErrorEventObject() + return traceAndSerializeEvent(event) + } + + fun getTransactionAuthorizationRequestedEvent(): ByteArray { + val event = EventsUtil.getTransactionAuthorizationRequestedEventObject() + return traceAndSerializeEvent(event) + } + + fun getTransactionClosureRequestedEvent(): ByteArray { + val event = EventsUtil.getTransactionClosureRequestedEventObject() + return traceAndSerializeEvent(event) + } + + fun getTransactionClosureErrorEvent(): ByteArray { + val event = EventsUtil.getTransactionClosureErrorEventObject() + return traceAndSerializeEvent(event) + } + + fun getTransactionExpiredEvent(): ByteArray { + val event = EventsUtil.getTransactionExpiredEventObject() + return traceAndSerializeEvent(event) + } + + fun getTransactionUserReceiptRequestedEvent(): ByteArray { + val event = EventsUtil.getTransactionUserReceiptRequestedEventObject() + return traceAndSerializeEvent(event) + } + + fun getTransactionRefundRetriedEvent(): ByteArray { + val event = EventsUtil.getTransactionRefundRetriedEventObject() + return traceAndSerializeEvent(event) + } + + fun getTransactionRefundRequestedEvent(): ByteArray { + val event = EventsUtil.getTransactionRefundRequestedEventObject() + return traceAndSerializeEvent(event) + } + + private fun traceAndSerializeEvent(event: BaseTransactionEvent<*>): ByteArray { + val queueEvent = QueueEvent(event, null) + val objectMapper = strictSerializerProviderV2.objectMapper + var jsonString = objectMapper.writeValueAsString(queueEvent) + + // Replace the "tracingInfo": null with the desired structure + val tracingInfoReplacement = + """ + "tracingInfo": { + "traceparent": "00-5868efa082297543570dafff7d53c70b-56f1d9262e6ee6cf-00", + "tracestate": null, + "baggage": null + } + """.trimIndent() + + // Use regular expression or string replacement to perform the substitution + jsonString = jsonString.replace("\"tracingInfo\":null", tracingInfoReplacement) + return jsonString.toByteArray() + } +} diff --git a/src/test/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/WarmupRequestsTest.kt b/src/test/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/WarmupRequestsTest.kt new file mode 100644 index 000000000..a765160a5 --- /dev/null +++ b/src/test/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/WarmupRequestsTest.kt @@ -0,0 +1,176 @@ +package it.pagopa.ecommerce.eventdispatcher.warmup.utils + +import it.pagopa.ecommerce.commons.documents.v2.TransactionAuthorizationRequestedEvent as TransactionAuthorizationRequestedEventV2 +import it.pagopa.ecommerce.commons.documents.v2.TransactionClosureErrorEvent as TransactionClosureErrorEventV2 +import it.pagopa.ecommerce.commons.documents.v2.TransactionClosureRequestedEvent as TransactionClosureRequestedEventV2 +import it.pagopa.ecommerce.commons.documents.v2.TransactionExpiredEvent as TransactionExpiredEventV2 +import it.pagopa.ecommerce.commons.documents.v2.TransactionRefundRequestedEvent +import it.pagopa.ecommerce.commons.documents.v2.TransactionRefundRetriedEvent as TransactionRefundRetriedEventV2 +import it.pagopa.ecommerce.commons.documents.v2.TransactionUserReceiptAddErrorEvent +import it.pagopa.ecommerce.eventdispatcher.config.QueuesConsumerConfig +import it.pagopa.ecommerce.eventdispatcher.queues.* +import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient +import it.pagopa.ecommerce.eventdispatcher.validation.BeanValidationConfiguration +import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.mockito.kotlin.mock +import org.springframework.context.annotation.Import +import org.springframework.test.context.TestPropertySource +import reactor.test.StepVerifier + +@Import(BeanValidationConfiguration::class) +@TestPropertySource(locations = ["classpath:application.test.properties"]) +class WarmupRequestsTest { + + private val deadLetterTracedQueueAsyncClient: DeadLetterTracedQueueAsyncClient = mock() + private val queuesConsumerConfig = QueuesConsumerConfig() + + val strictSerializerProviderV1 = queuesConsumerConfig.strictSerializerProviderV1() + val strictSerializerProviderV2 = queuesConsumerConfig.strictSerializerProviderV2() + + private val transactionRefundedEventsConsumer = + TransactionsRefundQueueConsumer( + queueConsumerV1 = mock(), + queueConsumerV2 = mock(), + deadLetterTracedQueueAsyncClient = deadLetterTracedQueueAsyncClient, + strictSerializerProviderV1 = strictSerializerProviderV1, + strictSerializerProviderV2 = strictSerializerProviderV2) + + private val transactionRefundedRetryEventsConsumer = + TransactionRefundRetryQueueConsumer( + queueConsumerV1 = mock(), + queueConsumerV2 = mock(), + deadLetterTracedQueueAsyncClient = deadLetterTracedQueueAsyncClient, + strictSerializerProviderV1 = strictSerializerProviderV1, + strictSerializerProviderV2 = strictSerializerProviderV2) + + private val transactionNotificationsRetryQueueConsumer = + TransactionNotificationsRetryQueueConsumer( + queueConsumerV1 = mock(), + queueConsumerV2 = mock(), + deadLetterTracedQueueAsyncClient = deadLetterTracedQueueAsyncClient, + strictSerializerProviderV1 = strictSerializerProviderV1, + strictSerializerProviderV2 = strictSerializerProviderV2) + + private val transactionClosePaymentRetryQueueConsumer = + TransactionClosePaymentRetryQueueConsumer( + queueConsumerV1 = mock(), + queueConsumerV2 = mock(), + deadLetterTracedQueueAsyncClient = deadLetterTracedQueueAsyncClient, + strictSerializerProviderV1 = strictSerializerProviderV1, + strictSerializerProviderV2 = strictSerializerProviderV2) + + private val transactionClosePaymentQueueConsumer = + TransactionClosePaymentQueueConsumer( + queueConsumerV1 = mock(), + queueConsumerV2 = mock(), + deadLetterTracedQueueAsyncClient = deadLetterTracedQueueAsyncClient, + strictSerializerProviderV1 = strictSerializerProviderV1, + strictSerializerProviderV2 = strictSerializerProviderV2) + + private val transactionExpirationQueueConsumer = + TransactionExpirationQueueConsumer( + queueConsumerV1 = mock(), + queueConsumerV2 = mock(), + deadLetterTracedQueueAsyncClient = deadLetterTracedQueueAsyncClient, + strictSerializerProviderV1 = strictSerializerProviderV1, + strictSerializerProviderV2 = strictSerializerProviderV2) + + private val transactionAuthorizationRequestedQueueConsumer = + TransactionAuthorizationRequestedQueueConsumer( + queueConsumerV2 = mock(), + deadLetterTracedQueueAsyncClient = deadLetterTracedQueueAsyncClient, + strictSerializerProviderV2 = strictSerializerProviderV2) + + @Test + fun `test parse TransactionUserReceiptAddErrorEvent`() { + val payload = WarmupRequests.getTransactionUserReceiptAddErrorEvent() + val result = transactionNotificationsRetryQueueConsumer.parseEvent(payload) + + StepVerifier.create(result) + .assertNext { event -> + assertEquals(TransactionUserReceiptAddErrorEvent::class.java, event.first::class.java) + assertEquals("TRANSACTION_ADD_USER_RECEIPT_ERROR_EVENT", event.first.eventCode) + } + .verifyComplete() + } + + @Test + fun `test parse TransactionAuthorizationRequestedEventV2`() { + val payload = WarmupRequests.getTransactionAuthorizationRequestedEvent() + val result = transactionAuthorizationRequestedQueueConsumer.parseEvent(payload) + + StepVerifier.create(result) + .assertNext { event -> + assertEquals(TransactionAuthorizationRequestedEventV2::class.java, event.event::class.java) + assertEquals("TRANSACTION_AUTHORIZATION_REQUESTED_EVENT", event.event.eventCode) + } + .verifyComplete() + } + + @Test + fun `test parse TransactionClosureRequestedEventV2`() { + val payload = WarmupRequests.getTransactionClosureRequestedEvent() + val result = transactionClosePaymentQueueConsumer.parseEvent(payload) + + StepVerifier.create(result) + .assertNext { event -> + assertEquals(TransactionClosureRequestedEventV2::class.java, event.first::class.java) + assertEquals("TRANSACTION_CLOSURE_REQUESTED_EVENT", event.first.eventCode) + } + .verifyComplete() + } + + @Test + fun `test parse TransactionClosureErrorEventV2`() { + val payload = WarmupRequests.getTransactionClosureErrorEvent() + val result = transactionClosePaymentRetryQueueConsumer.parseEvent(payload) + + StepVerifier.create(result) + .assertNext { event -> + assertEquals(TransactionClosureErrorEventV2::class.java, event.first::class.java) + assertEquals("TRANSACTION_CLOSURE_ERROR_EVENT", event.first.eventCode) + } + .verifyComplete() + } + + @Test + fun `test parse TransactionExpiredEventV2`() { + val payload = WarmupRequests.getTransactionExpiredEvent() + val result = transactionExpirationQueueConsumer.parseEvent(payload) + + StepVerifier.create(result) + .assertNext { event -> + assertEquals(TransactionExpiredEventV2::class.java, event.first::class.java) + assertEquals("TRANSACTION_EXPIRED_EVENT", event.first.eventCode) + } + .verifyComplete() + } + + @Test + fun `test parse TransactionRefundRequestedEvent`() { + val payload = WarmupRequests.getTransactionRefundRequestedEvent() + val result = transactionRefundedEventsConsumer.parseEvent(payload) + + StepVerifier.create(result) + .assertNext { event -> + assertEquals(TransactionRefundRequestedEvent::class.java, event.first::class.java) + assertEquals("TRANSACTION_REFUND_REQUESTED_EVENT", event.first.eventCode) + } + .verifyComplete() + } + + @Test + fun `test parse TransactionRefundRetriedEventV2`() { + val payload = WarmupRequests.getTransactionRefundRetriedEvent() + val result = transactionRefundedRetryEventsConsumer.parseEvent(payload) + + StepVerifier.create(result) + .assertNext { event -> + assertEquals(TransactionRefundRetriedEventV2::class.java, event.first::class.java) + assertEquals("TRANSACTION_REFUND_RETRIED_EVENT", event.first.eventCode) + } + .verifyComplete() + } +} diff --git a/src/test/resources/application.test.properties b/src/test/resources/application.test.properties index a4ae82126..855df6dbc 100644 --- a/src/test/resources/application.test.properties +++ b/src/test/resources/application.test.properties @@ -21,7 +21,7 @@ spring.redis.host=redis spring.redis.password=redis spring.redis.port=9999 spring.redis.ssl=false -spring.data.mongodb.uri=mongodb://mongo:mongo@$mongo:6080/ +spring.data.mongodb.uri=mongodb://mongo:mongo@$mongo:6080/?&readPreference=primaryPreferred&maxStalenessSeconds=90&minPoolSize=0&maxPoolSize=10&maxIdleTimeMS=100&connectTimeoutMS=100&socketTimeoutMS=100&serverSelectionTimeoutMS=100&waitQueueTimeoutMS=100&heartbeatFrequencyMS=100 spring.data.mongodb.database=ecommerce server.port=9000 management.endpoint.health.probes.enabled=true