From 43dcf8f15e417d2595fa0507da73363361632550 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Wed, 22 May 2024 15:30:03 -0400 Subject: [PATCH 01/16] Initial commit on DASB-421 and DASB-428 --- .../ocio/processingstatusapi/Application.kt | 1 + .../ReportManagerConfig.kt | 1 + .../cosmos/CosmosContainerManager.kt | 6 +- .../cosmos/CosmosRepository.kt | 2 + .../processingstatusapi/plugins/ServiceBus.kt | 101 ++++++++++++++++-- .../plugins/ServiceBusProcessor.kt | 93 ++++++++++------ .../src/main/ps1/SendMessageToQueue.ps1 | 23 ++++ .../src/main/resources/application.conf | 2 + 8 files changed, 186 insertions(+), 43 deletions(-) create mode 100644 pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt index 489b3c56..1c69b8c1 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt @@ -16,6 +16,7 @@ fun KoinApplication.loadKoinModules(environment: ApplicationEnvironment): KoinAp val uri = environment.config.property("azure.cosmos_db.client.endpoint").getString() val authKey = environment.config.property("azure.cosmos_db.client.key").getString() single { CosmosRepository(uri, authKey, "Reports", "/uploadId") } + single { CosmosRepository(uri, authKey, "Reports-DeadLetter", "/uploadId") } } return modules(listOf(cosmosModule)) } diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManagerConfig.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManagerConfig.kt index c899bf2b..56abb9d4 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManagerConfig.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManagerConfig.kt @@ -8,5 +8,6 @@ package gov.cdc.ocio.processingstatusapi */ class ReportManagerConfig { val reportsContainerName = "Reports" + val reportsDeadLetterContainerName = "Reports-DeadLetter" private val partitionKey = "/uploadId" } \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt index 0e214e44..8043a60d 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt @@ -17,8 +17,10 @@ class CosmosContainerManager { val logger = KotlinLogging.logger {} logger.info("Create database $databaseName if not exists...") // Create database if not exists - val databaseResponse = cosmosClient.createDatabaseIfNotExists(databaseName) - return cosmosClient.getDatabase(databaseResponse.properties.id) + // val databaseResponse = cosmosClient.createDatabaseIfNotExists(databaseName) + // return cosmosClient.getDatabase(databaseResponse.properties.id) + + return cosmosClient.getDatabase(databaseName) } fun initDatabaseContainer(uri: String, authKey: String, containerName: String, partitionKey: String): CosmosContainer? { diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt index 28953ba8..80c21451 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt @@ -6,4 +6,6 @@ class CosmosRepository(uri: String, authKey: String, reportsContainerName: Strin val reportsContainer = CosmosContainerManager.initDatabaseContainer(uri, authKey, reportsContainerName, partitionKey)!! + + } \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt index fe716d63..21e9cb06 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt @@ -1,11 +1,18 @@ package gov.cdc.ocio.processingstatusapi.plugins +import com.azure.core.amqp.exception.AmqpException +import com.azure.core.exception.AzureException import com.azure.messaging.servicebus.* +import com.azure.messaging.servicebus.models.DeadLetterOptions +import gov.cdc.ocio.processingstatusapi.cosmos.CosmosContainerManager +import gov.cdc.ocio.processingstatusapi.cosmos.CosmosRepository import gov.cdc.ocio.processingstatusapi.exceptions.BadRequestException import io.ktor.server.application.* import io.ktor.server.application.hooks.* import io.ktor.server.config.* import io.ktor.util.logging.* +import io.netty.channel.ConnectTimeoutException +import org.apache.qpid.proton.engine.TransportException import java.util.concurrent.TimeUnit internal val LOGGER = KtorSimpleLogger("pstatus-report-sink") @@ -13,6 +20,8 @@ internal val LOGGER = KtorSimpleLogger("pstatus-report-sink") class AzureServiceBusConfiguration(config: ApplicationConfig) { var connectionString: String = config.tryGetString("connection_string") ?: "" var queueName: String = config.tryGetString("queue_name") ?: "" + var topicName: String = config.tryGetString("topic_name") ?: "" + var subscriptionName: String = config.tryGetString("subscription_name") ?: "" } val AzureServiceBus = createApplicationPlugin( @@ -22,8 +31,11 @@ val AzureServiceBus = createApplicationPlugin( val connectionString = pluginConfig.connectionString val queueName = pluginConfig.queueName + val topicName = pluginConfig.topicName + val subscriptionName = pluginConfig.subscriptionName - val processorClient by lazy { +// Initialize Service Bus client for queue + val processorQueueClient by lazy { ServiceBusClientBuilder() .connectionString(connectionString) .processor() @@ -33,31 +45,89 @@ val AzureServiceBus = createApplicationPlugin( .buildProcessorClient() } + // Initialize Service Bus client for topic + val processorTopicClient by lazy { + ServiceBusClientBuilder() + .connectionString(connectionString) + .processor() + .topicName(topicName) + .subscriptionName(subscriptionName) + .processMessage{ context -> processMessage(context) } + .processError { context -> processError(context) } + .buildProcessorClient() + } + // handles received messages @Throws(InterruptedException::class) fun receiveMessages() { - // Create an instance of the processor through the ServiceBusClientBuilder - println("Starting the Azure service bus processor") - println("connectionString = $connectionString, queueName = $queueName") - processorClient.start() + try { + // Create an instance of the processor through the ServiceBusClientBuilder + println("Starting the Azure service bus processor") + println("connectionString = $connectionString, queueName = $queueName, topicName= $topicName, subscriptionName=$subscriptionName") + processorQueueClient.start() + processorTopicClient.start() + } + + catch (e:AmqpException){ + println("Non-ServiceBusException occurred : ${e.message}") + } + catch (e:TransportException){ + println("Non-ServiceBusException occurred : ${e.message}") + } + + catch (e:Exception){ + println("Non-ServiceBusException occurred : ${e.message}") + } + + } + + fun sendMessage() { + val senderClient = ServiceBusClientBuilder() + .connectionString(connectionString) + .sender() + .queueName(queueName) + .buildClient() + try { + val message = ServiceBusMessage("Hello, Service Bus!") + senderClient.sendMessage(message) + println("Message sent to the queue.") + + } + catch (e:AmqpException){ + println("Non-ServiceBusException occurred : ${e.message}") + } + catch (e:TransportException){ + println("Non-ServiceBusException occurred : ${e.message}") + } + + catch (e:Exception){ + println("Non-ServiceBusException occurred : ${e.message}") + } + finally { + senderClient.close() + } } on(MonitoringEvent(ApplicationStarted)) { application -> application.log.info("Server is started") - receiveMessages() + receiveMessages() + // sendMessage() } on(MonitoringEvent(ApplicationStopped)) { application -> application.log.info("Server is stopped") println("Stopping and closing the processor") - processorClient.close() + processorQueueClient.close() + processorTopicClient.close() // Release resources and unsubscribe from events application.environment.monitor.unsubscribe(ApplicationStarted) {} application.environment.monitor.unsubscribe(ApplicationStopped) {} } } + private fun processMessage(context: ServiceBusReceivedMessageContext) { val message = context.message + LOGGER.trace( "Processing message. Session: {}, Sequence #: {}. Contents: {}", message.messageId, @@ -68,11 +138,24 @@ private fun processMessage(context: ServiceBusReceivedMessageContext) { ServiceBusProcessor().withMessage(message.body.toString()) } catch (e: BadRequestException) { LOGGER.warn("Unable to parse the message: {}", e.localizedMessage) - } catch (e: Exception) { + } + catch (e: IllegalArgumentException) { // TODO : Is this the only exception at this time or more generic one??? + LOGGER.warn("Message rejected: {}", e.localizedMessage) + //Writing to deadletter + // TODO : Will this do it for queue and topic based on the context. + // TODO : Should this be "ValidationError" or something generic + context.deadLetter(DeadLetterOptions().setDeadLetterReason("ValidationError").setDeadLetterErrorDescription(e.message)) + + LOGGER.info("Message sent to the dead-letter queue.") + } + catch (e: Exception) { LOGGER.warn("Failed to process service bus message: {}", e.localizedMessage) } + } + + private fun processError(context: ServiceBusErrorContext) { System.out.printf( "Error when receiving messages from namespace: '%s'. Entity: '%s'%n", @@ -106,6 +189,8 @@ private fun processError(context: ServiceBusErrorContext) { } } + + fun Application.serviceBusModule() { install(AzureServiceBus) { // any additional configuration goes here diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt index a4c294d4..7c4b9974 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt @@ -1,5 +1,8 @@ package gov.cdc.ocio.processingstatusapi.plugins +import com.azure.messaging.servicebus.ServiceBusClientBuilder +import com.azure.messaging.servicebus.ServiceBusReceivedMessage +import com.azure.messaging.servicebus.models.DeadLetterOptions import com.google.gson.GsonBuilder import com.google.gson.JsonSyntaxException import com.google.gson.ToNumberPolicy @@ -45,7 +48,12 @@ class ServiceBusProcessor { } logger.info { "After Message received = $sbMessage" } createReport(gson.fromJson(sbMessage, CreateReportSBMessage::class.java)) - } catch (e: JsonSyntaxException) { + } + catch (e:IllegalArgumentException){ + println("Validation failed: ${e.message}") + throw e + } + catch (e: JsonSyntaxException) { logger.error("Failed to parse CreateReportSBMessage: ${e.localizedMessage}") throw BadStateException("Unable to interpret the create report message") } @@ -59,44 +67,63 @@ class ServiceBusProcessor { */ @Throws(BadRequestException::class) private fun createReport(createReportMessage: CreateReportSBMessage) { + try { + validateReport(createReportMessage) + val uploadId = createReportMessage.uploadId + val stageName = createReportMessage.stageName + logger.info("Creating report for uploadId = ${uploadId} with stageName = $stageName") + ReportManager().createReportWithUploadId( + createReportMessage.uploadId!!, + createReportMessage.dataStreamId!!, + createReportMessage.dataStreamRoute!!, + createReportMessage.stageName!!, + createReportMessage.contentType!!, + createReportMessage.messageId!!, + createReportMessage.status, + createReportMessage.contentAsString!!, // it was Content I changed to ContentAsString + createReportMessage.dispositionType, + Source.SERVICEBUS + ) + } + catch (e:IllegalArgumentException){ + throw e + } + catch (e: Exception) { + println("Failed to process service bus message:${e.message}") - val uploadId = createReportMessage.uploadId - ?: throw BadRequestException("Missing required field upload_id") - - val dataStreamId = createReportMessage.dataStreamId - ?: throw BadRequestException("Missing required field data_stream_id") - - val dataStreamRoute = createReportMessage.dataStreamRoute - ?: throw BadRequestException("Missing required field data_stream_route") + } - val stageName = createReportMessage.stageName - ?: throw BadRequestException("Missing required field stage_name") + } - val contentType = createReportMessage.contentType - ?: throw BadRequestException("Missing required field content_type") + // Function to validate report + private fun validateReport(createReportMessage: CreateReportSBMessage) { + val missingFields = mutableListOf() - val content: String - try { - content = createReportMessage.contentAsString - ?: throw BadRequestException("Missing required field content") - } catch (ex: BadStateException) { - // assume a bad request - throw BadRequestException(ex.localizedMessage) + if (createReportMessage.uploadId.isNullOrBlank()) { + missingFields.add("uploadId") + } + if (createReportMessage.dataStreamId.isNullOrBlank()) { + missingFields.add("dataStreamId") + } + if (createReportMessage.dataStreamRoute.isNullOrBlank()) { + missingFields.add("dataStreamRoute") + } + if (createReportMessage.stageName.isNullOrBlank()) { + missingFields.add("stageName") + } + if (createReportMessage.contentType.isNullOrBlank()) { + missingFields.add("contentType") + } + if (createReportMessage.contentAsString.isNullOrBlank()) { + missingFields.add("content") } - logger.info("Creating report for uploadId = $uploadId with stageName = $stageName") - ReportManager().createReportWithUploadId( - uploadId, - dataStreamId, - dataStreamRoute, - stageName, - contentType, - createReportMessage.messageId, - createReportMessage.status, - content, - createReportMessage.dispositionType, - Source.SERVICEBUS - ) + if (missingFields.isNotEmpty()) { + val reason ="Missing fields: ${missingFields.joinToString(", ")}" + throw IllegalArgumentException(reason) + } } + + } \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 b/pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 new file mode 100644 index 00000000..89a7868b --- /dev/null +++ b/pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 @@ -0,0 +1,23 @@ +# Set the parameters for the Service Bus +$connectionString = "Endpoint=sb://ocio-ede-dev-processingstatus-testing.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=EkH3H/i66rBE+xTmTdIqMFmlxzYmsteP7+ASbJMhc5w=" +$queueName = "reports-notifications-queue" + +# Import the necessary assemblies +Add-Type -AssemblyName "System.ServiceModel" +Add-Type -Path (Join-Path -Path (Split-Path $env:PSModulePath -Parent) -ChildPath 'Azure.ServiceBus\2.1.1\lib\netstandard2.0\Microsoft.Azure.ServiceBus.dll') + +# Create a queue client +$queueClient = [Microsoft.Azure.ServiceBus.QueueClient]::new($connectionString, $queueName) + +# Create a message +$messageBody = "Hello, Azure Service Bus!" +$message = [Microsoft.Azure.ServiceBus.Message]::new([System.Text.Encoding]::UTF8.GetBytes($messageBody)) + +# Send the message to the queue +$queueClient.SendAsync($message).GetAwaiter().GetResult() + +# Print confirmation +Write-Output "Sent message: $messageBody" + +# Close the client +$queueClient.CloseAsync().GetAwaiter().GetResult() diff --git a/pstatus-report-sink-ktor/src/main/resources/application.conf b/pstatus-report-sink-ktor/src/main/resources/application.conf index 1a143ca4..9f1db1db 100644 --- a/pstatus-report-sink-ktor/src/main/resources/application.conf +++ b/pstatus-report-sink-ktor/src/main/resources/application.conf @@ -13,6 +13,8 @@ azure { service_bus { connection_string = ${SERVICE_BUS_CONNECTION_STRING} queue_name = ${SERVICE_BUS_REPORT_QUEUE_NAME} + topic_name = ${SERVICE_BUS_REPORT_TOPIC_NAME} + subscription_name = ${SERVICE_BUS_REPORT_TOPIC_SUBSCRIPTION_NAME} } cosmos_db { client { From 2f65c39358193093479c756ec18a1df93fa87567 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Wed, 22 May 2024 15:56:08 -0400 Subject: [PATCH 02/16] Initial commit on DASB-421 and DASB-428 --- .../processingstatusapi/plugins/ServiceBus.kt | 25 +++++++++++++------ .../src/main/resources/application.conf | 1 + 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt index 21e9cb06..2460c108 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt @@ -1,5 +1,6 @@ package gov.cdc.ocio.processingstatusapi.plugins +import com.azure.core.amqp.AmqpTransportType import com.azure.core.amqp.exception.AmqpException import com.azure.core.exception.AzureException import com.azure.messaging.servicebus.* @@ -19,6 +20,7 @@ internal val LOGGER = KtorSimpleLogger("pstatus-report-sink") class AzureServiceBusConfiguration(config: ApplicationConfig) { var connectionString: String = config.tryGetString("connection_string") ?: "" + var serviceBusNamespace: String = config.tryGetString("azure_servicebus_namespace") ?: "" var queueName: String = config.tryGetString("queue_name") ?: "" var topicName: String = config.tryGetString("topic_name") ?: "" var subscriptionName: String = config.tryGetString("subscription_name") ?: "" @@ -30,6 +32,7 @@ val AzureServiceBus = createApplicationPlugin( createConfiguration = ::AzureServiceBusConfiguration) { val connectionString = pluginConfig.connectionString + var serviceBusNamespace= pluginConfig.serviceBusNamespace val queueName = pluginConfig.queueName val topicName = pluginConfig.topicName val subscriptionName = pluginConfig.subscriptionName @@ -38,6 +41,8 @@ val AzureServiceBus = createApplicationPlugin( val processorQueueClient by lazy { ServiceBusClientBuilder() .connectionString(connectionString) + .fullyQualifiedNamespace(serviceBusNamespace) + .transportType(AmqpTransportType.AMQP_WEB_SOCKETS) .processor() .queueName(queueName) .processMessage{ context -> processMessage(context) } @@ -49,6 +54,8 @@ val AzureServiceBus = createApplicationPlugin( val processorTopicClient by lazy { ServiceBusClientBuilder() .connectionString(connectionString) + .fullyQualifiedNamespace(serviceBusNamespace) + .transportType(AmqpTransportType.AMQP_WEB_SOCKETS) .processor() .topicName(topicName) .subscriptionName(subscriptionName) @@ -82,11 +89,15 @@ val AzureServiceBus = createApplicationPlugin( } fun sendMessage() { - val senderClient = ServiceBusClientBuilder() - .connectionString(connectionString) - .sender() - .queueName(queueName) - .buildClient() + val senderClient = ServiceBusClientBuilder() + .connectionString(connectionString) + .fullyQualifiedNamespace(serviceBusNamespace) + .transportType(AmqpTransportType.AMQP_WEB_SOCKETS) + .sender() + .queueName(queueName) + .buildClient() + // val serviceBusMessages = reports.map {ServiceBusMessage(it)} + // senderClient.sendMessages(serviceBusMessages) try { val message = ServiceBusMessage("Hello, Service Bus!") senderClient.sendMessage(message) @@ -110,8 +121,8 @@ val AzureServiceBus = createApplicationPlugin( on(MonitoringEvent(ApplicationStarted)) { application -> application.log.info("Server is started") - receiveMessages() - // sendMessage() + receiveMessages() + // sendMessage() //****This is not working as well**** } on(MonitoringEvent(ApplicationStopped)) { application -> application.log.info("Server is stopped") diff --git a/pstatus-report-sink-ktor/src/main/resources/application.conf b/pstatus-report-sink-ktor/src/main/resources/application.conf index 9f1db1db..acfac84d 100644 --- a/pstatus-report-sink-ktor/src/main/resources/application.conf +++ b/pstatus-report-sink-ktor/src/main/resources/application.conf @@ -12,6 +12,7 @@ ktor { azure { service_bus { connection_string = ${SERVICE_BUS_CONNECTION_STRING} + azure_servicebus_namespace=${AZURE_SERVICE_BUS_NAMESPACE} queue_name = ${SERVICE_BUS_REPORT_QUEUE_NAME} topic_name = ${SERVICE_BUS_REPORT_TOPIC_NAME} subscription_name = ${SERVICE_BUS_REPORT_TOPIC_SUBSCRIPTION_NAME} From d5238e1e241a5144c7b9c93b489d7306173b169f Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Thu, 23 May 2024 10:43:30 -0400 Subject: [PATCH 03/16] Initial commit on DASB-421 and DASB-428 --- .../gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt index 2460c108..6757c462 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt @@ -122,7 +122,7 @@ val AzureServiceBus = createApplicationPlugin( on(MonitoringEvent(ApplicationStarted)) { application -> application.log.info("Server is started") receiveMessages() - // sendMessage() //****This is not working as well**** + sendMessage() //****This is not working as well**** } on(MonitoringEvent(ApplicationStopped)) { application -> application.log.info("Server is stopped") From 63c68cbe334d441c70279f4a11cab369e3730779 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Wed, 29 May 2024 10:26:32 -0400 Subject: [PATCH 04/16] Initial commit on DASB-421 and DASB-428 --- .../ocio/processingstatusapi/Application.kt | 4 +- .../ocio/processingstatusapi/ReportManager.kt | 84 +++++++++++++++++++ .../cosmos/CosmosClientManager.kt | 1 + .../cosmos/CosmosContainerManager.kt | 5 +- .../cosmos/CosmosRepository.kt | 10 ++- .../models/DeadLetterReport.kt | 67 +++++++++++++++ .../processingstatusapi/plugins/ServiceBus.kt | 53 ++---------- .../plugins/ServiceBusProcessor.kt | 30 +++++-- 8 files changed, 195 insertions(+), 59 deletions(-) create mode 100644 pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt index 1c69b8c1..39e5aff0 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt @@ -1,5 +1,6 @@ package gov.cdc.ocio.processingstatusapi +import gov.cdc.ocio.processingstatusapi.cosmos.CosmosDeadLetterRepository import gov.cdc.ocio.processingstatusapi.cosmos.CosmosRepository import gov.cdc.ocio.processingstatusapi.plugins.configureRouting import gov.cdc.ocio.processingstatusapi.plugins.serviceBusModule @@ -16,8 +17,9 @@ fun KoinApplication.loadKoinModules(environment: ApplicationEnvironment): KoinAp val uri = environment.config.property("azure.cosmos_db.client.endpoint").getString() val authKey = environment.config.property("azure.cosmos_db.client.key").getString() single { CosmosRepository(uri, authKey, "Reports", "/uploadId") } - single { CosmosRepository(uri, authKey, "Reports-DeadLetter", "/uploadId") } + single { CosmosDeadLetterRepository(uri, authKey, "Reports-DeadLetter", "/uploadId") } } + return modules(listOf(cosmosModule)) } diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt index a829f748..e2b94def 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt @@ -3,15 +3,18 @@ package gov.cdc.ocio.processingstatusapi import com.azure.cosmos.models.CosmosItemRequestOptions import com.azure.cosmos.models.CosmosQueryRequestOptions import com.azure.cosmos.models.PartitionKey +import com.azure.messaging.servicebus.models.DeadLetterOptions import com.google.gson.GsonBuilder import com.google.gson.ToNumberPolicy import com.google.gson.reflect.TypeToken +import gov.cdc.ocio.processingstatusapi.cosmos.CosmosDeadLetterRepository import gov.cdc.ocio.processingstatusapi.cosmos.CosmosRepository import gov.cdc.ocio.processingstatusapi.exceptions.BadRequestException import gov.cdc.ocio.processingstatusapi.exceptions.BadStateException import gov.cdc.ocio.processingstatusapi.exceptions.InvalidSchemaDefException import gov.cdc.ocio.processingstatusapi.models.DispositionType import gov.cdc.ocio.processingstatusapi.models.Report +import gov.cdc.ocio.processingstatusapi.models.ReportDeadLetter import gov.cdc.ocio.processingstatusapi.models.reports.SchemaDefinition import gov.cdc.ocio.processingstatusapi.models.reports.Source import mu.KotlinLogging @@ -19,6 +22,8 @@ import org.koin.core.component.KoinComponent import org.koin.core.component.inject import java.util.* import io.netty.handler.codec.http.HttpResponseStatus +import java.sql.Timestamp +import java.time.LocalDateTime /** * The report manager interacts directly with CosmosDB to persist and retrieve reports. @@ -28,6 +33,7 @@ import io.netty.handler.codec.http.HttpResponseStatus class ReportManager: KoinComponent { private val cosmosRepository by inject() + private val cosmosDeadLetterRepository by inject() private val logger = KotlinLogging.logger {} @@ -241,6 +247,84 @@ class ReportManager: KoinComponent { throw BadStateException("Failed to create reportId = ${stageReport.reportId}, uploadId = $uploadId") } + @Throws(BadStateException::class) + fun createDeadLetterReport(uploadId: String, + dataStreamId: String, + dataStreamRoute: String, + dispositionType: DispositionType, + contentType: String, + content: String, + deadLetterReasons: List + ): String { + + val deadLetterReportId = UUID.randomUUID().toString() + val deadLetterReport = ReportDeadLetter().apply { + this.id = deadLetterReportId + this.uploadId = uploadId + this.dataStreamId = dataStreamId + this.dataStreamRoute = dataStreamRoute + this.dispositionType= dispositionType.toString() + this.contentType = contentType + this.deadLetterReasons= deadLetterReasons + if (contentType.lowercase() == "json") { + val typeObject = object : TypeToken?>() {}.type + val jsonMap: Map = gson.fromJson(content, typeObject) + this.content = jsonMap + } else + this.content = content + } + + var attempts = 0 + do { + try { + val response = cosmosDeadLetterRepository.reportsDeadLetterContainer.createItem( + deadLetterReport, + PartitionKey(uploadId), + CosmosItemRequestOptions() + ) + + logger.info("Creating dead-letter report, response http status code = ${response?.statusCode}, attempt = ${attempts + 1}, uploadId = $uploadId") + if (response != null) { + when (response.statusCode) { + HttpResponseStatus.OK.code(), HttpResponseStatus.CREATED.code() -> { + logger.info("Created report with reportId = ${response.item?.reportId}, uploadId = $uploadId") + + return deadLetterReportId + } + + HttpResponseStatus.TOO_MANY_REQUESTS.code() -> { + // See: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/performance-tips?tabs=trace-net-core#429 + // https://learn.microsoft.com/en-us/rest/api/cosmos-db/common-cosmosdb-rest-response-headers + // https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/troubleshoot-request-rate-too-large?tabs=resource-specific + val recommendedDuration = response.responseHeaders["x-ms-retry-after-ms"] + logger.warn("Received 429 (too many requests) from cosmosdb, attempt ${attempts + 1}, will retry after $recommendedDuration millis, uploadId = $uploadId") + val waitMillis = recommendedDuration?.toLong() + Thread.sleep(waitMillis ?: DEFAULT_RETRY_INTERVAL_MILLIS) + } + + else -> { + // Need to retry regardless + val retryAfterDurationMillis = getCalculatedRetryDuration(attempts) + logger.warn("Received response code ${response.statusCode}, attempt ${attempts + 1}, will retry after $retryAfterDurationMillis millis, uploadId = $uploadId") + Thread.sleep(retryAfterDurationMillis) + } + } + } else { + val retryAfterDurationMillis = getCalculatedRetryDuration(attempts) + logger.warn("Received null response from cosmosdb, attempt ${attempts + 1}, will retry after $retryAfterDurationMillis millis, uploadId = $uploadId") + Thread.sleep(retryAfterDurationMillis) + } + } catch (e: Exception) { + val retryAfterDurationMillis = getCalculatedRetryDuration(attempts) + logger.error("CreateReport: Exception: ${e.localizedMessage}, attempt ${attempts + 1}, will retry after $retryAfterDurationMillis millis, uploadId = $uploadId") + Thread.sleep(retryAfterDurationMillis) + } + + } while (attempts++ < MAX_RETRY_ATTEMPTS) + + throw BadStateException("Failed to create dead-letterReport reportId = ${deadLetterReport.reportId}, uploadId = $uploadId") + } + private fun getCalculatedRetryDuration(attempt: Int): Long { return DEFAULT_RETRY_INTERVAL_MILLIS * (attempt + 1) } diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosClientManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosClientManager.kt index 5e1854c7..54fdb132 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosClientManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosClientManager.kt @@ -16,6 +16,7 @@ class CosmosClientManager { .endpoint(uri) .key(authKey) .consistencyLevel(ConsistencyLevel.EVENTUAL) + .gatewayMode() .contentResponseOnWriteEnabled(true) .clientTelemetryEnabled(false) .buildClient() diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt index 8043a60d..49b28d97 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt @@ -17,8 +17,9 @@ class CosmosContainerManager { val logger = KotlinLogging.logger {} logger.info("Create database $databaseName if not exists...") // Create database if not exists - // val databaseResponse = cosmosClient.createDatabaseIfNotExists(databaseName) - // return cosmosClient.getDatabase(databaseResponse.properties.id) + //TODO : These 2 lines are throwing exceptions + // val databaseResponse = cosmosClient.createDatabaseIfNotExists(databaseName) + // return cosmosClient.getDatabase(databaseResponse.properties.id) return cosmosClient.getDatabase(databaseName) } diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt index 80c21451..23c5eaca 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt @@ -3,9 +3,11 @@ package gov.cdc.ocio.processingstatusapi.cosmos import org.koin.core.component.KoinComponent class CosmosRepository(uri: String, authKey: String, reportsContainerName: String, partitionKey: String): KoinComponent { + val reportsContainer = + CosmosContainerManager.initDatabaseContainer(uri, authKey, reportsContainerName, partitionKey)!! +} - val reportsContainer = CosmosContainerManager.initDatabaseContainer(uri, authKey, reportsContainerName, partitionKey)!! - - - +class CosmosDeadLetterRepository(uri: String, authKey: String, reportsContainerName: String, partitionKey: String): KoinComponent { + val reportsDeadLetterContainer = + CosmosContainerManager.initDatabaseContainer(uri, authKey, reportsContainerName, partitionKey)!! } \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt new file mode 100644 index 00000000..ba618c26 --- /dev/null +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt @@ -0,0 +1,67 @@ +package gov.cdc.ocio.processingstatusapi.models + +import com.google.gson.* +import com.google.gson.annotations.SerializedName +import java.time.LocalDateTime +import java.util.* + +/** + * Report for a given stage. + * + * @property uploadId String? + * @property reportId String? + * @property dataStreamId String? + * @property dataStreamRoute String? + * @property dispositionType DispositionType? + * @property dexIngestionDateTime DateTime? + * @property timestamp Date + * @property contentType String? + * @property content String? + * @property deadLetterReasons List + */ +data class ReportDeadLetter( + + var id : String? = null, + + @SerializedName("upload_id") + var uploadId: String? = null, + + @SerializedName("report_id") + var reportId: String? = null, + + @SerializedName("data_stream_id") + var dataStreamId: String? = null, + + @SerializedName("data_stream_route") + var dataStreamRoute: String? = null, + + @SerializedName("disposition_type") + var dispositionType: String? = null, + + + @SerializedName("content_type") + var contentType : String? = null, + + var content: Any? = null, + + val dexIngestDateTime: LocalDateTime = LocalDateTime.now(), + + val timestamp: Date = Date(), + + var deadLetterReasons:List? = null +) { + val contentAsString: String? + get() { + if (content == null) return null + + return when (contentType?.lowercase(Locale.getDefault())) { + "json" -> { + if (content is LinkedHashMap<*, *>) + Gson().toJson(content, MutableMap::class.java).toString() + else + content.toString() + } + else -> content.toString() + } + } +} diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt index 6757c462..e922bb7a 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt @@ -50,7 +50,7 @@ val AzureServiceBus = createApplicationPlugin( .buildProcessorClient() } - // Initialize Service Bus client for topic +// Initialize Service Bus client for topics val processorTopicClient by lazy { ServiceBusClientBuilder() .connectionString(connectionString) @@ -88,41 +88,9 @@ val AzureServiceBus = createApplicationPlugin( } - fun sendMessage() { - val senderClient = ServiceBusClientBuilder() - .connectionString(connectionString) - .fullyQualifiedNamespace(serviceBusNamespace) - .transportType(AmqpTransportType.AMQP_WEB_SOCKETS) - .sender() - .queueName(queueName) - .buildClient() - // val serviceBusMessages = reports.map {ServiceBusMessage(it)} - // senderClient.sendMessages(serviceBusMessages) - try { - val message = ServiceBusMessage("Hello, Service Bus!") - senderClient.sendMessage(message) - println("Message sent to the queue.") - - } - catch (e:AmqpException){ - println("Non-ServiceBusException occurred : ${e.message}") - } - catch (e:TransportException){ - println("Non-ServiceBusException occurred : ${e.message}") - } - - catch (e:Exception){ - println("Non-ServiceBusException occurred : ${e.message}") - } - finally { - senderClient.close() - } - } - on(MonitoringEvent(ApplicationStarted)) { application -> application.log.info("Server is started") receiveMessages() - sendMessage() //****This is not working as well**** } on(MonitoringEvent(ApplicationStopped)) { application -> application.log.info("Server is stopped") @@ -135,7 +103,6 @@ val AzureServiceBus = createApplicationPlugin( } } - private fun processMessage(context: ServiceBusReceivedMessageContext) { val message = context.message @@ -146,27 +113,23 @@ private fun processMessage(context: ServiceBusReceivedMessageContext) { message.body ) try { - ServiceBusProcessor().withMessage(message.body.toString()) + ServiceBusProcessor().withMessage(message) } catch (e: BadRequestException) { LOGGER.warn("Unable to parse the message: {}", e.localizedMessage) } catch (e: IllegalArgumentException) { // TODO : Is this the only exception at this time or more generic one??? LOGGER.warn("Message rejected: {}", e.localizedMessage) - //Writing to deadletter - // TODO : Will this do it for queue and topic based on the context. - // TODO : Should this be "ValidationError" or something generic - context.deadLetter(DeadLetterOptions().setDeadLetterReason("ValidationError").setDeadLetterErrorDescription(e.message)) - - LOGGER.info("Message sent to the dead-letter queue.") + val deadLetterOptions = DeadLetterOptions() + .setDeadLetterReason("Validation failed") + .setDeadLetterErrorDescription(e.message) + context.deadLetter(deadLetterOptions) + LOGGER.info("Message sent to the dead-letter queue.") } catch (e: Exception) { LOGGER.warn("Failed to process service bus message: {}", e.localizedMessage) } } - - - private fun processError(context: ServiceBusErrorContext) { System.out.printf( "Error when receiving messages from namespace: '%s'. Entity: '%s'%n", @@ -200,8 +163,6 @@ private fun processError(context: ServiceBusErrorContext) { } } - - fun Application.serviceBusModule() { install(AzureServiceBus) { // any additional configuration goes here diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt index 7c4b9974..6020fd0d 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt @@ -12,6 +12,11 @@ import gov.cdc.ocio.processingstatusapi.exceptions.BadStateException import gov.cdc.ocio.processingstatusapi.models.reports.CreateReportSBMessage import gov.cdc.ocio.processingstatusapi.models.reports.Source import mu.KotlinLogging +import java.sql.Timestamp +import java.time.Instant +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.format.DateTimeFormatter import java.util.* /** @@ -36,8 +41,10 @@ class ServiceBusProcessor { * @throws BadRequestException */ @Throws(BadRequestException::class) - fun withMessage(message: String) { - var sbMessage = message + fun withMessage(message: ServiceBusReceivedMessage) { + var sbMessageId = message.messageId + var sbMessage = message.body.toString() + var sbMessageStatus = message.state.name try { logger.info { "Before Message received = $sbMessage" } if (sbMessage.contains("destination_id")) { @@ -47,7 +54,7 @@ class ServiceBusProcessor { sbMessage = sbMessage.replace("event_type", "data_stream_route") } logger.info { "After Message received = $sbMessage" } - createReport(gson.fromJson(sbMessage, CreateReportSBMessage::class.java)) + createReport(sbMessageId,sbMessageStatus,gson.fromJson(sbMessage, CreateReportSBMessage::class.java)) } catch (e:IllegalArgumentException){ println("Validation failed: ${e.message}") @@ -66,7 +73,7 @@ class ServiceBusProcessor { * @throws BadRequestException */ @Throws(BadRequestException::class) - private fun createReport(createReportMessage: CreateReportSBMessage) { + private fun createReport(messageId:String, messageStatus:String,createReportMessage: CreateReportSBMessage) { try { validateReport(createReportMessage) val uploadId = createReportMessage.uploadId @@ -78,8 +85,8 @@ class ServiceBusProcessor { createReportMessage.dataStreamRoute!!, createReportMessage.stageName!!, createReportMessage.contentType!!, - createReportMessage.messageId!!, - createReportMessage.status, + messageId, //createReportMessage.messageId is null + messageStatus, //createReportMessage.status is null createReportMessage.contentAsString!!, // it was Content I changed to ContentAsString createReportMessage.dispositionType, Source.SERVICEBUS @@ -120,6 +127,17 @@ class ServiceBusProcessor { if (missingFields.isNotEmpty()) { val reason ="Missing fields: ${missingFields.joinToString(", ")}" + + // Write the content of the deadletter reports to CosmosDb + ReportManager().createDeadLetterReport( + createReportMessage.uploadId!!, + createReportMessage.dataStreamId!!, + createReportMessage.dataStreamRoute!!, + createReportMessage.dispositionType, + createReportMessage.contentType!!, + createReportMessage.contentAsString!!, + missingFields) + throw IllegalArgumentException(reason) } } From 7cccd2189e9b695ae7dce5bd0b3ab960189ada45 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Fri, 31 May 2024 16:02:33 -0400 Subject: [PATCH 05/16] Initial commit on DASB-39 --- .../ocio/processingstatusapi/ReportManager.kt | 29 +++++-------------- .../models/DeadLetterReport.kt | 17 +---------- .../ocio/processingstatusapi/models/Report.kt | 17 +---------- .../models/reports/CreateReportSBMessage.kt | 15 ---------- .../models/reports/SchemaDefinition.kt | 6 ++-- .../plugins/ServiceBusProcessor.kt | 17 +++++++---- 6 files changed, 25 insertions(+), 76 deletions(-) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt index e2b94def..a676a2af 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt @@ -4,6 +4,7 @@ import com.azure.cosmos.models.CosmosItemRequestOptions import com.azure.cosmos.models.CosmosQueryRequestOptions import com.azure.cosmos.models.PartitionKey import com.azure.messaging.servicebus.models.DeadLetterOptions +import com.google.gson.Gson import com.google.gson.GsonBuilder import com.google.gson.ToNumberPolicy import com.google.gson.reflect.TypeToken @@ -65,7 +66,7 @@ class ReportManager: KoinComponent { contentType: String, messageId: String?, status: String?, - content: String, + content: Any?, dispositionType: DispositionType, source: Source ): String { @@ -102,7 +103,7 @@ class ReportManager: KoinComponent { contentType: String, messageId: String?, status: String?, - content: String, + content: Any?, dispositionType: DispositionType, source: Source): String { @@ -160,7 +161,7 @@ class ReportManager: KoinComponent { contentType: String, messageId: String?, status: String?, - content: String, + content: Any?, source: Source): String { val stageReportId = UUID.randomUUID().toString() val stageReport = Report().apply { @@ -176,7 +177,7 @@ class ReportManager: KoinComponent { if (contentType.lowercase() == "json") { val typeObject = object : TypeToken?>() {}.type - val jsonMap: Map = gson.fromJson(content, typeObject) + val jsonMap: Map = gson.fromJson(Gson().toJson(content, MutableMap::class.java).toString(), typeObject) this.content = jsonMap } else this.content = content @@ -196,21 +197,7 @@ class ReportManager: KoinComponent { when (response.statusCode) { HttpResponseStatus.OK.code(), HttpResponseStatus.CREATED.code() -> { logger.info("Created report with reportId = ${response.item?.reportId}, uploadId = $uploadId") - val enableReportForwarding = System.getenv("EnableReportForwarding") -// if (enableReportForwarding.equals("True", ignoreCase = true)) { -// // Send message to reports-notifications-queue -// val message = NotificationReport( -// response.item?.reportId, -// uploadId, dataStreamId, dataStreamRoute, -// stageName, -// contentType, -// content, -// messageId, -// status, -// source -// ) -// reportMgrConfig.serviceBusSender.sendMessage(ServiceBusMessage(message.toString())) -// } + return stageReportId } @@ -253,7 +240,7 @@ class ReportManager: KoinComponent { dataStreamRoute: String, dispositionType: DispositionType, contentType: String, - content: String, + content: Any?, deadLetterReasons: List ): String { @@ -268,7 +255,7 @@ class ReportManager: KoinComponent { this.deadLetterReasons= deadLetterReasons if (contentType.lowercase() == "json") { val typeObject = object : TypeToken?>() {}.type - val jsonMap: Map = gson.fromJson(content, typeObject) + val jsonMap: Map = gson.fromJson(Gson().toJson(content, MutableMap::class.java).toString(), typeObject) this.content = jsonMap } else this.content = content diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt index ba618c26..4a3e96fc 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt @@ -49,19 +49,4 @@ data class ReportDeadLetter( val timestamp: Date = Date(), var deadLetterReasons:List? = null -) { - val contentAsString: String? - get() { - if (content == null) return null - - return when (contentType?.lowercase(Locale.getDefault())) { - "json" -> { - if (content is LinkedHashMap<*, *>) - Gson().toJson(content, MutableMap::class.java).toString() - else - content.toString() - } - else -> content.toString() - } - } -} +) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/Report.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/Report.kt index 697e438e..f1c2b086 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/Report.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/Report.kt @@ -49,19 +49,4 @@ data class Report( var content: Any? = null, val timestamp: Date = Date() -) { - val contentAsString: String? - get() { - if (content == null) return null - - return when (contentType?.lowercase(Locale.getDefault())) { - "json" -> { - if (content is LinkedHashMap<*, *>) - Gson().toJson(content, MutableMap::class.java).toString() - else - content.toString() - } - else -> content.toString() - } - } -} +) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt index 1b127713..244857f4 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt @@ -45,20 +45,5 @@ class CreateReportSBMessage: ServiceBusMessage() { // content type will be a Map<*, *>. val content: Any? = null - val contentAsString: String? - get() { - if (content == null) return null - return when (contentType?.lowercase(Locale.getDefault())) { - "json" -> { - val typeObject = object : TypeToken?>() {}.type - try { - Gson().toJson(content as Map<*, *>, typeObject) - } catch (e: ClassCastException) { - throw BadStateException("content_type indicates json, but the content is not in JSON format") - } - } - else -> content.toString() - } - } } \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/SchemaDefinition.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/SchemaDefinition.kt index 11d41bce..205d39d8 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/SchemaDefinition.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/SchemaDefinition.kt @@ -2,6 +2,7 @@ package gov.cdc.ocio.processingstatusapi.models.reports import com.google.gson.Gson import com.google.gson.annotations.SerializedName +import com.google.gson.reflect.TypeToken import gov.cdc.ocio.processingstatusapi.exceptions.InvalidSchemaDefException /** @@ -67,10 +68,9 @@ open class SchemaDefinition(@SerializedName("schema_name") var schemaName: Strin companion object { @Throws(InvalidSchemaDefException::class) - fun fromJsonString(jsonContent: String?): SchemaDefinition { + fun fromJsonString(jsonContent: Any?): SchemaDefinition { if (jsonContent == null) throw InvalidSchemaDefException("Missing schema definition") - - val schemaDefinition = Gson().fromJson(jsonContent, SchemaDefinition::class.java) + val schemaDefinition= Gson().fromJson(Gson().toJson(jsonContent, MutableMap::class.java).toString(), SchemaDefinition::class.java) if (schemaDefinition?.schemaName.isNullOrEmpty()) throw InvalidSchemaDefException("Invalid schema_name provided") diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt index 6020fd0d..1f513cec 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt @@ -87,7 +87,7 @@ class ServiceBusProcessor { createReportMessage.contentType!!, messageId, //createReportMessage.messageId is null messageStatus, //createReportMessage.status is null - createReportMessage.contentAsString!!, // it was Content I changed to ContentAsString + createReportMessage.content!!, // it was Content I changed to ContentAsString createReportMessage.dispositionType, Source.SERVICEBUS ) @@ -121,7 +121,7 @@ class ServiceBusProcessor { if (createReportMessage.contentType.isNullOrBlank()) { missingFields.add("contentType") } - if (createReportMessage.contentAsString.isNullOrBlank()) { + if (isNullOrEmpty(createReportMessage.content)) { missingFields.add("content") } @@ -135,13 +135,20 @@ class ServiceBusProcessor { createReportMessage.dataStreamRoute!!, createReportMessage.dispositionType, createReportMessage.contentType!!, - createReportMessage.contentAsString!!, + createReportMessage.content!!, missingFields) throw IllegalArgumentException(reason) } } - - + private fun isNullOrEmpty(value: Any?): Boolean { + return when (value) { + null -> true + is String -> value.isEmpty() + is Collection<*> -> value.isEmpty() + is Map<*, *> -> value.isEmpty() + else -> false // You can adjust this to your needs + } + } } \ No newline at end of file From ff46193c8d29e2a91e2ce1038fa7576e16f8f486 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Fri, 31 May 2024 16:16:01 -0400 Subject: [PATCH 06/16] Code cleanup on DASB-39 --- .../cdc/ocio/processingstatusapi/ReportManager.kt | 4 +--- .../processingstatusapi/models/DeadLetterReport.kt | 1 - .../cdc/ocio/processingstatusapi/models/Report.kt | 2 +- .../models/reports/CreateReportSBMessage.kt | 6 +----- .../models/reports/SchemaDefinition.kt | 1 - .../plugins/ServiceBusProcessor.kt | 13 ++++--------- 6 files changed, 7 insertions(+), 20 deletions(-) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt index a676a2af..0a753828 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt @@ -3,7 +3,6 @@ package gov.cdc.ocio.processingstatusapi import com.azure.cosmos.models.CosmosItemRequestOptions import com.azure.cosmos.models.CosmosQueryRequestOptions import com.azure.cosmos.models.PartitionKey -import com.azure.messaging.servicebus.models.DeadLetterOptions import com.google.gson.Gson import com.google.gson.GsonBuilder import com.google.gson.ToNumberPolicy @@ -23,8 +22,7 @@ import org.koin.core.component.KoinComponent import org.koin.core.component.inject import java.util.* import io.netty.handler.codec.http.HttpResponseStatus -import java.sql.Timestamp -import java.time.LocalDateTime + /** * The report manager interacts directly with CosmosDB to persist and retrieve reports. diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt index 4a3e96fc..631e2971 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt @@ -13,7 +13,6 @@ import java.util.* * @property dataStreamId String? * @property dataStreamRoute String? * @property dispositionType DispositionType? - * @property dexIngestionDateTime DateTime? * @property timestamp Date * @property contentType String? * @property content String? diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/Report.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/Report.kt index f1c2b086..84a96e4b 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/Report.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/Report.kt @@ -1,6 +1,6 @@ package gov.cdc.ocio.processingstatusapi.models -import com.google.gson.* + import com.google.gson.annotations.SerializedName import java.util.* diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt index 244857f4..ba7dc2e5 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt @@ -1,12 +1,8 @@ package gov.cdc.ocio.processingstatusapi.models.reports -import com.google.gson.Gson import com.google.gson.annotations.SerializedName -import com.google.gson.reflect.TypeToken -import gov.cdc.ocio.processingstatusapi.exceptions.BadStateException import gov.cdc.ocio.processingstatusapi.models.ServiceBusMessage -import java.lang.ClassCastException -import java.util.* + /** * Create a report service bus message. diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/SchemaDefinition.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/SchemaDefinition.kt index 205d39d8..450e640a 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/SchemaDefinition.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/SchemaDefinition.kt @@ -2,7 +2,6 @@ package gov.cdc.ocio.processingstatusapi.models.reports import com.google.gson.Gson import com.google.gson.annotations.SerializedName -import com.google.gson.reflect.TypeToken import gov.cdc.ocio.processingstatusapi.exceptions.InvalidSchemaDefException /** diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt index 1f513cec..f86c1110 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt @@ -1,8 +1,7 @@ package gov.cdc.ocio.processingstatusapi.plugins -import com.azure.messaging.servicebus.ServiceBusClientBuilder + import com.azure.messaging.servicebus.ServiceBusReceivedMessage -import com.azure.messaging.servicebus.models.DeadLetterOptions import com.google.gson.GsonBuilder import com.google.gson.JsonSyntaxException import com.google.gson.ToNumberPolicy @@ -12,11 +11,7 @@ import gov.cdc.ocio.processingstatusapi.exceptions.BadStateException import gov.cdc.ocio.processingstatusapi.models.reports.CreateReportSBMessage import gov.cdc.ocio.processingstatusapi.models.reports.Source import mu.KotlinLogging -import java.sql.Timestamp -import java.time.Instant -import java.time.LocalDateTime -import java.time.LocalTime -import java.time.format.DateTimeFormatter + import java.util.* /** @@ -42,9 +37,9 @@ class ServiceBusProcessor { */ @Throws(BadRequestException::class) fun withMessage(message: ServiceBusReceivedMessage) { - var sbMessageId = message.messageId + val sbMessageId = message.messageId var sbMessage = message.body.toString() - var sbMessageStatus = message.state.name + val sbMessageStatus = message.state.name try { logger.info { "Before Message received = $sbMessage" } if (sbMessage.contains("destination_id")) { From 647bfc7a82e376d16db6978e0b6344ebcf42d618 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Mon, 3 Jun 2024 11:57:40 -0400 Subject: [PATCH 07/16] Code cleanup on DASB-39 --- .../gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt index 631e2971..4941efea 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt @@ -1,6 +1,6 @@ package gov.cdc.ocio.processingstatusapi.models -import com.google.gson.* + import com.google.gson.annotations.SerializedName import java.time.LocalDateTime import java.util.* From 1a832f86a50d31a33d784ad878987dc22a412f31 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Wed, 5 Jun 2024 10:14:48 -0400 Subject: [PATCH 08/16] Updated Readme --- pstatus-report-sink-ktor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pstatus-report-sink-ktor/README.md b/pstatus-report-sink-ktor/README.md index a5e6d231..9f529c5c 100644 --- a/pstatus-report-sink-ktor/README.md +++ b/pstatus-report-sink-ktor/README.md @@ -1,5 +1,5 @@ # Overview -This project is the processing status report sink. It listens for messages on an Azure Service bus, validates the messages, and if validated persists them to CosmosDB. +This project is the processing status report sink. It listens for messages on an Azure Service bus queues and topics, validates the messages, and if validated persists them to CosmosDB. If the validation fails due to missing fields or malformed data, then the message is persisted in cosmosdb under a new dead-letter container and the message is also sent to the dead-letter queue under the configured topic subscription(if the message was processed using the topic listener) This is a microservice built using Ktor that can be built as a docker container image. From 04fe08e3a2d77845ca6f6e70d0a110839915a635 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Wed, 5 Jun 2024 11:30:42 -0400 Subject: [PATCH 09/16] Removed the ps1 code --- .../src/main/ps1/SendMessageToQueue.ps1 | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 b/pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 index 89a7868b..e69de29b 100644 --- a/pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 +++ b/pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 @@ -1,23 +0,0 @@ -# Set the parameters for the Service Bus -$connectionString = "Endpoint=sb://ocio-ede-dev-processingstatus-testing.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=EkH3H/i66rBE+xTmTdIqMFmlxzYmsteP7+ASbJMhc5w=" -$queueName = "reports-notifications-queue" - -# Import the necessary assemblies -Add-Type -AssemblyName "System.ServiceModel" -Add-Type -Path (Join-Path -Path (Split-Path $env:PSModulePath -Parent) -ChildPath 'Azure.ServiceBus\2.1.1\lib\netstandard2.0\Microsoft.Azure.ServiceBus.dll') - -# Create a queue client -$queueClient = [Microsoft.Azure.ServiceBus.QueueClient]::new($connectionString, $queueName) - -# Create a message -$messageBody = "Hello, Azure Service Bus!" -$message = [Microsoft.Azure.ServiceBus.Message]::new([System.Text.Encoding]::UTF8.GetBytes($messageBody)) - -# Send the message to the queue -$queueClient.SendAsync($message).GetAwaiter().GetResult() - -# Print confirmation -Write-Output "Sent message: $messageBody" - -# Close the client -$queueClient.CloseAsync().GetAwaiter().GetResult() From 10c057536628a1e1a76af7a4d0c89557c827f138 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Wed, 5 Jun 2024 11:33:05 -0400 Subject: [PATCH 10/16] Removed the ps1 code --- pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 diff --git a/pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 b/pstatus-report-sink-ktor/src/main/ps1/SendMessageToQueue.ps1 deleted file mode 100644 index e69de29b..00000000 From 5c7becd1287270b9b0ff851cdec82541b7daf399 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Thu, 6 Jun 2024 10:25:57 -0400 Subject: [PATCH 11/16] Modified the CosmosContainerManager to use the original way of creating a cosmos db --- .../processingstatusapi/cosmos/CosmosContainerManager.kt | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt index 49b28d97..0c60e9bc 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt @@ -17,11 +17,9 @@ class CosmosContainerManager { val logger = KotlinLogging.logger {} logger.info("Create database $databaseName if not exists...") // Create database if not exists - //TODO : These 2 lines are throwing exceptions - // val databaseResponse = cosmosClient.createDatabaseIfNotExists(databaseName) - // return cosmosClient.getDatabase(databaseResponse.properties.id) - - return cosmosClient.getDatabase(databaseName) + //Alternate way is : - return cosmosClient.getDatabase(databaseName) + val databaseResponse = cosmosClient.createDatabaseIfNotExists(databaseName) + return cosmosClient.getDatabase(databaseResponse.properties.id) } fun initDatabaseContainer(uri: String, authKey: String, containerName: String, partitionKey: String): CosmosContainer? { From f89aa0d66b3abb4a29a740fae1cdc233fb92cda0 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Wed, 12 Jun 2024 11:55:40 -0400 Subject: [PATCH 12/16] Added unit tests and based on that modified code accordingly --- .../gradle/wrapper/gradle-wrapper.properties | 4 +- pstatus-report-sink-ktor/build.gradle | 45 +++++ .../ocio/processingstatusapi/ReportManager.kt | 27 ++- .../processingstatusapi/plugins/ServiceBus.kt | 13 +- .../plugins/ServiceBusProcessor.kt | 56 +++--- ...rvice_bus_content_missing_schema_name.json | 10 + ...ce_bus_content_missing_schema_version.json | 10 + .../data/service_bus_escape_quoted_json.json | 8 + .../kotlin/data/service_bus_good_message.json | 144 ++++++++++++++ .../data/service_bus_good_message_V1.json | 144 ++++++++++++++ .../data/service_bus_missing_content.json | 7 + .../service_bus_missing_content_type.json | 7 + .../service_bus_missing_data_stream_id.json | 7 + ...service_bus_missing_data_stream_route.json | 7 + .../data/service_bus_missing_stage_name.json | 7 + .../data/service_bus_missing_upload_id.json | 143 ++++++++++++++ .../src/test/kotlin/test/ServiceBusTests.kt | 181 ++++++++++++++++++ 17 files changed, 777 insertions(+), 43 deletions(-) create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_content_missing_schema_name.json create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_content_missing_schema_version.json create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_escape_quoted_json.json create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_good_message.json create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_good_message_V1.json create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_content.json create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_content_type.json create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_data_stream_id.json create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_data_stream_route.json create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_stage_name.json create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_upload_id.json create mode 100644 pstatus-report-sink-ktor/src/test/kotlin/test/ServiceBusTests.kt diff --git a/processing-status-api-function-app/gradle/wrapper/gradle-wrapper.properties b/processing-status-api-function-app/gradle/wrapper/gradle-wrapper.properties index bdc9a83b..fbcb72cf 100644 --- a/processing-status-api-function-app/gradle/wrapper/gradle-wrapper.properties +++ b/processing-status-api-function-app/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ +#Fri Jun 07 10:39:15 EDT 2024 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.0.2-bin.zip -networkTimeout=10000 +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/pstatus-report-sink-ktor/build.gradle b/pstatus-report-sink-ktor/build.gradle index 7383d277..1f06272e 100644 --- a/pstatus-report-sink-ktor/build.gradle +++ b/pstatus-report-sink-ktor/build.gradle @@ -45,6 +45,51 @@ dependencies { implementation("io.insert-koin:koin-ktor:3.5.6") testImplementation("io.ktor:ktor-server-tests-jvm") testImplementation("org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version") + + implementation "com.microsoft.azure.functions:azure-functions-java-library:3.0.0" + implementation 'com.microsoft.azure:applicationinsights-core:3.4.19' + implementation 'com.azure:azure-cosmos:4.55.0' + implementation "com.fasterxml.jackson.module:jackson-module-kotlin:2.14.2" + implementation 'io.opentelemetry:opentelemetry-api:1.29.0' + implementation 'io.opentelemetry:opentelemetry-sdk:1.29.0' + implementation 'io.opentelemetry:opentelemetry-exporter-logging:1.29.0' + implementation 'io.opentelemetry:opentelemetry-exporter-otlp:1.29.0' + implementation 'io.opentelemetry:opentelemetry-semconv:1.29.0-alpha' + implementation 'com.google.code.gson:gson:2.10.1' + implementation group: 'io.github.microutils', name: 'kotlin-logging-jvm', version: '3.0.5' + implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.36' + implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.3.11' + implementation group: 'ch.qos.logback.contrib', name: 'logback-json-classic', version: '0.1.5' + implementation group: 'ch.qos.logback.contrib', name: 'logback-jackson', version: '0.1.5' + + implementation 'com.azure:azure-messaging-servicebus:7.15.0' + implementation 'com.microsoft.azure:azure-servicebus:3.6.7' + implementation 'com.azure:azure-identity:1.8.0' + + implementation 'org.danilopianini:khttp:1.3.1' + implementation group: 'com.google.code.gson', name: 'gson', version: '2.8.9' + + implementation 'org.owasp.encoder:encoder:1.2.3' + implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.3.1' + + agent "io.opentelemetry.javaagent:opentelemetry-javaagent:1.29.0" + + testImplementation("org.mockito.kotlin:mockito-kotlin:4.0.0") + testImplementation "org.testng:testng:7.4.0" + implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" + testImplementation "org.mockito:mockito-inline:3.11.2" + testImplementation "io.mockk:mockk:1.13.9" + + +} +test { + useTestNG() + testLogging { + events "passed", "skipped", "failed" + } + systemProperty("isTestEnvironment", "true") + + // Set the test classpath, if required } jib { diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt index 0a753828..05ca3a13 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt @@ -76,8 +76,21 @@ class ReportManager: KoinComponent { } catch(e: Exception) { throw BadRequestException("Malformed message: ${e.localizedMessage}") } - - return createReport(uploadId, dataStreamId, dataStreamRoute, stageName, contentType, messageId, status, content, dispositionType, source) + if (System.getProperty("isTestEnvironment") != "true") { + return createReport( + uploadId, + dataStreamId, + dataStreamRoute, + stageName, + contentType, + messageId, + status, + content, + dispositionType, + source + ) + } + return uploadId // this is just as a fallback } /** @@ -233,11 +246,11 @@ class ReportManager: KoinComponent { } @Throws(BadStateException::class) - fun createDeadLetterReport(uploadId: String, - dataStreamId: String, - dataStreamRoute: String, + fun createDeadLetterReport(uploadId: String?, + dataStreamId: String?, + dataStreamRoute: String?, dispositionType: DispositionType, - contentType: String, + contentType: String?, content: Any?, deadLetterReasons: List ): String { @@ -251,7 +264,7 @@ class ReportManager: KoinComponent { this.dispositionType= dispositionType.toString() this.contentType = contentType this.deadLetterReasons= deadLetterReasons - if (contentType.lowercase() == "json") { + if (contentType?.lowercase() == "json") { val typeObject = object : TypeToken?>() {}.type val jsonMap: Map = gson.fromJson(Gson().toJson(content, MutableMap::class.java).toString(), typeObject) this.content = jsonMap diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt index e922bb7a..28d92e7b 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt @@ -2,17 +2,13 @@ package gov.cdc.ocio.processingstatusapi.plugins import com.azure.core.amqp.AmqpTransportType import com.azure.core.amqp.exception.AmqpException -import com.azure.core.exception.AzureException import com.azure.messaging.servicebus.* import com.azure.messaging.servicebus.models.DeadLetterOptions -import gov.cdc.ocio.processingstatusapi.cosmos.CosmosContainerManager -import gov.cdc.ocio.processingstatusapi.cosmos.CosmosRepository import gov.cdc.ocio.processingstatusapi.exceptions.BadRequestException import io.ktor.server.application.* import io.ktor.server.application.hooks.* import io.ktor.server.config.* import io.ktor.util.logging.* -import io.netty.channel.ConnectTimeoutException import org.apache.qpid.proton.engine.TransportException import java.util.concurrent.TimeUnit @@ -116,14 +112,15 @@ private fun processMessage(context: ServiceBusReceivedMessageContext) { ServiceBusProcessor().withMessage(message) } catch (e: BadRequestException) { LOGGER.warn("Unable to parse the message: {}", e.localizedMessage) - } - catch (e: IllegalArgumentException) { // TODO : Is this the only exception at this time or more generic one??? - LOGGER.warn("Message rejected: {}", e.localizedMessage) val deadLetterOptions = DeadLetterOptions() .setDeadLetterReason("Validation failed") .setDeadLetterErrorDescription(e.message) context.deadLetter(deadLetterOptions) - LOGGER.info("Message sent to the dead-letter queue.") + LOGGER.info("Message sent to the dead-letter queue.") + } + catch (e: IllegalArgumentException) { // TODO : Is this the only exception at this time or more generic one??? + LOGGER.warn("Message rejected: {}", e.localizedMessage) + } catch (e: Exception) { LOGGER.warn("Failed to process service bus message: {}", e.localizedMessage) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt index f86c1110..072196bb 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt @@ -51,7 +51,7 @@ class ServiceBusProcessor { logger.info { "After Message received = $sbMessage" } createReport(sbMessageId,sbMessageStatus,gson.fromJson(sbMessage, CreateReportSBMessage::class.java)) } - catch (e:IllegalArgumentException){ + catch (e:BadRequestException){ println("Validation failed: ${e.message}") throw e } @@ -74,20 +74,22 @@ class ServiceBusProcessor { val uploadId = createReportMessage.uploadId val stageName = createReportMessage.stageName logger.info("Creating report for uploadId = ${uploadId} with stageName = $stageName") - ReportManager().createReportWithUploadId( - createReportMessage.uploadId!!, - createReportMessage.dataStreamId!!, - createReportMessage.dataStreamRoute!!, - createReportMessage.stageName!!, - createReportMessage.contentType!!, - messageId, //createReportMessage.messageId is null - messageStatus, //createReportMessage.status is null - createReportMessage.content!!, // it was Content I changed to ContentAsString - createReportMessage.dispositionType, - Source.SERVICEBUS - ) + + ReportManager().createReportWithUploadId( + createReportMessage.uploadId!!, + createReportMessage.dataStreamId!!, + createReportMessage.dataStreamRoute!!, + createReportMessage.stageName!!, + createReportMessage.contentType!!, + messageId, //createReportMessage.messageId is null + messageStatus, //createReportMessage.status is null + createReportMessage.content!!, // it was Content I changed to ContentAsString + createReportMessage.dispositionType, + Source.SERVICEBUS + ) + } - catch (e:IllegalArgumentException){ + catch (e:BadRequestException){ throw e } catch (e: Exception) { @@ -122,18 +124,20 @@ class ServiceBusProcessor { if (missingFields.isNotEmpty()) { val reason ="Missing fields: ${missingFields.joinToString(", ")}" - - // Write the content of the deadletter reports to CosmosDb - ReportManager().createDeadLetterReport( - createReportMessage.uploadId!!, - createReportMessage.dataStreamId!!, - createReportMessage.dataStreamRoute!!, - createReportMessage.dispositionType, - createReportMessage.contentType!!, - createReportMessage.content!!, - missingFields) - - throw IllegalArgumentException(reason) + //This should not run for unit tests + if (System.getProperty("isTestEnvironment") != "true") { + // Write the content of the deadletter reports to CosmosDb + ReportManager().createDeadLetterReport( + createReportMessage.uploadId, + createReportMessage.dataStreamId, + createReportMessage.dataStreamRoute, + createReportMessage.dispositionType, + createReportMessage.contentType, + createReportMessage.content, + missingFields + ) + } + throw BadRequestException(reason) } } diff --git a/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_content_missing_schema_name.json b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_content_missing_schema_name.json new file mode 100644 index 00000000..9bd417c0 --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_content_missing_schema_name.json @@ -0,0 +1,10 @@ +{ + "upload_id": "4f99cfde-7c83-44e5-bd49-206ca4df7e16", + "data_stream_id": "dex-testing", + "data_stream_route": "test-event1", + "stage_name": "dex-hl7-validation", + "content_type": "json", + "content": { + "schema_version": "0.0.1" + } +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_content_missing_schema_version.json b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_content_missing_schema_version.json new file mode 100644 index 00000000..f39dd2c0 --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_content_missing_schema_version.json @@ -0,0 +1,10 @@ +{ + "upload_id": "4f99cfde-7c83-44e5-bd49-206ca4df7e16", + "data_stream_id": "dex-testing", + "data_stream_route": "test-event1", + "stage_name": "dex-hl7-validation", + "content_type": "json", + "content": { + "schema_name": "dex-hl7-validation" + } +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_escape_quoted_json.json b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_escape_quoted_json.json new file mode 100644 index 00000000..75b1c8a4 --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_escape_quoted_json.json @@ -0,0 +1,8 @@ +{ + "upload_id": "12345678", + "data_stream_id": "dex-testing", + "data_stream_route": "test-event1", + "stage_name": "dex-upload", + "content_type": "json", + "content": "{\"schema_name\":\"upload\",\"schema_version\":\"1.0\",\"tguid\":\"12345678\",\"offset\":27472691,\"size\":27472691,\"filename\":\"some_upload1.csv\",\"meta_destination_id\":\"ndlp\",\"meta_ext_event\":\"routineImmunization\",\"end_time_epoch_millis\":1700009141546,\"start_time_epoch_millis\":1700009137234,\"metadata\":{\"filename\":\"10MB-test-file\",\"filetype\":\"text\/plain\",\"meta_destination_id\":\"ndlp\",\"meta_ext_event\":\"routineImmunization\",\"meta_ext_source\":\"IZGW\",\"meta_ext_sourceversion\":\"V2022-12-31\",\"meta_ext_entity\":\"DD2\",\"meta_username\":\"ygj6@cdc.gov\",\"meta_ext_objectkey\":\"2b18d70c-8559-11ee-b9d1-0242ac120002\",\"meta_ext_filename\":\"10MB-test-file\",\"meta_ext_submissionperiod\":\"1\"}}" +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_good_message.json b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_good_message.json new file mode 100644 index 00000000..cb416e79 --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_good_message.json @@ -0,0 +1,144 @@ +{ + "upload_id": "4f99cfde-7c83-44e5-bd49-206ca4df7e16", + "data_stream_id": "dex-testing", + "data_stream_route": "test-event1", + "stage_name": "dex-hl7-validation", + "content_type": "json", + "content": { + "id": "dd1c8c4b-a18d-4449-aaf3-5f0ff505bf38", + "message_uuid": "dd1c8c4b-a18d-4449-aaf3-5f0ff505bf38", + "message_info": { + "event_code": "10150", + "route": "vaccine_preventable_diseases", + "reporting_jurisdiction": "13", + "type": "CASE", + "local_record_id": "SAI" + }, + "metadata": { + "provenance": { + "event_id": "9b36fb63-e01e-0066-11ab-2ee31d06a24b", + "event_timestamp": "2023-12-14T16:36:47.910526Z", + "file_uuid": "03cb333c-46c0-4a96-8bab-bf494df87528", + "file_path": "https://tfedemessagestoragedev.blob.core.windows.net/hl7ingress/postman-dtx0-c679a16e-fbfc-4388-9147-18d8a605abc0.txt", + "file_timestamp": "2023-12-14T11:36:47-05:00", + "file_size": 14813, + "single_or_batch": "SINGLE", + "message_hash": "74ada45a7c41c98c574b6c4f5c8e1d26", + "ext_system_provider": "POSTMAN", + "ext_original_file_name": "postman-dtx0-c679a16e-fbfc-4388-9147-18d8a605abc0.txt", + "message_index": 1, + "ext_original_file_timestamp": "2023-03-09T12:05:04.074", + "source_metadata": { + "orginal_file_name": "test-d4731f4d-b119-4d78-a0b1-d322658d1d3d.txt" + } + }, + "processes": [ + { + "status": "SUCCESS", + "process_name": "RECEIVER", + "process_version": "1.0.0", + "eventhub_queued_time": "2023-12-14T16:36:48.847", + "eventhub_offset": 1125281432952, + "eventhub_sequence_number": 89766, + "configs": [], + "start_processing_time": "2023-12-14T11:36:48.892-05:00", + "end_processing_time": "2023-12-14T11:36:48.982-05:00" + }, + { + "status": "SUCCESS", + "report": { + "entries": [], + "status": "SUCCESS" + }, + "process_name": "REDACTOR", + "process_version": "1.0.0", + "eventhub_queued_time": "2023-12-14T16:36:49.253", + "eventhub_offset": 1189705961896, + "eventhub_sequence_number": 86965, + "configs": [ + "case_config.txt" + ], + "start_processing_time": "2023-12-14T11:36:49.34-05:00", + "end_processing_time": "2023-12-14T11:36:49.436-05:00" + }, + { + "status": "SUCCESS", + "report": { + "entries": { + "structure": [ + { + "line": 4, + "column": 64, + "path": "OBX[1]-5[1]", + "description": "The primitive Field OBX-5 (Observation Value) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 6, + "column": 68, + "path": "OBX[3]-5[1]", + "description": "The primitive Field OBX-5 (Observation Value) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 2, + "column": 236, + "path": "PID[1]-11[1].9", + "description": "The primitive Component PID-11.9 (County/Parish Code) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 2, + "column": 195, + "path": "PID[1]-11[1].4", + "description": "The primitive Component PID-11.4 (State or Province) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 1, + "column": 226, + "path": "MSH[1]-10[1]", + "description": "The length of Field MSH-10 (Message Control ID) must be within the range [1, 20]. Value \\u003d \\u0027RIBD_N_Meningitidis_Case_Invalid_03\\u0027", + "category": "Length", + "classification": "Warning" + } + ], + "content": [], + "value-set": [] + }, + "error-count": { + "structure": 0, + "value-set": 0, + "content": 0 + }, + "warning-count": { + "structure": 5, + "value-set": 0, + "content": 0 + }, + "status": "VALID_MESSAGE" + }, + "process_name": "STRUCTURE-VALIDATOR", + "process_version": "1.0.0", + "eventhub_queued_time": "2023-12-14T16:36:50.097", + "eventhub_offset": 1120986506816, + "eventhub_sequence_number": 96087, + "configs": [ + "NOTF_ORU_V3.0" + ], + "start_processing_time": "2023-12-14T11:36:50.133-05:00", + "end_processing_time": "2023-12-14T11:36:50.205-05:00" + } + ] + }, + "summary": { + "current_status": "VALID_MESSAGE" + }, + "schema_version": "0.0.1", + "schema_name": "dex-hl7-validation" + } +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_good_message_V1.json b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_good_message_V1.json new file mode 100644 index 00000000..74b6cd46 --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_good_message_V1.json @@ -0,0 +1,144 @@ +{ + "upload_id": "4f99cfde-7c83-44e5-bd49-206ca4df7e16", + "destination_id": "dex-testing", + "event_type": "test-event1", + "stage_name": "dex-hl7-validation", + "content_type": "json", + "content": { + "id": "dd1c8c4b-a18d-4449-aaf3-5f0ff505bf38", + "message_uuid": "dd1c8c4b-a18d-4449-aaf3-5f0ff505bf38", + "message_info": { + "event_code": "10150", + "route": "vaccine_preventable_diseases", + "reporting_jurisdiction": "13", + "type": "CASE", + "local_record_id": "SAI" + }, + "metadata": { + "provenance": { + "event_id": "9b36fb63-e01e-0066-11ab-2ee31d06a24b", + "event_timestamp": "2023-12-14T16:36:47.910526Z", + "file_uuid": "03cb333c-46c0-4a96-8bab-bf494df87528", + "file_path": "https://tfedemessagestoragedev.blob.core.windows.net/hl7ingress/postman-dtx0-c679a16e-fbfc-4388-9147-18d8a605abc0.txt", + "file_timestamp": "2023-12-14T11:36:47-05:00", + "file_size": 14813, + "single_or_batch": "SINGLE", + "message_hash": "74ada45a7c41c98c574b6c4f5c8e1d26", + "ext_system_provider": "POSTMAN", + "ext_original_file_name": "postman-dtx0-c679a16e-fbfc-4388-9147-18d8a605abc0.txt", + "message_index": 1, + "ext_original_file_timestamp": "2023-03-09T12:05:04.074", + "source_metadata": { + "orginal_file_name": "test-d4731f4d-b119-4d78-a0b1-d322658d1d3d.txt" + } + }, + "processes": [ + { + "status": "SUCCESS", + "process_name": "RECEIVER", + "process_version": "1.0.0", + "eventhub_queued_time": "2023-12-14T16:36:48.847", + "eventhub_offset": 1125281432952, + "eventhub_sequence_number": 89766, + "configs": [], + "start_processing_time": "2023-12-14T11:36:48.892-05:00", + "end_processing_time": "2023-12-14T11:36:48.982-05:00" + }, + { + "status": "SUCCESS", + "report": { + "entries": [], + "status": "SUCCESS" + }, + "process_name": "REDACTOR", + "process_version": "1.0.0", + "eventhub_queued_time": "2023-12-14T16:36:49.253", + "eventhub_offset": 1189705961896, + "eventhub_sequence_number": 86965, + "configs": [ + "case_config.txt" + ], + "start_processing_time": "2023-12-14T11:36:49.34-05:00", + "end_processing_time": "2023-12-14T11:36:49.436-05:00" + }, + { + "status": "SUCCESS", + "report": { + "entries": { + "structure": [ + { + "line": 4, + "column": 64, + "path": "OBX[1]-5[1]", + "description": "The primitive Field OBX-5 (Observation Value) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 6, + "column": 68, + "path": "OBX[3]-5[1]", + "description": "The primitive Field OBX-5 (Observation Value) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 2, + "column": 236, + "path": "PID[1]-11[1].9", + "description": "The primitive Component PID-11.9 (County/Parish Code) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 2, + "column": 195, + "path": "PID[1]-11[1].4", + "description": "The primitive Component PID-11.4 (State or Province) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 1, + "column": 226, + "path": "MSH[1]-10[1]", + "description": "The length of Field MSH-10 (Message Control ID) must be within the range [1, 20]. Value \\u003d \\u0027RIBD_N_Meningitidis_Case_Invalid_03\\u0027", + "category": "Length", + "classification": "Warning" + } + ], + "content": [], + "value-set": [] + }, + "error-count": { + "structure": 0, + "value-set": 0, + "content": 0 + }, + "warning-count": { + "structure": 5, + "value-set": 0, + "content": 0 + }, + "status": "VALID_MESSAGE" + }, + "process_name": "STRUCTURE-VALIDATOR", + "process_version": "1.0.0", + "eventhub_queued_time": "2023-12-14T16:36:50.097", + "eventhub_offset": 1120986506816, + "eventhub_sequence_number": 96087, + "configs": [ + "NOTF_ORU_V3.0" + ], + "start_processing_time": "2023-12-14T11:36:50.133-05:00", + "end_processing_time": "2023-12-14T11:36:50.205-05:00" + } + ] + }, + "summary": { + "current_status": "VALID_MESSAGE" + }, + "schema_version": "0.0.1", + "schema_name": "dex-hl7-validation" + } +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_content.json b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_content.json new file mode 100644 index 00000000..34bfc52b --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_content.json @@ -0,0 +1,7 @@ +{ + "upload_id": "12345678", + "stage_name": "dex-stage1", + "data_stream_id": "dex-testing", + "data_stream_route": "test-event1", + "content_type": "json" +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_content_type.json b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_content_type.json new file mode 100644 index 00000000..d863dc53 --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_content_type.json @@ -0,0 +1,7 @@ +{ + "upload_id": "12345678", + "stage_name": "dex-stage1", + "data_stream_id": "dex-testing", + "data_stream_route": "test-event1" + +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_data_stream_id.json b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_data_stream_id.json new file mode 100644 index 00000000..5033bc4d --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_data_stream_id.json @@ -0,0 +1,7 @@ +{ + "upload_id": "12345678", + "data_stream_route": "test-event1", + "stage_name": "dex-stage1", + "content_type": "json", + "content": "" +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_data_stream_route.json b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_data_stream_route.json new file mode 100644 index 00000000..bbe6926a --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_data_stream_route.json @@ -0,0 +1,7 @@ +{ + "upload_id": "12345678", + "stage_name": "dex-stage1", + "data_stream_id": "dex-testing", + "content_type": "json", + "content": "" +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_stage_name.json b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_stage_name.json new file mode 100644 index 00000000..4dda4740 --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_stage_name.json @@ -0,0 +1,7 @@ +{ + "upload_id": "12345678", + "data_stream_id": "dex-testing", + "data_stream_route": "test-event1", + "content_type": "json", + "content": "" +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_upload_id.json b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_upload_id.json new file mode 100644 index 00000000..df0e2405 --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/data/service_bus_missing_upload_id.json @@ -0,0 +1,143 @@ +{ + "data_stream_route": "test-event1", + "stage_name": "dex-stage1", + "data_stream_id": "dex-testing", + "content_type": "json", + "content": { + "id": "dd1c8c4b-a18d-4449-aaf3-5f0ff505bf38", + "message_uuid": "dd1c8c4b-a18d-4449-aaf3-5f0ff505bf38", + "message_info": { + "event_code": "10150", + "route": "vaccine_preventable_diseases", + "reporting_jurisdiction": "13", + "type": "CASE", + "local_record_id": "SAI" + }, + "metadata": { + "provenance": { + "event_id": "9b36fb63-e01e-0066-11ab-2ee31d06a24b", + "event_timestamp": "2023-12-14T16:36:47.910526Z", + "file_uuid": "03cb333c-46c0-4a96-8bab-bf494df87528", + "file_path": "https://tfedemessagestoragedev.blob.core.windows.net/hl7ingress/postman-dtx0-c679a16e-fbfc-4388-9147-18d8a605abc0.txt", + "file_timestamp": "2023-12-14T11:36:47-05:00", + "file_size": 14813, + "single_or_batch": "SINGLE", + "message_hash": "74ada45a7c41c98c574b6c4f5c8e1d26", + "ext_system_provider": "POSTMAN", + "ext_original_file_name": "postman-dtx0-c679a16e-fbfc-4388-9147-18d8a605abc0.txt", + "message_index": 1, + "ext_original_file_timestamp": "2023-03-09T12:05:04.074", + "source_metadata": { + "orginal_file_name": "test-d4731f4d-b119-4d78-a0b1-d322658d1d3d.txt" + } + }, + "processes": [ + { + "status": "SUCCESS", + "process_name": "RECEIVER", + "process_version": "1.0.0", + "eventhub_queued_time": "2023-12-14T16:36:48.847", + "eventhub_offset": 1125281432952, + "eventhub_sequence_number": 89766, + "configs": [], + "start_processing_time": "2023-12-14T11:36:48.892-05:00", + "end_processing_time": "2023-12-14T11:36:48.982-05:00" + }, + { + "status": "SUCCESS", + "report": { + "entries": [], + "status": "SUCCESS" + }, + "process_name": "REDACTOR", + "process_version": "1.0.0", + "eventhub_queued_time": "2023-12-14T16:36:49.253", + "eventhub_offset": 1189705961896, + "eventhub_sequence_number": 86965, + "configs": [ + "case_config.txt" + ], + "start_processing_time": "2023-12-14T11:36:49.34-05:00", + "end_processing_time": "2023-12-14T11:36:49.436-05:00" + }, + { + "status": "SUCCESS", + "report": { + "entries": { + "structure": [ + { + "line": 4, + "column": 64, + "path": "OBX[1]-5[1]", + "description": "The primitive Field OBX-5 (Observation Value) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 6, + "column": 68, + "path": "OBX[3]-5[1]", + "description": "The primitive Field OBX-5 (Observation Value) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 2, + "column": 236, + "path": "PID[1]-11[1].9", + "description": "The primitive Component PID-11.9 (County/Parish Code) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 2, + "column": 195, + "path": "PID[1]-11[1].4", + "description": "The primitive Component PID-11.4 (State or Province) contains at least one unescaped delimiter", + "category": "Unescaped Separator", + "classification": "Warning" + }, + { + "line": 1, + "column": 226, + "path": "MSH[1]-10[1]", + "description": "The length of Field MSH-10 (Message Control ID) must be within the range [1, 20]. Value \\u003d \\u0027RIBD_N_Meningitidis_Case_Invalid_03\\u0027", + "category": "Length", + "classification": "Warning" + } + ], + "content": [], + "value-set": [] + }, + "error-count": { + "structure": 0, + "value-set": 0, + "content": 0 + }, + "warning-count": { + "structure": 5, + "value-set": 0, + "content": 0 + }, + "status": "VALID_MESSAGE" + }, + "process_name": "STRUCTURE-VALIDATOR", + "process_version": "1.0.0", + "eventhub_queued_time": "2023-12-14T16:36:50.097", + "eventhub_offset": 1120986506816, + "eventhub_sequence_number": 96087, + "configs": [ + "NOTF_ORU_V3.0" + ], + "start_processing_time": "2023-12-14T11:36:50.133-05:00", + "end_processing_time": "2023-12-14T11:36:50.205-05:00" + } + ] + }, + "summary": { + "current_status": "VALID_MESSAGE" + }, + "schema_version": "0.0.1", + "schema_name": "dex-hl7-validation" + } +} \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/test/kotlin/test/ServiceBusTests.kt b/pstatus-report-sink-ktor/src/test/kotlin/test/ServiceBusTests.kt new file mode 100644 index 00000000..a940bb8a --- /dev/null +++ b/pstatus-report-sink-ktor/src/test/kotlin/test/ServiceBusTests.kt @@ -0,0 +1,181 @@ + +package test + +import com.google.gson.Gson +import com.microsoft.azure.functions.ExecutionContext +import gov.cdc.ocio.processingstatusapi.cosmos.CosmosContainerManager +import gov.cdc.ocio.processingstatusapi.exceptions.BadRequestException +import gov.cdc.ocio.processingstatusapi.models.reports.CreateReportSBMessage +import gov.cdc.ocio.processingstatusapi.models.reports.SchemaDefinition +import gov.cdc.ocio.processingstatusapi.plugins.ServiceBusProcessor +import io.mockk.* +import org.mockito.Mockito +import org.testng.Assert +import org.testng.annotations.BeforeMethod +import org.testng.annotations.Test +import java.io.File +import com.azure.messaging.servicebus.ServiceBusReceivedMessage + +class ServiceBusTests { + + private lateinit var context: ExecutionContext + + @BeforeMethod + fun setUp() { + context = Mockito.mock(ExecutionContext::class.java) + mockkObject(CosmosContainerManager) + every { CosmosContainerManager.initDatabaseContainer(any(), any(),any(), any()) } returns null + + } + @Test + fun testParseJsonContentSchemaDefinition() { + val testMessage = File("./src/test/kotlin/data/service_bus_good_message.json").readText() + + val createReportSBMessage = Gson().fromJson(testMessage, CreateReportSBMessage::class.java) + val schemaDefinition = SchemaDefinition.fromJsonString(createReportSBMessage.content) + + Assert.assertEquals(schemaDefinition.schemaName, "dex-hl7-validation") + Assert.assertEquals(schemaDefinition.schemaVersion, "0.0.1") + } + @Test + fun testServiceBusMessageMissingUploadId() { + val testMessage =File("./src/test/kotlin/data/service_bus_missing_upload_id.json").readText() + val serviceBusReceivedMessage = createServiceBusReceivedMessageFromString(testMessage) + var exceptionThrown = false + try { + // Mock file + ServiceBusProcessor().withMessage(serviceBusReceivedMessage) + } catch(ex: BadRequestException) { + exceptionThrown = ex.localizedMessage == "Missing fields: uploadId" + } + Assert.assertTrue(exceptionThrown) + } + @Test + fun testServiceBusMessageMissingDataStreamId() { + val testMessage = File("./src/test/kotlin/data/service_bus_missing_data_stream_id.json").readText() + val serviceBusReceivedMessage = createServiceBusReceivedMessageFromString(testMessage) + var exceptionThrown = false + try { + ServiceBusProcessor().withMessage(serviceBusReceivedMessage) + } catch(ex: BadRequestException) { + exceptionThrown = ex.localizedMessage == "Missing fields: dataStreamId, content" + } + Assert.assertTrue(exceptionThrown) + } + + @Test + fun testServiceBusMessageMissingRoute() { + val testMessage = File("./src/test/kotlin/data/service_bus_missing_data_stream_route.json").readText() + val serviceBusReceivedMessage = createServiceBusReceivedMessageFromString(testMessage) + var exceptionThrown = false + try { + ServiceBusProcessor().withMessage(serviceBusReceivedMessage) + } catch(ex: BadRequestException) { + exceptionThrown = ex.localizedMessage == "Missing fields: dataStreamRoute, content" + } + Assert.assertTrue(exceptionThrown) + } + @Test + fun testServiceBusMessageMissingStageName() { + val testMessage = File("./src/test/kotlin/data/service_bus_missing_stage_name.json").readText() + val serviceBusReceivedMessage = createServiceBusReceivedMessageFromString(testMessage) + + var exceptionThrown = false + try { + ServiceBusProcessor().withMessage(serviceBusReceivedMessage) + } catch(ex: BadRequestException) { + exceptionThrown = ex.localizedMessage == "Missing fields: stageName, content" + } + Assert.assertTrue(exceptionThrown) + } + + @Test + fun testServiceBusMessageMissingContentType() { + val testMessage = File("./src/test/kotlin/data/service_bus_missing_content_type.json").readText() + val serviceBusReceivedMessage = createServiceBusReceivedMessageFromString(testMessage) + var exceptionThrown = false + try { + ServiceBusProcessor().withMessage(serviceBusReceivedMessage) + } catch(ex: BadRequestException) { + exceptionThrown = ex.localizedMessage == "Missing fields: contentType, content" + } + Assert.assertTrue(exceptionThrown) + } + + @Test + fun testServiceBusMessageMissingContent() { + val testMessage = File("./src/test/kotlin/data/service_bus_missing_content.json").readText() + val serviceBusReceivedMessage = createServiceBusReceivedMessageFromString(testMessage) + var exceptionThrown = false + try { + ServiceBusProcessor().withMessage(serviceBusReceivedMessage) + } catch(ex: BadRequestException) { + exceptionThrown = ex.localizedMessage == "Missing fields: content" + } + Assert.assertTrue(exceptionThrown) + } + + @Test + fun testServiceBusMessageContentMissingSchemaName() { + val testMessage = File("./src/test/kotlin/data/service_bus_content_missing_schema_name.json").readText() + val serviceBusReceivedMessage = createServiceBusReceivedMessageFromString(testMessage) + var exceptionThrown = false + try { + ServiceBusProcessor().withMessage(serviceBusReceivedMessage) + } catch(ex: BadRequestException) { + exceptionThrown = ex.localizedMessage == "Invalid schema definition: Invalid schema_name provided" + } + Assert.assertTrue(exceptionThrown) + } + + @Test + fun testServiceBusMessageContentMissingSchemaVersion() { + val testMessage = File("./src/test/kotlin/data/service_bus_content_missing_schema_version.json").readText() + val serviceBusReceivedMessage = createServiceBusReceivedMessageFromString(testMessage) + var exceptionThrown = false + try { + ServiceBusProcessor().withMessage(serviceBusReceivedMessage) + } catch(ex: BadRequestException) { + exceptionThrown = ex.localizedMessage == "Invalid schema definition: Invalid schema_version provided" + } + Assert.assertTrue(exceptionThrown) + } + @Test + fun testServiceBusGoodMessage_V1() { + val testMessage = File("./src/test/kotlin/data/service_bus_good_message_V1.json").readText() + val serviceBusReceivedMessage = createServiceBusReceivedMessageFromString(testMessage) + ServiceBusProcessor().withMessage(serviceBusReceivedMessage) + } + + @Test + fun testServiceBusGoodMessage() { + val testMessage = File("./src/test/kotlin/data/service_bus_good_message.json").readText() + val serviceBusReceivedMessage = createServiceBusReceivedMessageFromString(testMessage) + ServiceBusProcessor().withMessage(serviceBusReceivedMessage) + } + + @Test + fun testServiceBusMessageEscapeQuotedJson() { + val testMessage = File("./src/test/kotlin/data/service_bus_escape_quoted_json.json").readText() + val serviceBusReceivedMessage = createServiceBusReceivedMessageFromString(testMessage) + var exceptionThrown = false + try { + ServiceBusProcessor().withMessage(serviceBusReceivedMessage) + } catch(ex: BadRequestException) { + exceptionThrown = ex.localizedMessage == "Malformed message: class java.lang.String cannot be cast to class java.util.Map (java.lang.String and java.util.Map are in module java.base of loader 'bootstrap')" + } + Assert.assertTrue(exceptionThrown) + } +} + +fun createServiceBusReceivedMessageFromString(messageBody: String): ServiceBusReceivedMessage { + val message = mockk(relaxed = true) + val messageId ="MessageId123" + val status ="Active" + every{ message.messageId } returns messageId + every{ message.state.name } returns status + every { message.body.toString() } returns messageBody + return message +} + + From b389b09b1eefe2c709915ceb935939d706f414f3 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Wed, 12 Jun 2024 15:48:29 -0400 Subject: [PATCH 13/16] Added class and function headers Added missing stageName from dead-letter report --- .../ocio/processingstatusapi/ReportManager.kt | 18 +++++++++++++ .../models/DeadLetterReport.kt | 5 +++- .../processingstatusapi/plugins/ServiceBus.kt | 27 ++++++++++++++++++- .../plugins/ServiceBusProcessor.kt | 10 ++++++- 4 files changed, 57 insertions(+), 3 deletions(-) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt index 05ca3a13..2914c66d 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt @@ -244,11 +244,24 @@ class ReportManager: KoinComponent { throw BadStateException("Failed to create reportId = ${stageReport.reportId}, uploadId = $uploadId") } + /** + * Creates a dead-letter report if there is a malformed data or missing required fields + * + * @param uploadId String + * @param dataStreamId String + * @param dataStreamRoute String + * @param stageName String + * @param contentType String + * @param content String + * @return String + * @throws BadStateException + */ @Throws(BadStateException::class) fun createDeadLetterReport(uploadId: String?, dataStreamId: String?, dataStreamRoute: String?, + stageName: String?, dispositionType: DispositionType, contentType: String?, content: Any?, @@ -261,6 +274,7 @@ class ReportManager: KoinComponent { this.uploadId = uploadId this.dataStreamId = dataStreamId this.dataStreamRoute = dataStreamRoute + this.stageName= stageName this.dispositionType= dispositionType.toString() this.contentType = contentType this.deadLetterReasons= deadLetterReasons @@ -323,6 +337,10 @@ class ReportManager: KoinComponent { throw BadStateException("Failed to create dead-letterReport reportId = ${deadLetterReport.reportId}, uploadId = $uploadId") } + /** + * The function which calculates the interval after which the retry should occur + * @param attempt Int + */ private fun getCalculatedRetryDuration(attempt: Int): Long { return DEFAULT_RETRY_INTERVAL_MILLIS * (attempt + 1) } diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt index 4941efea..0de9b983 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/DeadLetterReport.kt @@ -6,7 +6,7 @@ import java.time.LocalDateTime import java.util.* /** - * Report for a given stage. + * Dead-LetterReport when there is missing fields or malformed data. * * @property uploadId String? * @property reportId String? @@ -34,6 +34,9 @@ data class ReportDeadLetter( @SerializedName("data_stream_route") var dataStreamRoute: String? = null, + @SerializedName("stage_name") + var stageName: String? = null, + @SerializedName("disposition_type") var dispositionType: String? = null, diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt index 28d92e7b..6a1190e2 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt @@ -14,6 +14,11 @@ import java.util.concurrent.TimeUnit internal val LOGGER = KtorSimpleLogger("pstatus-report-sink") +/** + * Class which initializes configuration values + * @param config ApplicationConfig + * + */ class AzureServiceBusConfiguration(config: ApplicationConfig) { var connectionString: String = config.tryGetString("connection_string") ?: "" var serviceBusNamespace: String = config.tryGetString("azure_servicebus_namespace") ?: "" @@ -60,7 +65,12 @@ val AzureServiceBus = createApplicationPlugin( .buildProcessorClient() } - // handles received messages + /** + * Function which starts receiving messages from queues and topics + * @throws AmqpException + * @throws TransportException + * @throws Exception generic + */ @Throws(InterruptedException::class) fun receiveMessages() { try { @@ -99,6 +109,13 @@ val AzureServiceBus = createApplicationPlugin( } } +/** + * Function which processes the message received in the queue or topics + * @param context ServiceBusReceivedMessageContext + * @throws BadRequestException + * @throws IllegalArgumentException + * @throws Exception generic + */ private fun processMessage(context: ServiceBusReceivedMessageContext) { val message = context.message @@ -127,6 +144,11 @@ private fun processMessage(context: ServiceBusReceivedMessageContext) { } } + +/** + * Function to handle and process the error generated during the processing of messages from queue or topics + * @param context ServiceBusErrorContext + */ private fun processError(context: ServiceBusErrorContext) { System.out.printf( "Error when receiving messages from namespace: '%s'. Entity: '%s'%n", @@ -160,6 +182,9 @@ private fun processError(context: ServiceBusErrorContext) { } } +/** + * The main application module which runs always + */ fun Application.serviceBusModule() { install(AzureServiceBus) { // any additional configuration goes here diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt index 072196bb..55816aaa 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt @@ -99,7 +99,11 @@ class ServiceBusProcessor { } - // Function to validate report + /** + * Function to validate report attributes for missing required fields + * @param createReportMessage CreateReportSBMessage + * @throws BadRequestException + */ private fun validateReport(createReportMessage: CreateReportSBMessage) { val missingFields = mutableListOf() @@ -131,6 +135,7 @@ class ServiceBusProcessor { createReportMessage.uploadId, createReportMessage.dataStreamId, createReportMessage.dataStreamRoute, + createReportMessage.stageName, createReportMessage.dispositionType, createReportMessage.contentType, createReportMessage.content, @@ -141,6 +146,9 @@ class ServiceBusProcessor { } } + /** Function to check whether the value is null or empty based on its type + * @param value Any + */ private fun isNullOrEmpty(value: Any?): Boolean { return when (value) { null -> true From 4ae75aee61ba972ed9aa8a54650105c23d762c74 Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Thu, 13 Jun 2024 17:15:26 -0400 Subject: [PATCH 14/16] Updated the code based on PR comments --- pstatus-report-sink-ktor/build.gradle | 3 +- .../ocio/processingstatusapi/Application.kt | 20 +- .../ocio/processingstatusapi/ReportManager.kt | 191 +++++++++--------- .../cosmos/CosmosContainerManager.kt | 12 ++ .../cosmos/CosmosRepository.kt | 21 +- .../models/reports/CreateReportSBMessage.kt | 2 +- .../processingstatusapi/plugins/ServiceBus.kt | 10 +- .../plugins/ServiceBusProcessor.kt | 100 +++++---- 8 files changed, 211 insertions(+), 148 deletions(-) diff --git a/pstatus-report-sink-ktor/build.gradle b/pstatus-report-sink-ktor/build.gradle index 1f06272e..1e98b9ba 100644 --- a/pstatus-report-sink-ktor/build.gradle +++ b/pstatus-report-sink-ktor/build.gradle @@ -87,7 +87,8 @@ test { testLogging { events "passed", "skipped", "failed" } - systemProperty("isTestEnvironment", "true") + //Change this to "true" if we want to execute unit tests + systemProperty("isTestEnvironment", "false") // Set the test classpath, if required } diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt index 39e5aff0..62493c2e 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/Application.kt @@ -10,23 +10,35 @@ import io.ktor.server.netty.* import org.koin.core.KoinApplication import org.koin.dsl.module import org.koin.ktor.plugin.Koin -import org.koin.mp.KoinPlatform.getKoin + +/** + * Load the environment configuration values + * Instantiate a singleton CosmosDatabase container instance + * @param environment ApplicationEnvironment + */ fun KoinApplication.loadKoinModules(environment: ApplicationEnvironment): KoinApplication { val cosmosModule = module { val uri = environment.config.property("azure.cosmos_db.client.endpoint").getString() val authKey = environment.config.property("azure.cosmos_db.client.key").getString() - single { CosmosRepository(uri, authKey, "Reports", "/uploadId") } - single { CosmosDeadLetterRepository(uri, authKey, "Reports-DeadLetter", "/uploadId") } + single(createdAtStart = true) { CosmosRepository(uri, authKey, "Reports", "/uploadId") } + single(createdAtStart = true) { CosmosDeadLetterRepository(uri, authKey, "Reports-DeadLetter", "/uploadId") } } return modules(listOf(cosmosModule)) } +/** + * The main function + * @param args Array + */ fun main(args: Array) { embeddedServer(Netty, commandLineEnvironment(args)).start(wait = true) } +/** + * The main application module which always runs and loads other modules + */ fun Application.module() { configureRouting() serviceBusModule() @@ -34,6 +46,4 @@ fun Application.module() { loadKoinModules(environment) } - // Preload the koin module so the CosmosDB client is already initialized on the first call - getKoin().get() } diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt index 2914c66d..e08e7bef 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt @@ -123,14 +123,14 @@ class ReportManager: KoinComponent { logger.info("Replacing report(s) with stage name = $stageName") // Delete all stages matching the report ID with the same stage name val sqlQuery = "select * from ${reportMgrConfig.reportsContainerName} r where r.uploadId = '$uploadId' and r.stageName = '$stageName'" - val items = cosmosRepository.reportsContainer.queryItems( + val items = cosmosRepository.reportsContainer?.queryItems( sqlQuery, CosmosQueryRequestOptions(), Report::class.java ) if ((items?.count() ?: 0) > 0) { try { items?.forEach { - cosmosRepository.reportsContainer.deleteItem( + cosmosRepository.reportsContainer?.deleteItem( it.id, PartitionKey(it.uploadId), CosmosItemRequestOptions() @@ -193,56 +193,7 @@ class ReportManager: KoinComponent { } else this.content = content } - - var attempts = 0 - do { - try { - val response = cosmosRepository.reportsContainer.createItem( - stageReport, - PartitionKey(uploadId), - CosmosItemRequestOptions() - ) - - logger.info("Creating report, response http status code = ${response?.statusCode}, attempt = ${attempts + 1}, uploadId = $uploadId") - if (response != null) { - when (response.statusCode) { - HttpResponseStatus.OK.code(), HttpResponseStatus.CREATED.code() -> { - logger.info("Created report with reportId = ${response.item?.reportId}, uploadId = $uploadId") - - return stageReportId - } - - HttpResponseStatus.TOO_MANY_REQUESTS.code() -> { - // See: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/performance-tips?tabs=trace-net-core#429 - // https://learn.microsoft.com/en-us/rest/api/cosmos-db/common-cosmosdb-rest-response-headers - // https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/troubleshoot-request-rate-too-large?tabs=resource-specific - val recommendedDuration = response.responseHeaders["x-ms-retry-after-ms"] - logger.warn("Received 429 (too many requests) from cosmosdb, attempt ${attempts + 1}, will retry after $recommendedDuration millis, uploadId = $uploadId") - val waitMillis = recommendedDuration?.toLong() - Thread.sleep(waitMillis ?: DEFAULT_RETRY_INTERVAL_MILLIS) - } - - else -> { - // Need to retry regardless - val retryAfterDurationMillis = getCalculatedRetryDuration(attempts) - logger.warn("Received response code ${response.statusCode}, attempt ${attempts + 1}, will retry after $retryAfterDurationMillis millis, uploadId = $uploadId") - Thread.sleep(retryAfterDurationMillis) - } - } - } else { - val retryAfterDurationMillis = getCalculatedRetryDuration(attempts) - logger.warn("Received null response from cosmosdb, attempt ${attempts + 1}, will retry after $retryAfterDurationMillis millis, uploadId = $uploadId") - Thread.sleep(retryAfterDurationMillis) - } - } catch (e: Exception) { - val retryAfterDurationMillis = getCalculatedRetryDuration(attempts) - logger.error("CreateReport: Exception: ${e.localizedMessage}, attempt ${attempts + 1}, will retry after $retryAfterDurationMillis millis, uploadId = $uploadId") - Thread.sleep(retryAfterDurationMillis) - } - - } while (attempts++ < MAX_RETRY_ATTEMPTS) - - throw BadStateException("Failed to create reportId = ${stageReport.reportId}, uploadId = $uploadId") + return createReportItem(uploadId,stageReportId,stageReport) } /** * Creates a dead-letter report if there is a malformed data or missing required fields @@ -272,79 +223,129 @@ class ReportManager: KoinComponent { val deadLetterReport = ReportDeadLetter().apply { this.id = deadLetterReportId this.uploadId = uploadId + this.reportId = deadLetterReportId this.dataStreamId = dataStreamId this.dataStreamRoute = dataStreamRoute this.stageName= stageName this.dispositionType= dispositionType.toString() this.contentType = contentType this.deadLetterReasons= deadLetterReasons - if (contentType?.lowercase() == "json") { + if (contentType?.lowercase() == "json" && !isNullOrEmpty(content)) { val typeObject = object : TypeToken?>() {}.type val jsonMap: Map = gson.fromJson(Gson().toJson(content, MutableMap::class.java).toString(), typeObject) this.content = jsonMap } else this.content = content } + return createReportItem(uploadId,deadLetterReportId,deadLetterReport) + } + /** + * The function which calculates the interval after which the retry should occur + * @param attempt Int + */ + private fun getCalculatedRetryDuration(attempt: Int): Long { + return DEFAULT_RETRY_INTERVAL_MILLIS * (attempt + 1) + } + /** Function to check whether the value is null or empty based on its type + * @param value Any + */ + private fun isNullOrEmpty(value: Any?): Boolean { + return when (value) { + null -> true + is String -> value.isEmpty() + is Collection<*> -> value.isEmpty() + is Map<*, *> -> value.isEmpty() + else -> false // You can adjust this to your needs + } + } + + fun createReportItem(uploadId: String?, reportId:String, reportType:Any) : String{ + + var responseReportId = "" + var reportTypeName = "Report" + var statusCode:Int? = null + var isValidResponse = false + var recommendedDuration:String?= null var attempts = 0 + do { try { - val response = cosmosDeadLetterRepository.reportsDeadLetterContainer.createItem( - deadLetterReport, - PartitionKey(uploadId), - CosmosItemRequestOptions() - ) + //use when here to determing whether report type is StageReport or DeadLetterReport + when (reportType) { + is Report -> { + val response = cosmosRepository.reportsContainer?.createItem( + reportType, + PartitionKey(uploadId), + CosmosItemRequestOptions()) - logger.info("Creating dead-letter report, response http status code = ${response?.statusCode}, attempt = ${attempts + 1}, uploadId = $uploadId") - if (response != null) { - when (response.statusCode) { - HttpResponseStatus.OK.code(), HttpResponseStatus.CREATED.code() -> { - logger.info("Created report with reportId = ${response.item?.reportId}, uploadId = $uploadId") - - return deadLetterReportId - } + isValidResponse = response!=null + responseReportId = response?.item?.reportId ?: "0" + statusCode = response?.statusCode + recommendedDuration = response?.responseHeaders?.get("x-ms-retry-after-ms") - HttpResponseStatus.TOO_MANY_REQUESTS.code() -> { - // See: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/performance-tips?tabs=trace-net-core#429 - // https://learn.microsoft.com/en-us/rest/api/cosmos-db/common-cosmosdb-rest-response-headers - // https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/troubleshoot-request-rate-too-large?tabs=resource-specific - val recommendedDuration = response.responseHeaders["x-ms-retry-after-ms"] - logger.warn("Received 429 (too many requests) from cosmosdb, attempt ${attempts + 1}, will retry after $recommendedDuration millis, uploadId = $uploadId") - val waitMillis = recommendedDuration?.toLong() - Thread.sleep(waitMillis ?: DEFAULT_RETRY_INTERVAL_MILLIS) - } + } + is ReportDeadLetter -> { + val response = cosmosDeadLetterRepository.reportsDeadLetterContainer?.createItem( + reportType, + PartitionKey(uploadId), + CosmosItemRequestOptions()) - else -> { - // Need to retry regardless - val retryAfterDurationMillis = getCalculatedRetryDuration(attempts) - logger.warn("Received response code ${response.statusCode}, attempt ${attempts + 1}, will retry after $retryAfterDurationMillis millis, uploadId = $uploadId") - Thread.sleep(retryAfterDurationMillis) - } + isValidResponse = response!=null + reportTypeName ="dead-letter report" + responseReportId = response?.item?.reportId ?: "0" + statusCode = response?.statusCode + recommendedDuration = response?.responseHeaders?.get("x-ms-retry-after-ms") } - } else { - val retryAfterDurationMillis = getCalculatedRetryDuration(attempts) - logger.warn("Received null response from cosmosdb, attempt ${attempts + 1}, will retry after $retryAfterDurationMillis millis, uploadId = $uploadId") - Thread.sleep(retryAfterDurationMillis) } - } catch (e: Exception) { + logger.info("Creating ${reportTypeName}, response http status code = ${statusCode}, attempt = ${attempts + 1}, uploadId = $uploadId") + if(isValidResponse){ + + when (statusCode) { + HttpResponseStatus.OK.code(), HttpResponseStatus.CREATED.code() -> { + logger.info("Created report with reportId = ${responseReportId}, uploadId = $uploadId") + return reportId + } + + HttpResponseStatus.TOO_MANY_REQUESTS.code() -> { + // See: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/performance-tips?tabs=trace-net-core#429 + // https://learn.microsoft.com/en-us/rest/api/cosmos-db/common-cosmosdb-rest-response-headers + // https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/troubleshoot-request-rate-too-large?tabs=resource-specific + + logger.warn("Received 429 (too many requests) from cosmosdb, attempt ${attempts + 1}, will retry after $recommendedDuration millis, uploadId = $uploadId") + val waitMillis = recommendedDuration?.toLong() + Thread.sleep(waitMillis ?: DEFAULT_RETRY_INTERVAL_MILLIS) + } + + else -> { + // Need to retry regardless + val retryAfterDurationMillis = getCalculatedRetryDuration(attempts) + logger.warn("Received response code ${statusCode}, attempt ${attempts + 1}, will retry after $retryAfterDurationMillis millis, uploadId = $uploadId") + Thread.sleep(retryAfterDurationMillis) + } + } + } + else { + val retryAfterDurationMillis = getCalculatedRetryDuration(attempts) + logger.warn("Received null response from cosmosdb, attempt ${attempts + 1}, will retry after $retryAfterDurationMillis millis, uploadId = $uploadId") + Thread.sleep(retryAfterDurationMillis) + } + } + catch (e: Exception) { val retryAfterDurationMillis = getCalculatedRetryDuration(attempts) logger.error("CreateReport: Exception: ${e.localizedMessage}, attempt ${attempts + 1}, will retry after $retryAfterDurationMillis millis, uploadId = $uploadId") Thread.sleep(retryAfterDurationMillis) } - } while (attempts++ < MAX_RETRY_ATTEMPTS) - throw BadStateException("Failed to create dead-letterReport reportId = ${deadLetterReport.reportId}, uploadId = $uploadId") - } - /** - * The function which calculates the interval after which the retry should occur - * @param attempt Int - */ - private fun getCalculatedRetryDuration(attempt: Int): Long { - return DEFAULT_RETRY_INTERVAL_MILLIS * (attempt + 1) + } while (attempts++ < MAX_RETRY_ATTEMPTS) + throw BadStateException("Failed to create dead-letterReport reportId = ${responseReportId}, uploadId = $uploadId") } + + + companion object { const val DEFAULT_RETRY_INTERVAL_MILLIS = 500L const val MAX_RETRY_ATTEMPTS = 100 diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt index 0c60e9bc..15b79c6f 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosContainerManager.kt @@ -12,6 +12,11 @@ class CosmosContainerManager { companion object { + /** + * Function which creates the Cosmos db if not exists , and returns the db instance with which we can get a container instance + * @param cosmosClient CosmosClient + * @param databaseName String + */ @Throws(Exception::class) fun createDatabaseIfNotExists(cosmosClient: CosmosClient, databaseName: String): CosmosDatabase? { val logger = KotlinLogging.logger {} @@ -22,6 +27,13 @@ class CosmosContainerManager { return cosmosClient.getDatabase(databaseResponse.properties.id) } + /** + * The function which creates the cosmos container instance + * @param uri String + * @param authKey String + * @param containerName String + * @param partitionKey String + */ fun initDatabaseContainer(uri: String, authKey: String, containerName: String, partitionKey: String): CosmosContainer? { val logger = KotlinLogging.logger {} try { diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt index 23c5eaca..f29c6c3e 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/cosmos/CosmosRepository.kt @@ -2,12 +2,27 @@ package gov.cdc.ocio.processingstatusapi.cosmos import org.koin.core.component.KoinComponent +/** + * The class which initializes and creates an instance of a cosmos db reports container + * @param uri :String + * @param authKey:String + * @param reportsContainerName:String + * @param partitionKey:String + * + */ class CosmosRepository(uri: String, authKey: String, reportsContainerName: String, partitionKey: String): KoinComponent { val reportsContainer = - CosmosContainerManager.initDatabaseContainer(uri, authKey, reportsContainerName, partitionKey)!! + CosmosContainerManager.initDatabaseContainer(uri, authKey, reportsContainerName, partitionKey) } - +/** + * The class which initializes and creates an instance of a cosmos db reports deadletter container + * @param uri :String + * @param authKey:String + * @param reportsContainerName:String + * @param partitionKey:String + * + */ class CosmosDeadLetterRepository(uri: String, authKey: String, reportsContainerName: String, partitionKey: String): KoinComponent { val reportsDeadLetterContainer = - CosmosContainerManager.initDatabaseContainer(uri, authKey, reportsContainerName, partitionKey)!! + CosmosContainerManager.initDatabaseContainer(uri, authKey, reportsContainerName, partitionKey) } \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt index ba7dc2e5..dc32c817 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/reports/CreateReportSBMessage.kt @@ -39,7 +39,7 @@ class CreateReportSBMessage: ServiceBusMessage() { // content will vary depending on content_type so make it any. For example, if content_type is json then the // content type will be a Map<*, *>. - val content: Any? = null + var content: Any? = null } \ No newline at end of file diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt index 6a1190e2..1f82ff54 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBus.kt @@ -33,7 +33,7 @@ val AzureServiceBus = createApplicationPlugin( createConfiguration = ::AzureServiceBusConfiguration) { val connectionString = pluginConfig.connectionString - var serviceBusNamespace= pluginConfig.serviceBusNamespace + val serviceBusNamespace= pluginConfig.serviceBusNamespace val queueName = pluginConfig.queueName val topicName = pluginConfig.topicName val subscriptionName = pluginConfig.subscriptionName @@ -127,7 +127,9 @@ private fun processMessage(context: ServiceBusReceivedMessageContext) { ) try { ServiceBusProcessor().withMessage(message) - } catch (e: BadRequestException) { + } + //This will handle all missing required fields, invalid schema definition and malformed json all under the BadRequest exception and writes to dead-letter queue or topics depending on the context + catch (e: BadRequestException) { LOGGER.warn("Unable to parse the message: {}", e.localizedMessage) val deadLetterOptions = DeadLetterOptions() .setDeadLetterReason("Validation failed") @@ -135,10 +137,6 @@ private fun processMessage(context: ServiceBusReceivedMessageContext) { context.deadLetter(deadLetterOptions) LOGGER.info("Message sent to the dead-letter queue.") } - catch (e: IllegalArgumentException) { // TODO : Is this the only exception at this time or more generic one??? - LOGGER.warn("Message rejected: {}", e.localizedMessage) - - } catch (e: Exception) { LOGGER.warn("Failed to process service bus message: {}", e.localizedMessage) } diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt index 55816aaa..f8467ac4 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/ServiceBusProcessor.kt @@ -8,7 +8,9 @@ import com.google.gson.ToNumberPolicy import gov.cdc.ocio.processingstatusapi.ReportManager import gov.cdc.ocio.processingstatusapi.exceptions.BadRequestException import gov.cdc.ocio.processingstatusapi.exceptions.BadStateException +import gov.cdc.ocio.processingstatusapi.exceptions.InvalidSchemaDefException import gov.cdc.ocio.processingstatusapi.models.reports.CreateReportSBMessage +import gov.cdc.ocio.processingstatusapi.models.reports.SchemaDefinition import gov.cdc.ocio.processingstatusapi.models.reports.Source import mu.KotlinLogging @@ -49,13 +51,11 @@ class ServiceBusProcessor { sbMessage = sbMessage.replace("event_type", "data_stream_route") } logger.info { "After Message received = $sbMessage" } - createReport(sbMessageId,sbMessageStatus,gson.fromJson(sbMessage, CreateReportSBMessage::class.java)) - } - catch (e:BadRequestException){ + createReport(sbMessageId, sbMessageStatus, gson.fromJson(sbMessage, CreateReportSBMessage::class.java)) + } catch (e: BadRequestException) { println("Validation failed: ${e.message}") throw e - } - catch (e: JsonSyntaxException) { + } catch (e: JsonSyntaxException) { logger.error("Failed to parse CreateReportSBMessage: ${e.localizedMessage}") throw BadStateException("Unable to interpret the create report message") } @@ -68,31 +68,29 @@ class ServiceBusProcessor { * @throws BadRequestException */ @Throws(BadRequestException::class) - private fun createReport(messageId:String, messageStatus:String,createReportMessage: CreateReportSBMessage) { + private fun createReport(messageId: String, messageStatus: String, createReportMessage: CreateReportSBMessage) { try { validateReport(createReportMessage) val uploadId = createReportMessage.uploadId val stageName = createReportMessage.stageName logger.info("Creating report for uploadId = ${uploadId} with stageName = $stageName") - ReportManager().createReportWithUploadId( - createReportMessage.uploadId!!, - createReportMessage.dataStreamId!!, - createReportMessage.dataStreamRoute!!, - createReportMessage.stageName!!, - createReportMessage.contentType!!, - messageId, //createReportMessage.messageId is null - messageStatus, //createReportMessage.status is null - createReportMessage.content!!, // it was Content I changed to ContentAsString - createReportMessage.dispositionType, - Source.SERVICEBUS - ) - - } - catch (e:BadRequestException){ - throw e - } - catch (e: Exception) { + ReportManager().createReportWithUploadId( + createReportMessage.uploadId!!, + createReportMessage.dataStreamId!!, + createReportMessage.dataStreamRoute!!, + createReportMessage.stageName!!, + createReportMessage.contentType!!, + messageId, //createReportMessage.messageId is null + messageStatus, //createReportMessage.status is null + createReportMessage.content!!, // it was Content I changed to ContentAsString + createReportMessage.dispositionType, + Source.SERVICEBUS + ) + + } catch (e: BadRequestException) { + throw e + } catch (e: Exception) { println("Failed to process service bus message:${e.message}") } @@ -100,37 +98,53 @@ class ServiceBusProcessor { } /** - * Function to validate report attributes for missing required fields + * Function to validate report attributes for missing required fields, for schema validation and malformed content message * @param createReportMessage CreateReportSBMessage * @throws BadRequestException */ private fun validateReport(createReportMessage: CreateReportSBMessage) { - val missingFields = mutableListOf() + val invalidData = mutableListOf() + var reason = "" if (createReportMessage.uploadId.isNullOrBlank()) { - missingFields.add("uploadId") + invalidData.add("uploadId") } if (createReportMessage.dataStreamId.isNullOrBlank()) { - missingFields.add("dataStreamId") + invalidData.add("dataStreamId") } if (createReportMessage.dataStreamRoute.isNullOrBlank()) { - missingFields.add("dataStreamRoute") + invalidData.add("dataStreamRoute") } if (createReportMessage.stageName.isNullOrBlank()) { - missingFields.add("stageName") + invalidData.add("stageName") } if (createReportMessage.contentType.isNullOrBlank()) { - missingFields.add("contentType") + invalidData.add("contentType") } if (isNullOrEmpty(createReportMessage.content)) { - missingFields.add("content") + invalidData.add("content") } + if (invalidData.isNotEmpty()) { + reason = "Missing fields: ${invalidData.joinToString(", ")}" + } else { + try { + SchemaDefinition.fromJsonString(createReportMessage.content) + } catch (e: InvalidSchemaDefException) { + reason = "Invalid schema definition: ${e.localizedMessage}" + invalidData.add(reason) + + } catch (e: Exception) { + reason = "Malformed message: ${e.localizedMessage}" + invalidData.add(reason) + //convert content to base64 encoded string + createReportMessage.content = convertToStringOrBase64(createReportMessage.content) + } + } - if (missingFields.isNotEmpty()) { - val reason ="Missing fields: ${missingFields.joinToString(", ")}" + if (invalidData.isNotEmpty()) { //This should not run for unit tests if (System.getProperty("isTestEnvironment") != "true") { - // Write the content of the deadletter reports to CosmosDb + // Write the content of the dead-letter reports to CosmosDb ReportManager().createDeadLetterReport( createReportMessage.uploadId, createReportMessage.dataStreamId, @@ -139,11 +153,11 @@ class ServiceBusProcessor { createReportMessage.dispositionType, createReportMessage.contentType, createReportMessage.content, - missingFields + invalidData ) } throw BadRequestException(reason) - } + } } /** Function to check whether the value is null or empty based on its type @@ -158,4 +172,16 @@ class ServiceBusProcessor { else -> false // You can adjust this to your needs } } + + /** + * Convert the malformed content to base64 encoded string + * @param obj Any + */ + private fun convertToStringOrBase64(obj: Any?): String { + val bytes = when (obj) { + is ByteArray -> obj + else -> obj.toString().toByteArray() + } + return Base64.getEncoder().encodeToString(bytes) + } } \ No newline at end of file From 1dd3dee0bbff5d774bf95eff0ccab4b23c2bd3db Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Thu, 13 Jun 2024 17:33:03 -0400 Subject: [PATCH 15/16] Added code to check for base64 encoded before writing to DeadLetter Cosmos container --- .../gov/cdc/ocio/processingstatusapi/ReportManager.kt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt index e08e7bef..f272eae2 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt @@ -230,7 +230,7 @@ class ReportManager: KoinComponent { this.dispositionType= dispositionType.toString() this.contentType = contentType this.deadLetterReasons= deadLetterReasons - if (contentType?.lowercase() == "json" && !isNullOrEmpty(content)) { + if (contentType?.lowercase() == "json" && !isNullOrEmpty(content) && !isBase64Encoded(content.toString())) { val typeObject = object : TypeToken?>() {}.type val jsonMap: Map = gson.fromJson(Gson().toJson(content, MutableMap::class.java).toString(), typeObject) this.content = jsonMap @@ -260,6 +260,11 @@ class ReportManager: KoinComponent { } } + private fun isBase64Encoded(value: String): Boolean { + val base64Pattern = "^[A-Za-z0-9+/]+={0,2}$" + return value.matches(base64Pattern.toRegex()) + } + fun createReportItem(uploadId: String?, reportId:String, reportType:Any) : String{ var responseReportId = "" From 82203c732a5985e56752acb97404689af997595d Mon Sep 17 00:00:00 2001 From: Manu Kesava Date: Thu, 13 Jun 2024 18:24:44 -0400 Subject: [PATCH 16/16] Added function headers --- .../cdc/ocio/processingstatusapi/ReportManager.kt | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt index f272eae2..9ca70187 100644 --- a/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt +++ b/pstatus-report-sink-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/ReportManager.kt @@ -260,12 +260,23 @@ class ReportManager: KoinComponent { } } + /** + * The function which checks whether the passed string is Base64 Encoded or not using Regex + * @param value String + */ private fun isBase64Encoded(value: String): Boolean { val base64Pattern = "^[A-Za-z0-9+/]+={0,2}$" return value.matches(base64Pattern.toRegex()) } - fun createReportItem(uploadId: String?, reportId:String, reportType:Any) : String{ + /** + * The common function which writes to cosmos container based on the report type + * @param uploadId String + * @param reportId String + * @reportType Any + */ + + private fun createReportItem(uploadId: String?, reportId:String, reportType:Any) : String{ var responseReportId = "" var reportTypeName = "Report"