Skip to content

Commit

Permalink
feat: [CHK-3347] Added logic to warmup lazy beans (#365)
Browse files Browse the repository at this point in the history
* added logic to warmup lazy beans

* spotless apply

* done

* fixed

* removed unused refs

* spotless apply

* chore: add warmup logging

* updated

* added more warmup stubs

* reused dummy checkpointer class

* all jsons ready

* warmup functions ready

* spotless apply

* junit test need more work

* updated unit tests

* updated unit tests

* spotless

* Update WarmupRequests.kt

* Update WarmupRequestsTest.kt

* Update ServicesWarmup.kt

* Delete src/test/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/ServicesWarmupTest.kt

* fix

* Update StreamConfig.kt

* refactored used object

* Delete src/test/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/ServicesRequestsTest.kt

* Update WarmupRequests.kt

* Create EventUtils

* Update WarmupRequests.kt

* Create EventsUtil

* Delete src/test/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/utils/EventUtils

* fix

* Update EventsUtil.java

* fix

* Update src/main/kotlin/it/pagopa/ecommerce/eventdispatcher/warmup/ServicesWarmup.kt

Co-authored-by: Pietro Tota <[email protected]>

* fix

* fix: add tests mongo connection string parameter for connection timeout

* fix: add programmatic redis stream message receiver startup

---------

Co-authored-by: Pietro Tota <[email protected]>
Co-authored-by: Pietro Tota <[email protected]>
  • Loading branch information
3 people authored Jan 17, 2025
1 parent 9654a5a commit c4d6d68
Show file tree
Hide file tree
Showing 19 changed files with 687 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class QueuesConsumerConfig {
@Bean
@InboundChannelAdapter(
channel = "transactionclosureschannel",
autoStartup = "false",
poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")])
@EndpointId("storageQueueClosuresMessageSourceEndpoint")
fun storageQueueClosuresMessageSource(
Expand All @@ -33,6 +34,7 @@ class QueuesConsumerConfig {
@Bean
@InboundChannelAdapter(
channel = "transactionretryclosureschannel",
autoStartup = "false",
poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")])
@EndpointId("storageQueueRetryClosuresMessageSourceEndpoint")
fun storageQueueRetryClosuresMessageSource(
Expand All @@ -46,6 +48,7 @@ class QueuesConsumerConfig {
@Bean
@InboundChannelAdapter(
channel = "transactionexpiredchannel",
autoStartup = "false",
poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")])
@EndpointId("storageQueueExpirationsMessageSourceEndpoint")
fun storageQueueExpirationsMessageSource(
Expand All @@ -58,6 +61,7 @@ class QueuesConsumerConfig {
@Bean
@InboundChannelAdapter(
channel = "transactionrefundretrychannel",
autoStartup = "false",
poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")])
@EndpointId("storageQueueRefundRetryMessageSourceEndpoint")
fun storageQueueRefundRetryMessageSource(
Expand All @@ -70,6 +74,7 @@ class QueuesConsumerConfig {
@Bean
@InboundChannelAdapter(
channel = "transactionsrefundchannel",
autoStartup = "false",
poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")])
@EndpointId("storageQueueRefundMessageSourceEndpoint")
fun storageQueueRefundMessageSource(
Expand All @@ -82,6 +87,7 @@ class QueuesConsumerConfig {
@Bean
@InboundChannelAdapter(
channel = "transactionretrynotificationschannel",
autoStartup = "false",
poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")])
@EndpointId("storageQueueRetryNotificationsMessageSourceEndpoint")
fun storageQueueRetryNotificationsMessageSource(
Expand All @@ -96,6 +102,7 @@ class QueuesConsumerConfig {
@Bean
@InboundChannelAdapter(
channel = "transactionnotificationschannel",
autoStartup = "false",
poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")])
@EndpointId("storageQueueNotificationsMessageSourceEndpoint")
fun storageQueueNotificationsMessageSource(
Expand All @@ -108,6 +115,7 @@ class QueuesConsumerConfig {
@Bean
@InboundChannelAdapter(
channel = "transactionsauthorizationrequestedchannel",
autoStartup = "false",
poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")])
@EndpointId("storageQueueAuthorizationRequestedMessageSourceEndpoint")
fun storageQueueAuthorizationRequestedMessageSource(
Expand All @@ -121,6 +129,7 @@ class QueuesConsumerConfig {
@Bean
@InboundChannelAdapter(
channel = "transactionsauthorizationoutcomewaitingchannel",
autoStartup = "false",
poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "10")])
@EndpointId("storageQueueAuthorizationOutcomeWaitingMessageSourceEndpoint")
fun storageQueueAuthorizationOutcomeWaitingMessageSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.springframework.data.redis.hash.Jackson2HashMapper
import org.springframework.data.redis.serializer.RedisSerializationContext
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.springframework.data.redis.stream.StreamReceiver
import org.springframework.integration.annotation.EndpointId
import org.springframework.integration.annotation.InboundChannelAdapter
import org.springframework.integration.annotation.Poller

Expand All @@ -34,7 +35,7 @@ class StreamConfig {
EventDispatcherGenericCommand::class.java, EventDispatcherCommandMixin::class.java)
val streamReceiverOptions =
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.pollTimeout(Duration.ofMillis(1000))
.batchSize(1) // read one item per poll
.keySerializer(
RedisSerializationContext.SerializationPair.fromSerializer(StringRedisSerializer()))
Expand All @@ -51,7 +52,9 @@ class StreamConfig {
@Bean
@InboundChannelAdapter(
channel = "eventDispatcherReceiverCommandChannel",
poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")])
poller = [Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")],
autoStartup = "false")
@EndpointId("eventDispatcherReceiverCommandChannelEndpoint")
fun eventDispatcherReceiverCommandMessageSource(
redisStreamReceiver: StreamReceiver<String, ObjectRecord<String, LinkedHashMap<*, *>>>,
eventDispatcherCommandsTemplateWrapper: EventDispatcherCommandsTemplateWrapper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import it.pagopa.ecommerce.commons.queues.QueueEvent
import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider
import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException
import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient
import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction
import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer
import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionAuthorizationOutcomeWaitingEvent
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
Expand Down Expand Up @@ -85,4 +88,9 @@ class TransactionAuthorizationOutcomeWaitingQueueConsumer(
DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT)
}
}

@WarmupFunction
fun warmupService() {
messageReceiver(getTransactionAuthorizationOutcomeWaitingEvent(), DummyCheckpointer).block()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import it.pagopa.ecommerce.commons.queues.QueueEvent
import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider
import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException
import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient
import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction
import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer
import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionAuthorizationRequestedEvent
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
Expand Down Expand Up @@ -84,4 +87,9 @@ class TransactionAuthorizationRequestedQueueConsumer(
DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT)
}
}

@WarmupFunction
fun warmupService() {
messageReceiver(getTransactionAuthorizationRequestedEvent(), DummyCheckpointer).block()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider
import it.pagopa.ecommerce.commons.queues.TracingInfo
import it.pagopa.ecommerce.eventdispatcher.exceptions.*
import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient
import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction
import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer
import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionClosureRequestedEvent
import java.util.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -126,4 +129,9 @@ class TransactionClosePaymentQueueConsumer(
DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT)
}
}

@WarmupFunction
fun warmupService() {
messageReceiver(getTransactionClosureRequestedEvent(), DummyCheckpointer).block()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider
import it.pagopa.ecommerce.commons.queues.TracingInfo
import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException
import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient
import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction
import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer
import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionClosureErrorEvent
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
Expand Down Expand Up @@ -163,4 +166,10 @@ class TransactionClosePaymentRetryQueueConsumer(
DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT)
}
}

@WarmupFunction
fun warmupService() {
messageReceiver(getTransactionClosureErrorEvent(), DummyCheckpointer, EmptyTransaction())
.block()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException
import it.pagopa.ecommerce.eventdispatcher.queues.v1.TransactionExpirationQueueConsumer as TransactionExpirationQueueConsumerV1
import it.pagopa.ecommerce.eventdispatcher.queues.v2.TransactionExpirationQueueConsumer as TransactionExpirationQueueConsumerV2
import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient
import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction
import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer
import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionExpiredEvent
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
Expand Down Expand Up @@ -171,4 +174,10 @@ class TransactionExpirationQueueConsumer(
DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT)
}
}

@WarmupFunction
fun warmupService() {
messageReceiver(getTransactionExpiredEvent(), DummyCheckpointer, MessageHeaders(emptyMap()))
.block()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider
import it.pagopa.ecommerce.commons.queues.TracingInfo
import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException
import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient
import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction
import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer
import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionUserReceiptRequestedEvent
import it.pagopa.generated.notifications.templates.success.*
import java.util.*
import org.slf4j.Logger
Expand Down Expand Up @@ -115,4 +118,9 @@ class TransactionNotificationsQueueConsumer(
DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT)
}
}

@WarmupFunction
fun warmupService() {
messageReceiver(getTransactionUserReceiptRequestedEvent(), DummyCheckpointer).block()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import it.pagopa.ecommerce.commons.queues.QueueEvent
import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider
import it.pagopa.ecommerce.commons.queues.TracingInfo
import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException
import it.pagopa.ecommerce.eventdispatcher.queues.*
import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient
import it.pagopa.generated.notifications.templates.success.*
import java.util.*
import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction
import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer
import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionUserReceiptAddErrorEvent
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
Expand Down Expand Up @@ -164,4 +164,9 @@ class TransactionNotificationsRetryQueueConsumer(
DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT)
}
}

@WarmupFunction
fun warmupService() {
messageReceiver(getTransactionUserReceiptAddErrorEvent(), DummyCheckpointer).block()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider
import it.pagopa.ecommerce.commons.queues.TracingInfo
import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException
import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient
import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction
import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer
import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionRefundRetriedEvent
import java.util.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -119,4 +122,9 @@ class TransactionRefundRetryQueueConsumer(
DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT)
}
}

@WarmupFunction
fun warmupService() {
messageReceiver(getTransactionRefundRetriedEvent(), DummyCheckpointer).block()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider
import it.pagopa.ecommerce.commons.queues.TracingInfo
import it.pagopa.ecommerce.eventdispatcher.exceptions.InvalidEventException
import it.pagopa.ecommerce.eventdispatcher.utils.DeadLetterTracedQueueAsyncClient
import it.pagopa.ecommerce.eventdispatcher.warmup.annotations.WarmupFunction
import it.pagopa.ecommerce.payment.requests.warmup.utils.DummyCheckpointer
import it.pagopa.ecommerce.payment.requests.warmup.utils.WarmupRequests.getTransactionRefundRequestedEvent
import java.util.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -167,4 +170,9 @@ class TransactionsRefundQueueConsumer(
DeadLetterTracedQueueAsyncClient.PARSING_EVENT_ERROR_CONTEXT)
}
}

@WarmupFunction
fun warmupService() {
messageReceiver(getTransactionRefundRequestedEvent(), DummyCheckpointer).block()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,7 @@ fun <T> runTracedPipelineWithDeadLetterQueue(
spanName: String,
jsonSerializerProviderV2: StrictJsonSerializerProvider
): Mono<Unit> {
val nullTransactionId = "00000000000000000000000000000000" // null event ID used in warmup phase
val eventLogString = "${queueEvent.event.id}, transactionId: ${queueEvent.event.transactionId}"

val deadLetterPipeline =
Expand All @@ -995,14 +996,19 @@ fun <T> runTracedPipelineWithDeadLetterQueue(
else -> DeadLetterTracedQueueAsyncClient.ErrorCategory.PROCESSING_ERROR
}
logger.error("Exception processing event $eventLogString", pipelineException)
deadLetterTracedQueueAsyncClient.sendAndTraceDeadLetterQueueEvent(
binaryData = BinaryData.fromObject(queueEvent, jsonSerializerProviderV2.createInstance()),
errorContext =
DeadLetterTracedQueueAsyncClient.ErrorContext(
transactionId = TransactionId(queueEvent.event.transactionId),
transactionEventCode = queueEvent.event.eventCode,
errorCategory = errorCategory),
)
if (queueEvent.event.transactionId != nullTransactionId) {
deadLetterTracedQueueAsyncClient.sendAndTraceDeadLetterQueueEvent(
binaryData =
BinaryData.fromObject(queueEvent, jsonSerializerProviderV2.createInstance()),
errorContext =
DeadLetterTracedQueueAsyncClient.ErrorContext(
transactionId = TransactionId(queueEvent.event.transactionId),
transactionEventCode = queueEvent.event.eventCode,
errorCategory = errorCategory))
} else {
logger.info("Skipping dead letter queue for warmup event with null transaction ID")
Mono.just(Unit)
}
}

return tracingUtils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,11 @@ class InboundChannelAdapterLifecycleHandlerService(
.getBeansWithAnnotation(InboundChannelAdapter::class.java)
.filterNot { it.value is RedisStreamMessageSource }
.keys

fun invokeCommandForRedisStreamMessageSource(command: String) {
val controllerBusMessage =
MessageBuilder.createMessage(
"@eventDispatcherReceiverCommandChannelEndpoint.$command()", MessageHeaders(mapOf()))
controlBusInput.send(controllerBusMessage)
}
}
Loading

0 comments on commit c4d6d68

Please sign in to comment.